ready logs

This commit is contained in:
dal 2025-03-18 14:20:34 -06:00
parent 96731d86d0
commit 2c9e1e0508
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed files with 190 additions and 73 deletions

View File

@ -15,9 +15,9 @@ use crate::models::AgentThread;
// Global BraintrustClient instance
static BRAINTRUST_CLIENT: Lazy<Option<Arc<BraintrustClient>>> = Lazy::new(|| {
match std::env::var("BRAINTRUST_API_KEY") {
Ok(_) => {
match BraintrustClient::new(None, "buster-agent-logs") {
match (std::env::var("BRAINTRUST_API_KEY"), std::env::var("BRAINTRUST_LOGGING_ID")) {
(Ok(_), Ok(buster_logging_id)) => {
match BraintrustClient::new(None, &buster_logging_id) {
Ok(client) => Some(client),
Err(e) => {
eprintln!("Failed to create Braintrust client: {}", e);
@ -25,7 +25,7 @@ static BRAINTRUST_CLIENT: Lazy<Option<Arc<BraintrustClient>>> = Lazy::new(|| {
}
}
}
Err(_) => None,
_ => None,
}
});
@ -109,7 +109,8 @@ impl MessageBuffer {
// Update state
self.first_message_sent = true;
self.last_flush = Instant::now();
self.content.clear(); // Clear content but keep tool calls as they may still be accumulating
// Do NOT clear content between flushes - we need to accumulate all content
// only to keep tool calls as they may still be accumulating
Ok(())
}
@ -410,24 +411,36 @@ impl Agent {
// Initialize trace and parent span if not provided (first call)
let (trace_builder, parent_span) = if trace_builder.is_none() && parent_span.is_none() {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Create a new trace for this conversation
let trace = TraceBuilder::new(client.clone(), &format!("Agent Thread {}", thread.id));
// Create the parent span for the entire conversation
let span = trace.add_span("User Conversation", "conversation").await?;
// Get the most recent user message for logging
let user_message = thread.messages.iter()
// Find the most recent user message to use as our input content
let user_input_message = thread.messages.iter()
.filter(|msg| matches!(msg, AgentMessage::User { .. }))
.last()
.cloned();
let span = if let Some(user_msg) = user_message {
// Log the user message as input to the parent span
span.with_input(serde_json::to_value(&user_msg)?)
} else {
span
};
// Extract the content from the user message
let user_prompt_text = user_input_message
.as_ref()
.and_then(|msg| {
if let AgentMessage::User { content, .. } = msg {
Some(content.clone())
} else {
None
}
})
.unwrap_or_else(|| "No prompt available".to_string());
// Create a trace name with the thread ID
let trace_name = format!("Buster Super Agent {}", thread.id);
// Create the trace with just the user prompt as input
let trace = TraceBuilder::new(client.clone(), &trace_name);
// Add the user prompt text (not the full message) as input to the root span
// Ensure we're passing ONLY the content text, not the full message object
let root_span = trace.root_span().clone().with_input(serde_json::json!(user_prompt_text));
// Add chat_id (session_id) as metadata to the root span
let span = root_span.with_metadata("chat_id", self.session_id.to_string());
// Log the initial span
client.log_span(span.clone()).await?;
@ -496,25 +509,9 @@ impl Agent {
},
};
// Create an assistant span to track the assistant's response
let assistant_span = if let (Some(trace), Some(parent)) = (&trace_builder, &parent_span) {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Create a span for the assistant message
let span = trace.add_child_span("Assistant Response", "llm", parent).await?;
// Add the request as input
let span = span.with_input(serde_json::to_value(&request)?);
// Log the assistant span
client.log_span(span.clone()).await?;
Some(span)
} else {
None
}
} else {
None
};
// We store the parent span to use for creating individual tool spans
// This avoids creating a general assistant span that would never be completed
let parent_for_tool_spans = parent_span.clone();
// Process the streaming chunks
let mut buffer = MessageBuffer::new();
@ -567,16 +564,16 @@ impl Agent {
}
}
Err(e) => {
// Log error in span
if let Some(assistant_span) = &assistant_span {
// Log error in parent span
if let Some(parent) = &parent_for_tool_spans {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Create error info
let error_info = serde_json::json!({
"error": format!("Error in stream: {:?}", e)
});
// Log error as output to span
let error_span = assistant_span.clone().with_output(error_info);
// Log error as output to parent span
let error_span = parent.clone().with_output(error_info);
let _ = client.log_span(error_span).await;
}
}
@ -615,13 +612,32 @@ impl Agent {
// Update thread with assistant message
self.update_current_thread(final_message.clone()).await?;
// Log the assistant message
if let Some(ref assistant_span) = assistant_span {
if let Some(client) = &*BRAINTRUST_CLIENT {
let span = assistant_span.clone().with_output(serde_json::to_value(&final_message)?);
let _ = client.log_span(span).await;
// For a message without tool calls, create and log a new complete message span
// Otherwise, tool spans will be created individually for each tool call
if final_tool_calls.is_none() && trace_builder.is_some() {
if let (Some(trace), Some(parent)) = (&trace_builder, &parent_span) {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Ensure we have the complete message content
// Make sure we clone the final message to avoid mutating it
let complete_final_message = final_message.clone();
// Create a fresh span for the text-only response
let span = trace.add_child_span("Assistant Response", "llm", parent).await?;
// Add chat_id (session_id) as metadata to the span
let span = span.with_metadata("chat_id", self.session_id.to_string());
// Add the full request/response information
let span = span.with_input(serde_json::to_value(&request)?);
let span = span.with_output(serde_json::to_value(&complete_final_message)?);
// Log the completed span
let _ = client.log_span(span).await;
}
}
}
// For messages with tool calls, we won't log the output here
// Instead, we'll create tool spans with this assistant span as parent
// If this is an auto response without tool calls, it means we're done
if final_tool_calls.is_none() {
@ -651,31 +667,28 @@ impl Agent {
// Execute each requested tool
for tool_call in tool_calls {
if let Some(tool) = self.tools.read().await.get(&tool_call.function.name) {
// Create a tool span
let tool_span = if let (Some(trace), Some(assistant)) = (&trace_builder, &assistant_span) {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Create a span for the tool execution
// Create a tool span that combines the assistant request with the tool execution
let tool_span = if let (Some(trace), Some(parent)) = (&trace_builder, &parent_for_tool_spans) {
if let Some(_client) = &*BRAINTRUST_CLIENT {
// Create a span for the assistant + tool execution
let span = trace.add_child_span(
&format!("Tool: {}", tool_call.function.name),
&format!("Assistant: {}", tool_call.function.name),
"tool",
assistant
parent
).await?;
// Parse the parameters - log only the tool call as input
let params: Value = serde_json::from_str(&tool_call.function.arguments)?;
let tool_input = serde_json::json!({
"function": {
"name": tool_call.function.name,
"arguments": params
},
"id": tool_call.id
});
// Add chat_id (session_id) as metadata to the span
let span = span.with_metadata("chat_id", self.session_id.to_string());
// Add the tool call as input
let span = span.with_input(tool_input);
// Parse the parameters (unused in this context since we're using final_message)
let _params: Value = serde_json::from_str(&tool_call.function.arguments)?;
// Log the tool span
client.log_span(span.clone()).await?;
// Use the assistant message as input to this span
// This connects the assistant's request to the tool execution
let span = span.with_input(serde_json::to_value(&final_message)?);
// We don't log the span yet - we'll log it after we have the tool result
// The tool result will be added as output to this span
Some(span)
} else {
@ -725,12 +738,16 @@ impl Agent {
MessageProgress::Complete,
);
// Log the tool result
// Log the combined assistant+tool span with the tool result as output
if let Some(tool_span) = &tool_span {
if let Some(client) = &*BRAINTRUST_CLIENT {
// Create a new span with the tool message as output
let result_span = tool_span.clone().with_output(serde_json::to_value(&tool_message)?);
let _ = client.log_span(result_span).await;
// Only log completed messages
if matches!(tool_message, AgentMessage::Tool { progress: MessageProgress::Complete, .. }) {
// Now that we have the tool result, add it as output and log the span
// This creates a span showing assistant message -> tool execution -> tool result
let result_span = tool_span.clone().with_output(serde_json::to_value(&tool_message)?);
let _ = client.log_span(result_span).await;
}
}
}
@ -800,9 +817,109 @@ impl Agent {
async fn finish_trace(&self, trace: &Option<TraceBuilder>) -> Result<()> {
if let Some(trace) = trace {
if let Some(client) = &*BRAINTRUST_CLIENT {
let root_span = trace.root_span();
let finished_root = root_span.clone().with_output(serde_json::json!("Trace completed"));
client.log_span(finished_root).await?;
// Get the current thread with all conversation history
let thread = self.get_current_thread().await;
if let Some(thread) = thread {
// Extract all messages and format them for better readability
let formatted_conversation: Vec<serde_json::Value> = thread.messages.iter()
.map(|msg| {
match msg {
AgentMessage::User { content, .. } => {
serde_json::json!({
"role": "user",
"content": content
})
},
AgentMessage::Assistant { content, tool_calls, .. } => {
if let Some(content) = content {
if tool_calls.is_some() {
serde_json::json!({
"role": "assistant",
"content": content,
"has_tool_calls": true
})
} else {
serde_json::json!({
"role": "assistant",
"content": content
})
}
} else if let Some(tool_calls) = tool_calls {
serde_json::json!({
"role": "assistant",
"tool_calls": tool_calls
})
} else {
serde_json::json!({
"role": "assistant",
"content": null
})
}
},
AgentMessage::Tool { content, name, .. } => {
serde_json::json!({
"role": "tool",
"name": name,
"content": content
})
},
AgentMessage::Developer { id, content, name } => {
serde_json::json!({
"role": "developer",
"id": id,
"content": content,
"name": name
})
},
_ => serde_json::json!({
"role": "system",
"content": "Unknown message type"
})
}
})
.collect();
// Get the most recent user message to ensure input is preserved
let user_input_message = thread.messages.iter()
.filter(|msg| matches!(msg, AgentMessage::User { .. }))
.last()
.cloned();
// Extract the content from the user message
let user_prompt_text = user_input_message
.as_ref()
.and_then(|msg| {
if let AgentMessage::User { content, .. } = msg {
Some(content.clone())
} else {
None
}
})
.unwrap_or_else(|| "No prompt available".to_string());
// Log the complete formatted conversation as the trace output
let root_span = trace.root_span();
// Always ensure the root span has the chat_id metadata and preserves the input
let finished_root = root_span.clone()
.with_input(serde_json::json!(user_prompt_text))
.with_metadata("chat_id", self.session_id.to_string())
.with_output(serde_json::Value::Array(formatted_conversation)); // Don't nest under "conversation" key
client.log_span(finished_root).await?;
} else {
// Fallback if no thread is available
let root_span = trace.root_span();
// Still ensure we preserve input and have chat_id metadata even in fallback case
// We need to ensure input is persisted even when no thread is available
let finished_root = root_span.clone()
.with_metadata("chat_id", self.session_id.to_string())
.with_output(serde_json::json!([{
"role": "system",
"content": "Trace completed - no conversation history available"
}]));
client.log_span(finished_root).await?;
}
}
}
Ok(())