From 7b8181e59609304acd0d83cae6b87365bd3029df Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 6 May 2025 14:09:15 -0600 Subject: [PATCH] hotfix: braintrust logging failing --- api/libs/agents/src/agent.rs | 273 +---------------------------------- 1 file changed, 3 insertions(+), 270 deletions(-) diff --git a/api/libs/agents/src/agent.rs b/api/libs/agents/src/agent.rs index 471fa1057..eb0235a46 100644 --- a/api/libs/agents/src/agent.rs +++ b/api/libs/agents/src/agent.rs @@ -1,6 +1,5 @@ use crate::tools::{IntoToolCallExecutor, ToolExecutor}; use anyhow::Result; -use braintrust::{BraintrustClient, TraceBuilder}; use litellm::{ AgentMessage, ChatCompletionChunk, ChatCompletionRequest, DeltaToolCall, FunctionCall, LiteLLMClient, MessageProgress, Metadata, Tool, ToolCall, ToolChoice, @@ -25,23 +24,6 @@ use crate::models::AgentThread; // Import Mode related types (adjust path if needed) use crate::agents::modes::ModeConfiguration; -// Global BraintrustClient instance -static BRAINTRUST_CLIENT: Lazy>> = Lazy::new(|| { - match ( - std::env::var("BRAINTRUST_API_KEY"), - std::env::var("BRAINTRUST_LOGGING_ID"), - ) { - (Ok(_), Ok(buster_logging_id)) => match BraintrustClient::new(None, &buster_logging_id) { - Ok(client) => Some(client), - Err(e) => { - eprintln!("Failed to create Braintrust client: {}", e); - None - } - }, - _ => None, - } -}); - // --- Reverted AgentError Struct --- #[derive(Debug, Clone)] pub struct AgentError(pub String); @@ -491,7 +473,7 @@ impl Agent { // Clone agent here for use within the select! arms after the initial future completes let agent_clone_for_post_process = agent_arc_clone.clone(); tokio::select! { - result = Agent::process_thread_with_depth(agent_arc_clone, thread_clone.clone(), &thread_clone, 0, None, None) => { + result = Agent::process_thread_with_depth(agent_arc_clone, thread_clone.clone(), &thread_clone, 0) => { if let Err(e) = result { // Log the error let err_msg = format!("Error processing thread: {:?}", e); @@ -562,8 +544,6 @@ impl Agent { thread: AgentThread, thread_ref: &AgentThread, recursion_depth: u32, - trace_builder: Option, - parent_span: Option, ) -> Result<()> { // Attempt to initialize Raindrop client (non-blocking) let raindrop_client = RaindropClient::new().ok(); @@ -574,58 +554,6 @@ impl Agent { *current = Some(thread_ref.clone()); } - // Initialize trace and parent span if not provided (first call) - let (trace_builder, parent_span) = if trace_builder.is_none() && parent_span.is_none() { - if let Some(client) = &*BRAINTRUST_CLIENT { - // Find the most recent user message to use as our input content - let user_input_message = thread_ref - .messages - .iter() - .filter(|msg| matches!(msg, AgentMessage::User { .. })) - .last() - .cloned(); - - // Extract the content from the user message - let user_prompt_text = user_input_message - .as_ref() - .and_then(|msg| { - if let AgentMessage::User { content, .. } = msg { - Some(content.clone()) - } else { - None - } - }) - .unwrap_or_else(|| "No prompt available".to_string()); - - // Create a trace name with the thread ID - let trace_name = format!("Buster Super Agent {}", thread_ref.id); - - // Create the trace with just the user prompt as input - let trace = TraceBuilder::new(client.clone(), &trace_name); - - // Add the user prompt text (not the full message) as input to the root span - // Ensure we're passing ONLY the content text, not the full message object - let root_span = trace - .root_span() - .clone() - .with_input(serde_json::json!(user_prompt_text)); - - // Add chat_id (session_id) as metadata to the root span - let span = root_span.with_metadata("chat_id", agent.session_id.to_string()); - - // Log the span non-blockingly (client handles the background processing) - if let Err(e) = client.log_span(span.clone()).await { - error!("Failed to log initial span: {}", e); - } - - (Some(trace), Some(span)) - } else { - (None, None) - } - } else { - (trace_builder, parent_span) - }; - // Limit recursion to a maximum of 15 times if recursion_depth >= 15 { let max_depth_msg = format!("Maximum recursion depth ({}) reached.", recursion_depth); @@ -810,17 +738,6 @@ impl Agent { permanent_error ); tracing::error!(agent_name = %agent.name, chat_id = %agent.session_id, user_id = %agent.user_id, "{}", error_message); - // Log etc. as before... - if let Some(parent_span) = parent_span.clone() { - if let Some(client) = &*BRAINTRUST_CLIENT { - let error_span = parent_span.with_output(serde_json::json!({ - "error": error_message - })); - if let Err(log_err) = client.log_span(error_span).await { - error!("Failed to log error span: {}", log_err); - } - } - } return Err(permanent_error); // Return the permanent error } Err(last_retriable_error) => { @@ -830,24 +747,10 @@ impl Agent { last_retriable_error ); tracing::error!(agent_name = %agent.name, chat_id = %agent.session_id, user_id = %agent.user_id, "{}", error_message); - // Log etc. as before... - if let Some(parent_span) = parent_span.clone() { - if let Some(client) = &*BRAINTRUST_CLIENT { - let error_span = parent_span.with_output(serde_json::json!({ - "error": error_message - })); - if let Err(log_err) = client.log_span(error_span).await { - error!("Failed to log error span: {}", log_err); - } - } - } return Err(last_retriable_error); // Return the last retriable error } }; - // We store the parent span to use for creating individual tool spans - let parent_for_tool_spans = parent_span.clone(); - // Process the streaming chunks let mut buffer = MessageBuffer::new(); let mut _is_complete = false; @@ -912,20 +815,6 @@ impl Agent { format!("Error receiving chunk from LLM stream: {:?}", e); tracing::error!(agent_name = %agent.name, chat_id = %agent.session_id, user_id = %agent.user_id, "{}", error_message); - // Log error in span - if let Some(parent_span) = parent_span.clone() { - if let Some(client) = &*BRAINTRUST_CLIENT { - let error_span = parent_span.with_output(serde_json::json!({ - "error": error_message // Use formatted string - })); - - // Log span non-blockingly (client handles the background processing) - if let Err(log_err) = client.log_span(error_span).await { - error!("Failed to log stream error span: {}", log_err); - } - } - } - // --- End Updated Error Handling --- // Send string error over broadcast channel before returning let agent_error = AgentError(error_message.clone()); // Create string error if let Ok(sender) = agent.get_stream_sender().await { @@ -1071,42 +960,6 @@ impl Agent { for tool_call in tool_calls { // Find the registered tool entry if let Some(registered_tool) = agent_tools.get(&tool_call.function.name) { - // Create a tool span that combines the assistant request with the tool execution - let tool_span = if let (Some(trace), Some(parent)) = - (&trace_builder, &parent_for_tool_spans) - { - if let Some(_client) = &*BRAINTRUST_CLIENT { - // Create a span for the assistant + tool execution - let span = trace - .add_child_span( - &format!("Assistant: {}", tool_call.function.name), - "tool", - parent, - ) - .await?; - - // Add chat_id (session_id) as metadata to the span - let span = span.with_metadata("chat_id", agent.session_id.to_string()); - - // Parse the parameters (unused in this context since we're using final_message) - let _params: Value = - serde_json::from_str(&tool_call.function.arguments)?; - - // Use the assistant message as input to this span - // This connects the assistant's request to the tool execution - let span = span.with_input(serde_json::to_value(&final_message)?); - - // We don't log the span yet - we'll log it after we have the tool result - // The tool result will be added as output to this span - - Some(span) - } else { - None - } - } else { - None - }; - // Parse the parameters let params: Value = match serde_json::from_str(&tool_call.function.arguments) { Ok(p) => p, @@ -1182,23 +1035,6 @@ impl Agent { // Log error differently for timeout vs execution error if needed error!(agent_name = %agent.name, chat_id = %agent.session_id, user_id = %agent.user_id, tool_name = %tool_call.function.name, "{}", error_message); - // Log error in Braintrust span - if let Some(tool_span) = &tool_span { - if let Some(client) = &*BRAINTRUST_CLIENT { - let error_info = serde_json::json!({ - "error": error_message // Generic failure message - }); - let error_span = tool_span.clone().with_output(error_info); - if let Err(log_err) = client.log_span(error_span).await { - error!( - "Failed to log tool execution failure span: {}", - log_err - ); - } - } - } - // --- End Braintrust Logging --- - // Create an error tool message to send back to the LLM AgentMessage::tool( None, @@ -1212,31 +1048,6 @@ impl Agent { } }; - // Log the combined assistant+tool span with the tool result/error as output - if let Some(tool_span) = &tool_span { - if let Some(client) = &*BRAINTRUST_CLIENT { - // Only log completed messages - if matches!( - tool_message, - AgentMessage::Tool { - progress: MessageProgress::Complete, - .. - } - ) { - // Now that we have the tool result, add it as output and log the span - // This creates a span showing assistant message -> tool execution -> tool result - let result_span = tool_span - .clone() - .with_output(serde_json::to_value(&tool_message)?); - - // Log span non-blockingly (client handles the background processing) - if let Err(log_err) = client.log_span(result_span).await { - error!("Failed to log tool result span: {}", log_err); - } - } - } - } - // Broadcast the tool message as soon as we receive it - use try_send to avoid blocking // Handle the Result from get_stream_sender if let Ok(sender) = agent.get_stream_sender().await { @@ -1312,17 +1123,6 @@ impl Agent { // If a tool signaled termination, finish trace, send Done and exit. if should_terminate { - // Finish the trace without consuming it - agent.finish_trace(&trace_builder).await?; - // Send Done message - // Handle the Result from get_stream_sender - if let Ok(sender) = agent.get_stream_sender().await { - if let Err(e) = sender.send(Ok(AgentMessage::Done)) { - tracing::debug!("Failed to send Done message after tool termination (receiver likely dropped): {}", e); - } - } else { - tracing::debug!("Stream sender not available when sending Done message after tool termination."); - } return Ok(()); // Exit the function, preventing recursion } @@ -1330,37 +1130,8 @@ impl Agent { updated_thread_for_recursion.messages.extend(results); } else { // Log the final assistant response span only if NO tools were called - if let (Some(trace), Some(parent)) = (&trace_builder, &parent_span) { - if let Some(client) = &*BRAINTRUST_CLIENT { - // Ensure we have the complete message content - let complete_final_message = final_message.clone(); - - // Create a fresh span for the text-only response - let span = trace - .add_child_span("Assistant Response", "llm", parent) - .await?; - let span = span.with_metadata("chat_id", agent.session_id.to_string()); - let span = span.with_input(serde_json::to_value(&request)?); // Log the request - let span = span.with_output(serde_json::to_value(&complete_final_message)?); // Log the response - - // Log span non-blockingly - if let Err(log_err) = client.log_span(span).await { - error!("Failed to log assistant response span: {}", log_err); - } - } - } - - // Also log the final output to the parent span if no tools were called - if let Some(parent_span) = &parent_span { - if let Some(client) = &*BRAINTRUST_CLIENT { - let final_span = parent_span - .clone() - .with_output(serde_json::to_value(&final_message)?); - if let Err(log_err) = client.log_span(final_span).await { - error!("Failed to log final output span: {}", log_err); - } - } - } + // Braintrust logging for final assistant response (no tools) REMOVED + // Braintrust logging for final output to parent span (no tools) REMOVED // --- End Logging for Text-Only Response --- } @@ -1373,8 +1144,6 @@ impl Agent { updated_thread_for_recursion.clone(), &updated_thread_for_recursion, recursion_depth + 1, - trace_builder, - parent_span, )) .await } @@ -1398,42 +1167,6 @@ impl Agent { self.tools.read().await } - /// Helper method to finish a trace without consuming the TraceBuilder - /// This method is fully non-blocking and never affects application performance - async fn finish_trace(&self, trace: &Option) -> Result<()> { - // If there's no trace to finish or no client to log with, return immediately - if trace.is_none() || BRAINTRUST_CLIENT.is_none() { - return Ok(()); - } - - // Only create a completion span if we have an actual trace - if let Some(trace_builder) = trace { - // Get the trace root span ID to properly link the completion - let root_span_id = trace_builder.root_span_id(); - - // Create and log a completion span non-blockingly - if let Some(client) = &*BRAINTRUST_CLIENT { - // Create a new span for completion linked to the trace - let completion_span = client - .create_span( - "Trace Completion", - "completion", - Some(root_span_id), // Link to the trace's root span - Some(root_span_id), // Set parent to also be the root span - ) - .with_metadata("chat_id", self.session_id.to_string()); - - // Log span non-blockingly (client handles the background processing) - if let Err(e) = client.log_span(completion_span).await { - error!("Failed to log completion span: {}", e); - } - } - } - - // Return immediately, without waiting for any logging operations - Ok(()) - } - // Add this new method alongside other channel-related methods pub async fn close(&self) { let mut tx = self.stream_tx.write().await;