Merge pull request #273 from buster-so/evals

hotfix: braintrust logging failing
This commit is contained in:
dal 2025-05-06 13:09:49 -07:00 committed by GitHub
commit f85716279a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 3 additions and 270 deletions

View File

@ -1,6 +1,5 @@
use crate::tools::{IntoToolCallExecutor, ToolExecutor};
use anyhow::Result;
use braintrust::{BraintrustClient, TraceBuilder};
use litellm::{
AgentMessage, ChatCompletionChunk, ChatCompletionRequest, DeltaToolCall, FunctionCall,
LiteLLMClient, MessageProgress, Metadata, Tool, ToolCall, ToolChoice,
@ -25,23 +24,6 @@ use crate::models::AgentThread;
// Import Mode related types (adjust path if needed)
use crate::agents::modes::ModeConfiguration;
// Global BraintrustClient instance
static BRAINTRUST_CLIENT: Lazy<Option<Arc<BraintrustClient>>> = Lazy::new(|| {
match (
std::env::var("BRAINTRUST_API_KEY"),
std::env::var("BRAINTRUST_LOGGING_ID"),
) {
(Ok(_), Ok(buster_logging_id)) => match BraintrustClient::new(None, &buster_logging_id) {
Ok(client) => Some(client),
Err(e) => {
eprintln!("Failed to create Braintrust client: {}", e);
None
}
},
_ => None,
}
});
// --- Reverted AgentError Struct ---
#[derive(Debug, Clone)]
pub struct AgentError(pub String);
@ -491,7 +473,7 @@ impl Agent {
// Clone agent here for use within the select! arms after the initial future completes
let agent_clone_for_post_process = agent_arc_clone.clone();
tokio::select! {
result = Agent::process_thread_with_depth(agent_arc_clone, thread_clone.clone(), &thread_clone, 0, None, None) => {
result = Agent::process_thread_with_depth(agent_arc_clone, thread_clone.clone(), &thread_clone, 0) => {
if let Err(e) = result {
// Log the error
let err_msg = format!("Error processing thread: {:?}", e);
@ -562,8 +544,6 @@ impl Agent {
thread: AgentThread,
thread_ref: &AgentThread,
recursion_depth: u32,
trace_builder: Option<TraceBuilder>,
parent_span: Option<braintrust::Span>,
) -> Result<()> {
// Attempt to initialize Raindrop client (non-blocking)
let raindrop_client = RaindropClient::new().ok();
@ -574,58 +554,6 @@ impl Agent {
*current = Some(thread_ref.clone());
}
// Initialize trace and parent span if not provided (first call)
let (trace_builder, parent_span) = if trace_builder.is_none() && parent_span.is_none() {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Find the most recent user message to use as our input content
let user_input_message = thread_ref
.messages
.iter()
.filter(|msg| matches!(msg, AgentMessage::User { .. }))
.last()
.cloned();
// Extract the content from the user message
let user_prompt_text = user_input_message
.as_ref()
.and_then(|msg| {
if let AgentMessage::User { content, .. } = msg {
Some(content.clone())
} else {
None
}
})
.unwrap_or_else(|| "No prompt available".to_string());
// Create a trace name with the thread ID
let trace_name = format!("Buster Super Agent {}", thread_ref.id);
// Create the trace with just the user prompt as input
let trace = TraceBuilder::new(client.clone(), &trace_name);
// Add the user prompt text (not the full message) as input to the root span
// Ensure we're passing ONLY the content text, not the full message object
let root_span = trace
.root_span()
.clone()
.with_input(serde_json::json!(user_prompt_text));
// Add chat_id (session_id) as metadata to the root span
let span = root_span.with_metadata("chat_id", agent.session_id.to_string());
// Log the span non-blockingly (client handles the background processing)
if let Err(e) = client.log_span(span.clone()).await {
error!("Failed to log initial span: {}", e);
}
(Some(trace), Some(span))
} else {
(None, None)
}
} else {
(trace_builder, parent_span)
};
// Limit recursion to a maximum of 15 times
if recursion_depth >= 15 {
let max_depth_msg = format!("Maximum recursion depth ({}) reached.", recursion_depth);
@ -810,17 +738,6 @@ impl Agent {
permanent_error
);
tracing::error!(agent_name = %agent.name, chat_id = %agent.session_id, user_id = %agent.user_id, "{}", error_message);
// Log etc. as before...
if let Some(parent_span) = parent_span.clone() {
if let Some(client) = &*BRAINTRUST_CLIENT {
let error_span = parent_span.with_output(serde_json::json!({
"error": error_message
}));
if let Err(log_err) = client.log_span(error_span).await {
error!("Failed to log error span: {}", log_err);
}
}
}
return Err(permanent_error); // Return the permanent error
}
Err(last_retriable_error) => {
@ -830,24 +747,10 @@ impl Agent {
last_retriable_error
);
tracing::error!(agent_name = %agent.name, chat_id = %agent.session_id, user_id = %agent.user_id, "{}", error_message);
// Log etc. as before...
if let Some(parent_span) = parent_span.clone() {
if let Some(client) = &*BRAINTRUST_CLIENT {
let error_span = parent_span.with_output(serde_json::json!({
"error": error_message
}));
if let Err(log_err) = client.log_span(error_span).await {
error!("Failed to log error span: {}", log_err);
}
}
}
return Err(last_retriable_error); // Return the last retriable error
}
};
// We store the parent span to use for creating individual tool spans
let parent_for_tool_spans = parent_span.clone();
// Process the streaming chunks
let mut buffer = MessageBuffer::new();
let mut _is_complete = false;
@ -912,20 +815,6 @@ impl Agent {
format!("Error receiving chunk from LLM stream: {:?}", e);
tracing::error!(agent_name = %agent.name, chat_id = %agent.session_id, user_id = %agent.user_id, "{}", error_message);
// Log error in span
if let Some(parent_span) = parent_span.clone() {
if let Some(client) = &*BRAINTRUST_CLIENT {
let error_span = parent_span.with_output(serde_json::json!({
"error": error_message // Use formatted string
}));
// Log span non-blockingly (client handles the background processing)
if let Err(log_err) = client.log_span(error_span).await {
error!("Failed to log stream error span: {}", log_err);
}
}
}
// --- End Updated Error Handling ---
// Send string error over broadcast channel before returning
let agent_error = AgentError(error_message.clone()); // Create string error
if let Ok(sender) = agent.get_stream_sender().await {
@ -1071,42 +960,6 @@ impl Agent {
for tool_call in tool_calls {
// Find the registered tool entry
if let Some(registered_tool) = agent_tools.get(&tool_call.function.name) {
// Create a tool span that combines the assistant request with the tool execution
let tool_span = if let (Some(trace), Some(parent)) =
(&trace_builder, &parent_for_tool_spans)
{
if let Some(_client) = &*BRAINTRUST_CLIENT {
// Create a span for the assistant + tool execution
let span = trace
.add_child_span(
&format!("Assistant: {}", tool_call.function.name),
"tool",
parent,
)
.await?;
// Add chat_id (session_id) as metadata to the span
let span = span.with_metadata("chat_id", agent.session_id.to_string());
// Parse the parameters (unused in this context since we're using final_message)
let _params: Value =
serde_json::from_str(&tool_call.function.arguments)?;
// Use the assistant message as input to this span
// This connects the assistant's request to the tool execution
let span = span.with_input(serde_json::to_value(&final_message)?);
// We don't log the span yet - we'll log it after we have the tool result
// The tool result will be added as output to this span
Some(span)
} else {
None
}
} else {
None
};
// Parse the parameters
let params: Value = match serde_json::from_str(&tool_call.function.arguments) {
Ok(p) => p,
@ -1182,23 +1035,6 @@ impl Agent {
// Log error differently for timeout vs execution error if needed
error!(agent_name = %agent.name, chat_id = %agent.session_id, user_id = %agent.user_id, tool_name = %tool_call.function.name, "{}", error_message);
// Log error in Braintrust span
if let Some(tool_span) = &tool_span {
if let Some(client) = &*BRAINTRUST_CLIENT {
let error_info = serde_json::json!({
"error": error_message // Generic failure message
});
let error_span = tool_span.clone().with_output(error_info);
if let Err(log_err) = client.log_span(error_span).await {
error!(
"Failed to log tool execution failure span: {}",
log_err
);
}
}
}
// --- End Braintrust Logging ---
// Create an error tool message to send back to the LLM
AgentMessage::tool(
None,
@ -1212,31 +1048,6 @@ impl Agent {
}
};
// Log the combined assistant+tool span with the tool result/error as output
if let Some(tool_span) = &tool_span {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Only log completed messages
if matches!(
tool_message,
AgentMessage::Tool {
progress: MessageProgress::Complete,
..
}
) {
// Now that we have the tool result, add it as output and log the span
// This creates a span showing assistant message -> tool execution -> tool result
let result_span = tool_span
.clone()
.with_output(serde_json::to_value(&tool_message)?);
// Log span non-blockingly (client handles the background processing)
if let Err(log_err) = client.log_span(result_span).await {
error!("Failed to log tool result span: {}", log_err);
}
}
}
}
// Broadcast the tool message as soon as we receive it - use try_send to avoid blocking
// Handle the Result from get_stream_sender
if let Ok(sender) = agent.get_stream_sender().await {
@ -1312,17 +1123,6 @@ impl Agent {
// If a tool signaled termination, finish trace, send Done and exit.
if should_terminate {
// Finish the trace without consuming it
agent.finish_trace(&trace_builder).await?;
// Send Done message
// Handle the Result from get_stream_sender
if let Ok(sender) = agent.get_stream_sender().await {
if let Err(e) = sender.send(Ok(AgentMessage::Done)) {
tracing::debug!("Failed to send Done message after tool termination (receiver likely dropped): {}", e);
}
} else {
tracing::debug!("Stream sender not available when sending Done message after tool termination.");
}
return Ok(()); // Exit the function, preventing recursion
}
@ -1330,37 +1130,8 @@ impl Agent {
updated_thread_for_recursion.messages.extend(results);
} else {
// Log the final assistant response span only if NO tools were called
if let (Some(trace), Some(parent)) = (&trace_builder, &parent_span) {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Ensure we have the complete message content
let complete_final_message = final_message.clone();
// Create a fresh span for the text-only response
let span = trace
.add_child_span("Assistant Response", "llm", parent)
.await?;
let span = span.with_metadata("chat_id", agent.session_id.to_string());
let span = span.with_input(serde_json::to_value(&request)?); // Log the request
let span = span.with_output(serde_json::to_value(&complete_final_message)?); // Log the response
// Log span non-blockingly
if let Err(log_err) = client.log_span(span).await {
error!("Failed to log assistant response span: {}", log_err);
}
}
}
// Also log the final output to the parent span if no tools were called
if let Some(parent_span) = &parent_span {
if let Some(client) = &*BRAINTRUST_CLIENT {
let final_span = parent_span
.clone()
.with_output(serde_json::to_value(&final_message)?);
if let Err(log_err) = client.log_span(final_span).await {
error!("Failed to log final output span: {}", log_err);
}
}
}
// Braintrust logging for final assistant response (no tools) REMOVED
// Braintrust logging for final output to parent span (no tools) REMOVED
// --- End Logging for Text-Only Response ---
}
@ -1373,8 +1144,6 @@ impl Agent {
updated_thread_for_recursion.clone(),
&updated_thread_for_recursion,
recursion_depth + 1,
trace_builder,
parent_span,
))
.await
}
@ -1398,42 +1167,6 @@ impl Agent {
self.tools.read().await
}
/// Helper method to finish a trace without consuming the TraceBuilder
/// This method is fully non-blocking and never affects application performance
async fn finish_trace(&self, trace: &Option<TraceBuilder>) -> Result<()> {
// If there's no trace to finish or no client to log with, return immediately
if trace.is_none() || BRAINTRUST_CLIENT.is_none() {
return Ok(());
}
// Only create a completion span if we have an actual trace
if let Some(trace_builder) = trace {
// Get the trace root span ID to properly link the completion
let root_span_id = trace_builder.root_span_id();
// Create and log a completion span non-blockingly
if let Some(client) = &*BRAINTRUST_CLIENT {
// Create a new span for completion linked to the trace
let completion_span = client
.create_span(
"Trace Completion",
"completion",
Some(root_span_id), // Link to the trace's root span
Some(root_span_id), // Set parent to also be the root span
)
.with_metadata("chat_id", self.session_id.to_string());
// Log span non-blockingly (client handles the background processing)
if let Err(e) = client.log_span(completion_span).await {
error!("Failed to log completion span: {}", e);
}
}
}
// Return immediately, without waiting for any logging operations
Ok(())
}
// Add this new method alongside other channel-related methods
pub async fn close(&self) {
let mut tx = self.stream_tx.write().await;