mirror of https://github.com/buster-so/buster.git
ok lets try removing the debug logs on litellm
This commit is contained in:
parent
32ca9ad422
commit
42f8226b2e
|
@ -15,6 +15,7 @@ futures = { workspace = true }
|
|||
futures-util = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
dotenv = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
mockito = { workspace = true }
|
||||
|
|
|
@ -3,9 +3,17 @@ use futures_util::StreamExt;
|
|||
use reqwest::{header, Client};
|
||||
use std::env;
|
||||
use tokio::sync::mpsc;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use super::types::*;
|
||||
|
||||
// Debug flag controlled by environment variable
|
||||
static DEBUG_ENABLED: Lazy<bool> = Lazy::new(|| {
|
||||
env::var("LITELLM_DEBUG")
|
||||
.map(|val| val.to_lowercase() == "true" || val == "1")
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LiteLLMClient {
|
||||
client: Client,
|
||||
|
@ -14,6 +22,13 @@ pub struct LiteLLMClient {
|
|||
}
|
||||
|
||||
impl LiteLLMClient {
|
||||
// Helper function for conditional debug logging
|
||||
fn debug_log(msg: &str) {
|
||||
if *DEBUG_ENABLED {
|
||||
println!("DEBUG: {}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(api_key: Option<String>, base_url: Option<String>) -> Self {
|
||||
let api_key = api_key.or_else(|| env::var("LLM_API_KEY").ok()).expect(
|
||||
"LLM_API_KEY must be provided either through parameter or environment variable",
|
||||
|
@ -55,11 +70,13 @@ impl LiteLLMClient {
|
|||
) -> Result<ChatCompletionResponse> {
|
||||
let url = format!("{}/chat/completions", self.base_url);
|
||||
|
||||
println!("DEBUG: Sending chat completion request to URL: {}", url);
|
||||
println!(
|
||||
"DEBUG: Request payload: {}",
|
||||
serde_json::to_string_pretty(&request).unwrap()
|
||||
);
|
||||
Self::debug_log(&format!("Sending chat completion request to URL: {}", url));
|
||||
if *DEBUG_ENABLED {
|
||||
Self::debug_log(&format!(
|
||||
"Request payload: {}",
|
||||
serde_json::to_string_pretty(&request).unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
let response = self
|
||||
.client
|
||||
|
@ -68,31 +85,35 @@ impl LiteLLMClient {
|
|||
.send()
|
||||
.await?;
|
||||
|
||||
// Print the raw response text
|
||||
// Get the raw response text
|
||||
let response_text = response.text().await?;
|
||||
println!("DEBUG: Raw response payload: {}", response_text);
|
||||
if *DEBUG_ENABLED {
|
||||
Self::debug_log(&format!("Raw response payload: {}", response_text));
|
||||
}
|
||||
|
||||
// Parse the response text into the expected type
|
||||
let response: ChatCompletionResponse = serde_json::from_str(&response_text)?;
|
||||
|
||||
// Print tool calls if present
|
||||
if let Some(AgentMessage::Assistant {
|
||||
tool_calls: Some(tool_calls),
|
||||
..
|
||||
}) = response.choices.first().map(|c| &c.message)
|
||||
{
|
||||
println!("DEBUG: Tool calls in response:");
|
||||
for tool_call in tool_calls {
|
||||
println!("DEBUG: Tool Call ID: {}", tool_call.id);
|
||||
println!("DEBUG: Tool Name: {}", tool_call.function.name);
|
||||
println!("DEBUG: Tool Arguments: {}", tool_call.function.arguments);
|
||||
// Log tool calls if present and debug is enabled
|
||||
if *DEBUG_ENABLED {
|
||||
if let Some(AgentMessage::Assistant {
|
||||
tool_calls: Some(tool_calls),
|
||||
..
|
||||
}) = response.choices.first().map(|c| &c.message)
|
||||
{
|
||||
Self::debug_log("Tool calls in response:");
|
||||
for tool_call in tool_calls {
|
||||
Self::debug_log(&format!("Tool Call ID: {}", tool_call.id));
|
||||
Self::debug_log(&format!("Tool Name: {}", tool_call.function.name));
|
||||
Self::debug_log(&format!("Tool Arguments: {}", tool_call.function.arguments));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!(
|
||||
"DEBUG: Received chat completion response: {}",
|
||||
serde_json::to_string_pretty(&response).unwrap()
|
||||
);
|
||||
Self::debug_log(&format!(
|
||||
"Received chat completion response: {}",
|
||||
serde_json::to_string_pretty(&response).unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
@ -103,14 +124,16 @@ impl LiteLLMClient {
|
|||
) -> Result<mpsc::Receiver<Result<ChatCompletionChunk>>> {
|
||||
let url = format!("{}/chat/completions", self.base_url);
|
||||
|
||||
println!(
|
||||
"DEBUG: Starting stream chat completion request to URL: {}",
|
||||
Self::debug_log(&format!(
|
||||
"Starting stream chat completion request to URL: {}",
|
||||
url
|
||||
);
|
||||
println!(
|
||||
"DEBUG: Stream request payload: {}",
|
||||
serde_json::to_string_pretty(&request).unwrap()
|
||||
);
|
||||
));
|
||||
if *DEBUG_ENABLED {
|
||||
Self::debug_log(&format!(
|
||||
"Stream request payload: {}",
|
||||
serde_json::to_string_pretty(&request).unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
let mut stream = self
|
||||
.client
|
||||
|
@ -124,16 +147,21 @@ impl LiteLLMClient {
|
|||
.bytes_stream();
|
||||
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let debug_enabled = *DEBUG_ENABLED; // Capture for the async block
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut buffer = String::new();
|
||||
println!("DEBUG: Stream processing started");
|
||||
if debug_enabled {
|
||||
Self::debug_log("Stream processing started");
|
||||
}
|
||||
|
||||
while let Some(chunk_result) = stream.next().await {
|
||||
match chunk_result {
|
||||
Ok(chunk) => {
|
||||
let chunk_str = String::from_utf8_lossy(&chunk);
|
||||
println!("DEBUG: Raw response payload: {}", chunk_str);
|
||||
if debug_enabled {
|
||||
Self::debug_log(&format!("Raw response payload: {}", chunk_str));
|
||||
}
|
||||
buffer.push_str(&chunk_str);
|
||||
|
||||
while let Some(pos) = buffer.find("\n\n") {
|
||||
|
@ -142,53 +170,72 @@ impl LiteLLMClient {
|
|||
|
||||
if line.starts_with("data: ") {
|
||||
let data = &line["data: ".len()..];
|
||||
println!("DEBUG: Processing stream data: {}", data);
|
||||
if debug_enabled {
|
||||
Self::debug_log(&format!("Processing stream data: {}", data));
|
||||
}
|
||||
if data == "[DONE]" {
|
||||
println!("DEBUG: Stream completed with [DONE] signal");
|
||||
if debug_enabled {
|
||||
Self::debug_log("Stream completed with [DONE] signal");
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if let Ok(response) =
|
||||
serde_json::from_str::<ChatCompletionChunk>(data)
|
||||
{
|
||||
// Print tool calls if present in the stream chunk
|
||||
if let Some(tool_calls) = &response.choices[0].delta.tool_calls
|
||||
{
|
||||
println!("DEBUG: Tool calls in stream chunk:");
|
||||
for tool_call in tool_calls {
|
||||
if let (Some(id), Some(function)) =
|
||||
(tool_call.id.clone(), tool_call.function.clone())
|
||||
{
|
||||
println!("DEBUG: Tool Call ID: {}", id);
|
||||
if let Some(name) = function.name {
|
||||
println!("DEBUG: Tool Name: {}", name);
|
||||
}
|
||||
if let Some(arguments) = function.arguments {
|
||||
println!(
|
||||
"DEBUG: Tool Arguments: {}",
|
||||
arguments
|
||||
);
|
||||
// Log tool calls if present and debug is enabled
|
||||
if debug_enabled {
|
||||
if let Some(tool_calls) = &response.choices[0].delta.tool_calls
|
||||
{
|
||||
Self::debug_log("Tool calls in stream chunk:");
|
||||
for tool_call in tool_calls {
|
||||
if let (Some(id), Some(function)) =
|
||||
(tool_call.id.clone(), tool_call.function.clone())
|
||||
{
|
||||
Self::debug_log(&format!("Tool Call ID: {}", id));
|
||||
if let Some(name) = function.name {
|
||||
Self::debug_log(&format!("Tool Name: {}", name));
|
||||
}
|
||||
if let Some(arguments) = function.arguments {
|
||||
Self::debug_log(&format!(
|
||||
"Tool Arguments: {}",
|
||||
arguments
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Self::debug_log(&format!("Parsed stream chunk: {:?}", response));
|
||||
}
|
||||
|
||||
// Use try_send instead of send to avoid blocking
|
||||
if tx.try_send(Ok(response)).is_err() {
|
||||
// If the channel is full, log it but continue processing
|
||||
if debug_enabled {
|
||||
Self::debug_log("Warning: Channel full, receiver not keeping up");
|
||||
}
|
||||
}
|
||||
|
||||
println!("DEBUG: Parsed stream chunk: {:?}", response);
|
||||
let _ = tx.send(Ok(response)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("DEBUG: Error in stream processing: {:?}", e);
|
||||
let _ = tx.send(Err(anyhow::Error::from(e))).await;
|
||||
if debug_enabled {
|
||||
Self::debug_log(&format!("Error in stream processing: {:?}", e));
|
||||
}
|
||||
// Use try_send to avoid blocking
|
||||
let _ = tx.try_send(Err(anyhow::Error::from(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
println!("DEBUG: Stream processing completed");
|
||||
if debug_enabled {
|
||||
Self::debug_log("Stream processing completed");
|
||||
}
|
||||
});
|
||||
|
||||
println!("DEBUG: Returning stream receiver");
|
||||
if debug_enabled {
|
||||
Self::debug_log("Returning stream receiver");
|
||||
}
|
||||
Ok(rx)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue