non blocking agent code now

This commit is contained in:
dal 2025-03-18 16:23:34 -06:00
parent 603c95e3eb
commit 98c159fbe7
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
3 changed files with 57 additions and 107 deletions

View File

@ -443,15 +443,10 @@ impl Agent {
// Add chat_id (session_id) as metadata to the root span // Add chat_id (session_id) as metadata to the root span
let span = root_span.with_metadata("chat_id", self.session_id.to_string()); let span = root_span.with_metadata("chat_id", self.session_id.to_string());
// Log the initial span without awaiting the result // Log the span non-blockingly (client handles the background processing)
// This ensures it doesn't block the main flow if let Err(e) = client.log_span(span.clone()).await {
let span_clone = span.clone(); error!("Failed to log initial span: {}", e);
let client_clone = client.clone(); }
tokio::spawn(async move {
if let Err(e) = client_clone.log_span(span_clone).await {
error!("Failed to log initial span: {}", e);
}
});
(Some(trace), Some(span)) (Some(trace), Some(span))
} else { } else {
@ -510,16 +505,10 @@ impl Agent {
"error": format!("Error starting stream: {:?}", e) "error": format!("Error starting stream: {:?}", e)
})); }));
// Create clones for the spawned task // Log span non-blockingly (client handles the background processing)
let error_span_clone = error_span.clone(); if let Err(log_err) = client.log_span(error_span).await {
let client_clone = client.clone(); error!("Failed to log error span: {}", log_err);
}
// Spawn a task for logging to ensure non-blocking behavior
tokio::spawn(async move {
if let Err(log_err) = client_clone.log_span(error_span_clone).await {
error!("Failed to log error span: {}", log_err);
}
});
} }
} }
let error_message = format!("Error starting stream: {:?}", e); let error_message = format!("Error starting stream: {:?}", e);
@ -593,16 +582,10 @@ impl Agent {
// Log error as output to parent span // Log error as output to parent span
let error_span = parent.clone().with_output(error_info); let error_span = parent.clone().with_output(error_info);
// Create clones for the spawned task // Log span non-blockingly (client handles the background processing)
let error_span_clone = error_span.clone(); if let Err(log_err) = client.log_span(error_span).await {
let client_clone = client.clone(); error!("Failed to log stream error span: {}", log_err);
}
// Spawn a task for logging to ensure non-blocking behavior
tokio::spawn(async move {
if let Err(log_err) = client_clone.log_span(error_span_clone).await {
error!("Failed to log stream error span: {}", log_err);
}
});
} }
} }
let error_message = format!("Error in stream: {:?}", e); let error_message = format!("Error in stream: {:?}", e);
@ -659,16 +642,10 @@ impl Agent {
let span = span.with_input(serde_json::to_value(&request)?); let span = span.with_input(serde_json::to_value(&request)?);
let span = span.with_output(serde_json::to_value(&complete_final_message)?); let span = span.with_output(serde_json::to_value(&complete_final_message)?);
// Clone needed values for the spawned task // Log span non-blockingly (client handles the background processing)
let span_clone = span.clone(); if let Err(log_err) = client.log_span(span).await {
let client_clone = client.clone(); error!("Failed to log assistant response span: {}", log_err);
}
// Spawn a task for logging to ensure non-blocking behavior
tokio::spawn(async move {
if let Err(log_err) = client_clone.log_span(span_clone).await {
error!("Failed to log assistant response span: {}", log_err);
}
});
} }
} }
} }
@ -683,16 +660,10 @@ impl Agent {
// Create a new span with the final message as output // Create a new span with the final message as output
let final_span = parent_span.clone().with_output(serde_json::to_value(&final_message)?); let final_span = parent_span.clone().with_output(serde_json::to_value(&final_message)?);
// Clone needed values for the spawned task // Log span non-blockingly (client handles the background processing)
let final_span_clone = final_span.clone(); if let Err(log_err) = client.log_span(final_span).await {
let client_clone = client.clone(); error!("Failed to log final output span: {}", log_err);
}
// Spawn a task for logging to ensure non-blocking behavior
tokio::spawn(async move {
if let Err(log_err) = client_clone.log_span(final_span_clone).await {
error!("Failed to log final output span: {}", log_err);
}
});
} }
} }
@ -767,16 +738,11 @@ impl Agent {
// Create a new span with the error output // Create a new span with the error output
let error_span = tool_span.clone().with_output(error_info); let error_span = tool_span.clone().with_output(error_info);
// Clone needed values for the spawned task
let error_span_clone = error_span.clone();
let client_clone = client.clone();
// Spawn a task for logging to ensure non-blocking behavior // Log span non-blockingly (client handles the background processing)
tokio::spawn(async move { if let Err(log_err) = client.log_span(error_span).await {
if let Err(log_err) = client_clone.log_span(error_span_clone).await { error!("Failed to log tool execution error span: {}", log_err);
error!("Failed to log tool execution error span: {}", log_err); }
}
});
} }
} }
let error_message = format!("Tool execution error: {:?}", e); let error_message = format!("Tool execution error: {:?}", e);
@ -801,16 +767,11 @@ impl Agent {
// Now that we have the tool result, add it as output and log the span // Now that we have the tool result, add it as output and log the span
// This creates a span showing assistant message -> tool execution -> tool result // This creates a span showing assistant message -> tool execution -> tool result
let result_span = tool_span.clone().with_output(serde_json::to_value(&tool_message)?); let result_span = tool_span.clone().with_output(serde_json::to_value(&tool_message)?);
// Clone needed values for the spawned task
let result_span_clone = result_span.clone();
let client_clone = client.clone();
// Spawn a task for logging to ensure non-blocking behavior // Log span non-blockingly (client handles the background processing)
tokio::spawn(async move { if let Err(log_err) = client.log_span(result_span).await {
if let Err(log_err) = client_clone.log_span(result_span_clone).await { error!("Failed to log tool result span: {}", log_err);
error!("Failed to log tool result span: {}", log_err); }
}
});
} }
} }
} }
@ -841,16 +802,10 @@ impl Agent {
// Create a new span with the final message as output // Create a new span with the final message as output
let final_span = parent_span.clone().with_output(serde_json::to_value(&final_message)?); let final_span = parent_span.clone().with_output(serde_json::to_value(&final_message)?);
// Clone needed values for the spawned task // Log span non-blockingly (client handles the background processing)
let final_span_clone = final_span.clone(); if let Err(log_err) = client.log_span(final_span).await {
let client_clone = client.clone(); error!("Failed to log final output span: {}", log_err);
}
// Spawn a task for logging to ensure non-blocking behavior
tokio::spawn(async move {
if let Err(log_err) = client_clone.log_span(final_span_clone).await {
error!("Failed to log final output span: {}", log_err);
}
});
} }
} }
@ -895,32 +850,23 @@ impl Agent {
return Ok(()); return Ok(());
} }
// Don't even try to access the trace - just return immediately // Get the session ID
// The real work is done asynchronously
// Get the session ID and thread data here, before spawning a new task
let session_id = self.session_id; let session_id = self.session_id;
let thread = self.get_current_thread().await;
// Instead of directly using trace, we'll do everything in a separate task // Create and log a completion span non-blockingly
if let Some(client) = &*BRAINTRUST_CLIENT { if let Some(client) = &*BRAINTRUST_CLIENT {
let client_clone = client.clone(); // Create a new empty span for completion
let completion_span = client.create_span(
"Trace Completion",
"completion",
None,
None
).with_metadata("chat_id", session_id.to_string());
// Fire and forget - spawn a task to handle the trace finishing // Log span non-blockingly (client handles the background processing)
// This won't block the main thread at all if let Err(e) = client.log_span(completion_span).await {
tokio::spawn(async move { error!("Failed to log completion span: {}", e);
// Create a new empty span for completion }
let completion_span = client_clone.create_span(
"Trace Completion",
"completion",
None,
None
).with_metadata("chat_id", session_id.to_string());
if let Err(e) = client_clone.log_span(completion_span).await {
error!("Failed to log completion span: {}", e);
}
});
} }
// Return immediately, without waiting for any logging operations // Return immediately, without waiting for any logging operations

View File

@ -120,13 +120,11 @@ impl BraintrustClient {
// Clone the sender to avoid awaiting on the send operation // Clone the sender to avoid awaiting on the send operation
let log_sender = self.log_sender.clone(); let log_sender = self.log_sender.clone();
// Fire and forget - spawn a task that won't block the caller // Fire and forget - handle internally without requiring caller to spawn
tokio::spawn(async move { if let Err(e) = log_sender.send(span).await {
if let Err(e) = log_sender.send(span).await { // Just log the error and continue, don't propagate it to the caller
// Just log the error and continue, don't propagate it to the caller error!("Failed to queue span for logging: {}", e);
error!("Failed to queue span for logging: {}", e); }
}
});
// Return immediately without awaiting the log operation // Return immediately without awaiting the log operation
Ok(()) Ok(())

View File

@ -32,6 +32,7 @@ impl TraceBuilder {
} }
/// Immediately create and log a span, return it for further updates /// Immediately create and log a span, return it for further updates
/// Logging happens asynchronously in the background
/// ///
/// # Arguments /// # Arguments
/// * `name` - Name of the span /// * `name` - Name of the span
@ -42,11 +43,13 @@ impl TraceBuilder {
pub async fn add_span(&self, name: &str, span_type: &str) -> Result<Span> { pub async fn add_span(&self, name: &str, span_type: &str) -> Result<Span> {
let span = Span::new(name, span_type, &self.root_span.span_id, Some(&self.root_span.span_id)); let span = Span::new(name, span_type, &self.root_span.span_id, Some(&self.root_span.span_id));
debug!("Adding span '{}' with ID: {} to trace", name, span.span_id()); debug!("Adding span '{}' with ID: {} to trace", name, span.span_id());
// Log span non-blockingly (client handles the background processing)
self.client.log_span(span.clone()).await?; self.client.log_span(span.clone()).await?;
Ok(span) Ok(span)
} }
/// Create a child span from a parent span /// Create a child span from a parent span
/// Logging happens asynchronously in the background
/// ///
/// # Arguments /// # Arguments
/// * `name` - Name of the span /// * `name` - Name of the span
@ -59,6 +62,7 @@ impl TraceBuilder {
let span = Span::new(name, span_type, &self.root_span.span_id, Some(parent_span.span_id())); let span = Span::new(name, span_type, &self.root_span.span_id, Some(parent_span.span_id()));
debug!("Adding child span '{}' with ID: {} to parent: {}", debug!("Adding child span '{}' with ID: {} to parent: {}",
name, span.span_id(), parent_span.span_id()); name, span.span_id(), parent_span.span_id());
// Log span non-blockingly (client handles the background processing)
self.client.log_span(span.clone()).await?; self.client.log_span(span.clone()).await?;
Ok(span) Ok(span)
} }
@ -74,12 +78,14 @@ impl TraceBuilder {
} }
/// Finish and log the root span /// Finish and log the root span
/// Logging happens asynchronously in the background
/// ///
/// # Returns /// # Returns
/// Result indicating success or failure /// Result indicating success or failure of queuing the span (always Ok)
pub async fn finish(self) -> Result<()> { pub async fn finish(self) -> Result<()> {
debug!("Finishing trace with root span ID: {}", self.root_span.span_id); debug!("Finishing trace with root span ID: {}", self.root_span.span_id);
let finished_root = self.root_span.set_output(serde_json::json!("Trace completed")); let finished_root = self.root_span.set_output(serde_json::json!("Trace completed"));
// Log span non-blockingly (client handles the background processing)
self.client.log_span(finished_root).await?; self.client.log_span(finished_root).await?;
Ok(()) Ok(())
} }