From 42f8226b2ebaa522a1838f1101546f1280b72072 Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 19 Mar 2025 00:04:52 -0600 Subject: [PATCH] ok lets try removing the debug logs on litellm --- api/libs/litellm/Cargo.toml | 1 + api/libs/litellm/src/client.rs | 163 +++++++++++++++++++++------------ 2 files changed, 106 insertions(+), 58 deletions(-) diff --git a/api/libs/litellm/Cargo.toml b/api/libs/litellm/Cargo.toml index 61964891f..4da5a6614 100644 --- a/api/libs/litellm/Cargo.toml +++ b/api/libs/litellm/Cargo.toml @@ -15,6 +15,7 @@ futures = { workspace = true } futures-util = { workspace = true } reqwest = { workspace = true } dotenv = { workspace = true } +once_cell = { workspace = true } [dev-dependencies] mockito = { workspace = true } diff --git a/api/libs/litellm/src/client.rs b/api/libs/litellm/src/client.rs index a59187a64..ce07a7a9e 100644 --- a/api/libs/litellm/src/client.rs +++ b/api/libs/litellm/src/client.rs @@ -3,9 +3,17 @@ use futures_util::StreamExt; use reqwest::{header, Client}; use std::env; use tokio::sync::mpsc; +use once_cell::sync::Lazy; use super::types::*; +// Debug flag controlled by environment variable +static DEBUG_ENABLED: Lazy = Lazy::new(|| { + env::var("LITELLM_DEBUG") + .map(|val| val.to_lowercase() == "true" || val == "1") + .unwrap_or(false) +}); + #[derive(Clone)] pub struct LiteLLMClient { client: Client, @@ -14,6 +22,13 @@ pub struct LiteLLMClient { } impl LiteLLMClient { + // Helper function for conditional debug logging + fn debug_log(msg: &str) { + if *DEBUG_ENABLED { + println!("DEBUG: {}", msg); + } + } + pub fn new(api_key: Option, base_url: Option) -> Self { let api_key = api_key.or_else(|| env::var("LLM_API_KEY").ok()).expect( "LLM_API_KEY must be provided either through parameter or environment variable", @@ -55,11 +70,13 @@ impl LiteLLMClient { ) -> Result { let url = format!("{}/chat/completions", self.base_url); - println!("DEBUG: Sending chat completion request to URL: {}", url); - println!( - "DEBUG: Request payload: {}", - serde_json::to_string_pretty(&request).unwrap() - ); + Self::debug_log(&format!("Sending chat completion request to URL: {}", url)); + if *DEBUG_ENABLED { + Self::debug_log(&format!( + "Request payload: {}", + serde_json::to_string_pretty(&request).unwrap() + )); + } let response = self .client @@ -68,31 +85,35 @@ impl LiteLLMClient { .send() .await?; - // Print the raw response text + // Get the raw response text let response_text = response.text().await?; - println!("DEBUG: Raw response payload: {}", response_text); + if *DEBUG_ENABLED { + Self::debug_log(&format!("Raw response payload: {}", response_text)); + } // Parse the response text into the expected type let response: ChatCompletionResponse = serde_json::from_str(&response_text)?; - // Print tool calls if present - if let Some(AgentMessage::Assistant { - tool_calls: Some(tool_calls), - .. - }) = response.choices.first().map(|c| &c.message) - { - println!("DEBUG: Tool calls in response:"); - for tool_call in tool_calls { - println!("DEBUG: Tool Call ID: {}", tool_call.id); - println!("DEBUG: Tool Name: {}", tool_call.function.name); - println!("DEBUG: Tool Arguments: {}", tool_call.function.arguments); + // Log tool calls if present and debug is enabled + if *DEBUG_ENABLED { + if let Some(AgentMessage::Assistant { + tool_calls: Some(tool_calls), + .. + }) = response.choices.first().map(|c| &c.message) + { + Self::debug_log("Tool calls in response:"); + for tool_call in tool_calls { + Self::debug_log(&format!("Tool Call ID: {}", tool_call.id)); + Self::debug_log(&format!("Tool Name: {}", tool_call.function.name)); + Self::debug_log(&format!("Tool Arguments: {}", tool_call.function.arguments)); + } } - } - println!( - "DEBUG: Received chat completion response: {}", - serde_json::to_string_pretty(&response).unwrap() - ); + Self::debug_log(&format!( + "Received chat completion response: {}", + serde_json::to_string_pretty(&response).unwrap() + )); + } Ok(response) } @@ -103,14 +124,16 @@ impl LiteLLMClient { ) -> Result>> { let url = format!("{}/chat/completions", self.base_url); - println!( - "DEBUG: Starting stream chat completion request to URL: {}", + Self::debug_log(&format!( + "Starting stream chat completion request to URL: {}", url - ); - println!( - "DEBUG: Stream request payload: {}", - serde_json::to_string_pretty(&request).unwrap() - ); + )); + if *DEBUG_ENABLED { + Self::debug_log(&format!( + "Stream request payload: {}", + serde_json::to_string_pretty(&request).unwrap() + )); + } let mut stream = self .client @@ -124,16 +147,21 @@ impl LiteLLMClient { .bytes_stream(); let (tx, rx) = mpsc::channel(100); + let debug_enabled = *DEBUG_ENABLED; // Capture for the async block tokio::spawn(async move { let mut buffer = String::new(); - println!("DEBUG: Stream processing started"); + if debug_enabled { + Self::debug_log("Stream processing started"); + } while let Some(chunk_result) = stream.next().await { match chunk_result { Ok(chunk) => { let chunk_str = String::from_utf8_lossy(&chunk); - println!("DEBUG: Raw response payload: {}", chunk_str); + if debug_enabled { + Self::debug_log(&format!("Raw response payload: {}", chunk_str)); + } buffer.push_str(&chunk_str); while let Some(pos) = buffer.find("\n\n") { @@ -142,53 +170,72 @@ impl LiteLLMClient { if line.starts_with("data: ") { let data = &line["data: ".len()..]; - println!("DEBUG: Processing stream data: {}", data); + if debug_enabled { + Self::debug_log(&format!("Processing stream data: {}", data)); + } if data == "[DONE]" { - println!("DEBUG: Stream completed with [DONE] signal"); + if debug_enabled { + Self::debug_log("Stream completed with [DONE] signal"); + } break; } if let Ok(response) = serde_json::from_str::(data) { - // Print tool calls if present in the stream chunk - if let Some(tool_calls) = &response.choices[0].delta.tool_calls - { - println!("DEBUG: Tool calls in stream chunk:"); - for tool_call in tool_calls { - if let (Some(id), Some(function)) = - (tool_call.id.clone(), tool_call.function.clone()) - { - println!("DEBUG: Tool Call ID: {}", id); - if let Some(name) = function.name { - println!("DEBUG: Tool Name: {}", name); - } - if let Some(arguments) = function.arguments { - println!( - "DEBUG: Tool Arguments: {}", - arguments - ); + // Log tool calls if present and debug is enabled + if debug_enabled { + if let Some(tool_calls) = &response.choices[0].delta.tool_calls + { + Self::debug_log("Tool calls in stream chunk:"); + for tool_call in tool_calls { + if let (Some(id), Some(function)) = + (tool_call.id.clone(), tool_call.function.clone()) + { + Self::debug_log(&format!("Tool Call ID: {}", id)); + if let Some(name) = function.name { + Self::debug_log(&format!("Tool Name: {}", name)); + } + if let Some(arguments) = function.arguments { + Self::debug_log(&format!( + "Tool Arguments: {}", + arguments + )); + } } } } + Self::debug_log(&format!("Parsed stream chunk: {:?}", response)); + } + + // Use try_send instead of send to avoid blocking + if tx.try_send(Ok(response)).is_err() { + // If the channel is full, log it but continue processing + if debug_enabled { + Self::debug_log("Warning: Channel full, receiver not keeping up"); + } } - - println!("DEBUG: Parsed stream chunk: {:?}", response); - let _ = tx.send(Ok(response)).await; } } } } Err(e) => { - println!("DEBUG: Error in stream processing: {:?}", e); - let _ = tx.send(Err(anyhow::Error::from(e))).await; + if debug_enabled { + Self::debug_log(&format!("Error in stream processing: {:?}", e)); + } + // Use try_send to avoid blocking + let _ = tx.try_send(Err(anyhow::Error::from(e))); } } } - println!("DEBUG: Stream processing completed"); + if debug_enabled { + Self::debug_log("Stream processing completed"); + } }); - println!("DEBUG: Returning stream receiver"); + if debug_enabled { + Self::debug_log("Returning stream receiver"); + } Ok(rx) } }