diff --git a/api/libs/agents/src/agent.rs b/api/libs/agents/src/agent.rs index b70a24baf..7e3b9776d 100644 --- a/api/libs/agents/src/agent.rs +++ b/api/libs/agents/src/agent.rs @@ -15,9 +15,9 @@ use crate::models::AgentThread; // Global BraintrustClient instance static BRAINTRUST_CLIENT: Lazy>> = Lazy::new(|| { - match std::env::var("BRAINTRUST_API_KEY") { - Ok(_) => { - match BraintrustClient::new(None, "buster-agent-logs") { + match (std::env::var("BRAINTRUST_API_KEY"), std::env::var("BRAINTRUST_LOGGING_ID")) { + (Ok(_), Ok(buster_logging_id)) => { + match BraintrustClient::new(None, &buster_logging_id) { Ok(client) => Some(client), Err(e) => { eprintln!("Failed to create Braintrust client: {}", e); @@ -25,7 +25,7 @@ static BRAINTRUST_CLIENT: Lazy>> = Lazy::new(|| { } } } - Err(_) => None, + _ => None, } }); @@ -109,7 +109,8 @@ impl MessageBuffer { // Update state self.first_message_sent = true; self.last_flush = Instant::now(); - self.content.clear(); // Clear content but keep tool calls as they may still be accumulating + // Do NOT clear content between flushes - we need to accumulate all content + // only to keep tool calls as they may still be accumulating Ok(()) } @@ -410,24 +411,36 @@ impl Agent { // Initialize trace and parent span if not provided (first call) let (trace_builder, parent_span) = if trace_builder.is_none() && parent_span.is_none() { if let Some(client) = &*BRAINTRUST_CLIENT { - // Create a new trace for this conversation - let trace = TraceBuilder::new(client.clone(), &format!("Agent Thread {}", thread.id)); - - // Create the parent span for the entire conversation - let span = trace.add_span("User Conversation", "conversation").await?; - - // Get the most recent user message for logging - let user_message = thread.messages.iter() + // Find the most recent user message to use as our input content + let user_input_message = thread.messages.iter() .filter(|msg| matches!(msg, AgentMessage::User { .. })) .last() .cloned(); - let span = if let Some(user_msg) = user_message { - // Log the user message as input to the parent span - span.with_input(serde_json::to_value(&user_msg)?) - } else { - span - }; + // Extract the content from the user message + let user_prompt_text = user_input_message + .as_ref() + .and_then(|msg| { + if let AgentMessage::User { content, .. } = msg { + Some(content.clone()) + } else { + None + } + }) + .unwrap_or_else(|| "No prompt available".to_string()); + + // Create a trace name with the thread ID + let trace_name = format!("Buster Super Agent {}", thread.id); + + // Create the trace with just the user prompt as input + let trace = TraceBuilder::new(client.clone(), &trace_name); + + // Add the user prompt text (not the full message) as input to the root span + // Ensure we're passing ONLY the content text, not the full message object + let root_span = trace.root_span().clone().with_input(serde_json::json!(user_prompt_text)); + + // Add chat_id (session_id) as metadata to the root span + let span = root_span.with_metadata("chat_id", self.session_id.to_string()); // Log the initial span client.log_span(span.clone()).await?; @@ -496,25 +509,9 @@ impl Agent { }, }; - // Create an assistant span to track the assistant's response - let assistant_span = if let (Some(trace), Some(parent)) = (&trace_builder, &parent_span) { - if let Some(client) = &*BRAINTRUST_CLIENT { - // Create a span for the assistant message - let span = trace.add_child_span("Assistant Response", "llm", parent).await?; - - // Add the request as input - let span = span.with_input(serde_json::to_value(&request)?); - - // Log the assistant span - client.log_span(span.clone()).await?; - - Some(span) - } else { - None - } - } else { - None - }; + // We store the parent span to use for creating individual tool spans + // This avoids creating a general assistant span that would never be completed + let parent_for_tool_spans = parent_span.clone(); // Process the streaming chunks let mut buffer = MessageBuffer::new(); @@ -567,16 +564,16 @@ impl Agent { } } Err(e) => { - // Log error in span - if let Some(assistant_span) = &assistant_span { + // Log error in parent span + if let Some(parent) = &parent_for_tool_spans { if let Some(client) = &*BRAINTRUST_CLIENT { // Create error info let error_info = serde_json::json!({ "error": format!("Error in stream: {:?}", e) }); - // Log error as output to span - let error_span = assistant_span.clone().with_output(error_info); + // Log error as output to parent span + let error_span = parent.clone().with_output(error_info); let _ = client.log_span(error_span).await; } } @@ -615,13 +612,32 @@ impl Agent { // Update thread with assistant message self.update_current_thread(final_message.clone()).await?; - // Log the assistant message - if let Some(ref assistant_span) = assistant_span { - if let Some(client) = &*BRAINTRUST_CLIENT { - let span = assistant_span.clone().with_output(serde_json::to_value(&final_message)?); - let _ = client.log_span(span).await; + // For a message without tool calls, create and log a new complete message span + // Otherwise, tool spans will be created individually for each tool call + if final_tool_calls.is_none() && trace_builder.is_some() { + if let (Some(trace), Some(parent)) = (&trace_builder, &parent_span) { + if let Some(client) = &*BRAINTRUST_CLIENT { + // Ensure we have the complete message content + // Make sure we clone the final message to avoid mutating it + let complete_final_message = final_message.clone(); + + // Create a fresh span for the text-only response + let span = trace.add_child_span("Assistant Response", "llm", parent).await?; + + // Add chat_id (session_id) as metadata to the span + let span = span.with_metadata("chat_id", self.session_id.to_string()); + + // Add the full request/response information + let span = span.with_input(serde_json::to_value(&request)?); + let span = span.with_output(serde_json::to_value(&complete_final_message)?); + + // Log the completed span + let _ = client.log_span(span).await; + } } } + // For messages with tool calls, we won't log the output here + // Instead, we'll create tool spans with this assistant span as parent // If this is an auto response without tool calls, it means we're done if final_tool_calls.is_none() { @@ -651,31 +667,28 @@ impl Agent { // Execute each requested tool for tool_call in tool_calls { if let Some(tool) = self.tools.read().await.get(&tool_call.function.name) { - // Create a tool span - let tool_span = if let (Some(trace), Some(assistant)) = (&trace_builder, &assistant_span) { - if let Some(client) = &*BRAINTRUST_CLIENT { - // Create a span for the tool execution + // Create a tool span that combines the assistant request with the tool execution + let tool_span = if let (Some(trace), Some(parent)) = (&trace_builder, &parent_for_tool_spans) { + if let Some(_client) = &*BRAINTRUST_CLIENT { + // Create a span for the assistant + tool execution let span = trace.add_child_span( - &format!("Tool: {}", tool_call.function.name), + &format!("Assistant: {}", tool_call.function.name), "tool", - assistant + parent ).await?; - // Parse the parameters - log only the tool call as input - let params: Value = serde_json::from_str(&tool_call.function.arguments)?; - let tool_input = serde_json::json!({ - "function": { - "name": tool_call.function.name, - "arguments": params - }, - "id": tool_call.id - }); + // Add chat_id (session_id) as metadata to the span + let span = span.with_metadata("chat_id", self.session_id.to_string()); - // Add the tool call as input - let span = span.with_input(tool_input); + // Parse the parameters (unused in this context since we're using final_message) + let _params: Value = serde_json::from_str(&tool_call.function.arguments)?; - // Log the tool span - client.log_span(span.clone()).await?; + // Use the assistant message as input to this span + // This connects the assistant's request to the tool execution + let span = span.with_input(serde_json::to_value(&final_message)?); + + // We don't log the span yet - we'll log it after we have the tool result + // The tool result will be added as output to this span Some(span) } else { @@ -725,12 +738,16 @@ impl Agent { MessageProgress::Complete, ); - // Log the tool result + // Log the combined assistant+tool span with the tool result as output if let Some(tool_span) = &tool_span { if let Some(client) = &*BRAINTRUST_CLIENT { - // Create a new span with the tool message as output - let result_span = tool_span.clone().with_output(serde_json::to_value(&tool_message)?); - let _ = client.log_span(result_span).await; + // Only log completed messages + if matches!(tool_message, AgentMessage::Tool { progress: MessageProgress::Complete, .. }) { + // Now that we have the tool result, add it as output and log the span + // This creates a span showing assistant message -> tool execution -> tool result + let result_span = tool_span.clone().with_output(serde_json::to_value(&tool_message)?); + let _ = client.log_span(result_span).await; + } } } @@ -800,9 +817,109 @@ impl Agent { async fn finish_trace(&self, trace: &Option) -> Result<()> { if let Some(trace) = trace { if let Some(client) = &*BRAINTRUST_CLIENT { - let root_span = trace.root_span(); - let finished_root = root_span.clone().with_output(serde_json::json!("Trace completed")); - client.log_span(finished_root).await?; + // Get the current thread with all conversation history + let thread = self.get_current_thread().await; + + if let Some(thread) = thread { + // Extract all messages and format them for better readability + let formatted_conversation: Vec = thread.messages.iter() + .map(|msg| { + match msg { + AgentMessage::User { content, .. } => { + serde_json::json!({ + "role": "user", + "content": content + }) + }, + AgentMessage::Assistant { content, tool_calls, .. } => { + if let Some(content) = content { + if tool_calls.is_some() { + serde_json::json!({ + "role": "assistant", + "content": content, + "has_tool_calls": true + }) + } else { + serde_json::json!({ + "role": "assistant", + "content": content + }) + } + } else if let Some(tool_calls) = tool_calls { + serde_json::json!({ + "role": "assistant", + "tool_calls": tool_calls + }) + } else { + serde_json::json!({ + "role": "assistant", + "content": null + }) + } + }, + AgentMessage::Tool { content, name, .. } => { + serde_json::json!({ + "role": "tool", + "name": name, + "content": content + }) + }, + AgentMessage::Developer { id, content, name } => { + serde_json::json!({ + "role": "developer", + "id": id, + "content": content, + "name": name + }) + }, + _ => serde_json::json!({ + "role": "system", + "content": "Unknown message type" + }) + } + }) + .collect(); + + // Get the most recent user message to ensure input is preserved + let user_input_message = thread.messages.iter() + .filter(|msg| matches!(msg, AgentMessage::User { .. })) + .last() + .cloned(); + + // Extract the content from the user message + let user_prompt_text = user_input_message + .as_ref() + .and_then(|msg| { + if let AgentMessage::User { content, .. } = msg { + Some(content.clone()) + } else { + None + } + }) + .unwrap_or_else(|| "No prompt available".to_string()); + + // Log the complete formatted conversation as the trace output + let root_span = trace.root_span(); + + // Always ensure the root span has the chat_id metadata and preserves the input + let finished_root = root_span.clone() + .with_input(serde_json::json!(user_prompt_text)) + .with_metadata("chat_id", self.session_id.to_string()) + .with_output(serde_json::Value::Array(formatted_conversation)); // Don't nest under "conversation" key + client.log_span(finished_root).await?; + } else { + // Fallback if no thread is available + let root_span = trace.root_span(); + // Still ensure we preserve input and have chat_id metadata even in fallback case + // We need to ensure input is persisted even when no thread is available + let finished_root = root_span.clone() + .with_metadata("chat_id", self.session_id.to_string()) + .with_output(serde_json::json!([{ + "role": "system", + "content": "Trace completed - no conversation history available" + }])); + client.log_span(finished_root).await?; + } } } Ok(())