From 98c159fbe7ea5a53291d43249d8dafb9c893bc6b Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 18 Mar 2025 16:23:34 -0600 Subject: [PATCH] non blocking agent code now --- api/libs/agents/src/agent.rs | 144 ++++++++++-------------------- api/libs/braintrust/src/client.rs | 12 ++- api/libs/braintrust/src/trace.rs | 8 +- 3 files changed, 57 insertions(+), 107 deletions(-) diff --git a/api/libs/agents/src/agent.rs b/api/libs/agents/src/agent.rs index 58daf2be4..14be898f1 100644 --- a/api/libs/agents/src/agent.rs +++ b/api/libs/agents/src/agent.rs @@ -443,15 +443,10 @@ impl Agent { // Add chat_id (session_id) as metadata to the root span let span = root_span.with_metadata("chat_id", self.session_id.to_string()); - // Log the initial span without awaiting the result - // This ensures it doesn't block the main flow - let span_clone = span.clone(); - let client_clone = client.clone(); - tokio::spawn(async move { - if let Err(e) = client_clone.log_span(span_clone).await { - error!("Failed to log initial span: {}", e); - } - }); + // Log the span non-blockingly (client handles the background processing) + if let Err(e) = client.log_span(span.clone()).await { + error!("Failed to log initial span: {}", e); + } (Some(trace), Some(span)) } else { @@ -510,16 +505,10 @@ impl Agent { "error": format!("Error starting stream: {:?}", e) })); - // Create clones for the spawned task - let error_span_clone = error_span.clone(); - let client_clone = client.clone(); - - // Spawn a task for logging to ensure non-blocking behavior - tokio::spawn(async move { - if let Err(log_err) = client_clone.log_span(error_span_clone).await { - error!("Failed to log error span: {}", log_err); - } - }); + // Log span non-blockingly (client handles the background processing) + if let Err(log_err) = client.log_span(error_span).await { + error!("Failed to log error span: {}", log_err); + } } } let error_message = format!("Error starting stream: {:?}", e); @@ -593,16 +582,10 @@ impl Agent { // Log error as output 
to parent span let error_span = parent.clone().with_output(error_info); - // Create clones for the spawned task - let error_span_clone = error_span.clone(); - let client_clone = client.clone(); - - // Spawn a task for logging to ensure non-blocking behavior - tokio::spawn(async move { - if let Err(log_err) = client_clone.log_span(error_span_clone).await { - error!("Failed to log stream error span: {}", log_err); - } - }); + // Log span non-blockingly (client handles the background processing) + if let Err(log_err) = client.log_span(error_span).await { + error!("Failed to log stream error span: {}", log_err); + } } } let error_message = format!("Error in stream: {:?}", e); @@ -659,16 +642,10 @@ impl Agent { let span = span.with_input(serde_json::to_value(&request)?); let span = span.with_output(serde_json::to_value(&complete_final_message)?); - // Clone needed values for the spawned task - let span_clone = span.clone(); - let client_clone = client.clone(); - - // Spawn a task for logging to ensure non-blocking behavior - tokio::spawn(async move { - if let Err(log_err) = client_clone.log_span(span_clone).await { - error!("Failed to log assistant response span: {}", log_err); - } - }); + // Log span non-blockingly (client handles the background processing) + if let Err(log_err) = client.log_span(span).await { + error!("Failed to log assistant response span: {}", log_err); + } } } } @@ -683,16 +660,10 @@ impl Agent { // Create a new span with the final message as output let final_span = parent_span.clone().with_output(serde_json::to_value(&final_message)?); - // Clone needed values for the spawned task - let final_span_clone = final_span.clone(); - let client_clone = client.clone(); - - // Spawn a task for logging to ensure non-blocking behavior - tokio::spawn(async move { - if let Err(log_err) = client_clone.log_span(final_span_clone).await { - error!("Failed to log final output span: {}", log_err); - } - }); + // Log span non-blockingly (client handles the background 
processing) + if let Err(log_err) = client.log_span(final_span).await { + error!("Failed to log final output span: {}", log_err); + } } } @@ -767,16 +738,11 @@ impl Agent { // Create a new span with the error output let error_span = tool_span.clone().with_output(error_info); - // Clone needed values for the spawned task - let error_span_clone = error_span.clone(); - let client_clone = client.clone(); - // Spawn a task for logging to ensure non-blocking behavior - tokio::spawn(async move { - if let Err(log_err) = client_clone.log_span(error_span_clone).await { - error!("Failed to log tool execution error span: {}", log_err); - } - }); + // Log span non-blockingly (client handles the background processing) + if let Err(log_err) = client.log_span(error_span).await { + error!("Failed to log tool execution error span: {}", log_err); + } } } let error_message = format!("Tool execution error: {:?}", e); @@ -801,16 +767,11 @@ impl Agent { // Now that we have the tool result, add it as output and log the span // This creates a span showing assistant message -> tool execution -> tool result let result_span = tool_span.clone().with_output(serde_json::to_value(&tool_message)?); - // Clone needed values for the spawned task - let result_span_clone = result_span.clone(); - let client_clone = client.clone(); - // Spawn a task for logging to ensure non-blocking behavior - tokio::spawn(async move { - if let Err(log_err) = client_clone.log_span(result_span_clone).await { - error!("Failed to log tool result span: {}", log_err); - } - }); + // Log span non-blockingly (client handles the background processing) + if let Err(log_err) = client.log_span(result_span).await { + error!("Failed to log tool result span: {}", log_err); + } } } } @@ -841,16 +802,10 @@ impl Agent { // Create a new span with the final message as output let final_span = parent_span.clone().with_output(serde_json::to_value(&final_message)?); - // Clone needed values for the spawned task - let final_span_clone = 
final_span.clone(); - let client_clone = client.clone(); - - // Spawn a task for logging to ensure non-blocking behavior - tokio::spawn(async move { - if let Err(log_err) = client_clone.log_span(final_span_clone).await { - error!("Failed to log final output span: {}", log_err); - } - }); + // Log span non-blockingly (client handles the background processing) + if let Err(log_err) = client.log_span(final_span).await { + error!("Failed to log final output span: {}", log_err); + } } } @@ -895,32 +850,23 @@ impl Agent { return Ok(()); } - // Don't even try to access the trace - just return immediately - // The real work is done asynchronously - - // Get the session ID and thread data here, before spawning a new task + // Get the session ID let session_id = self.session_id; - let thread = self.get_current_thread().await; - // Instead of directly using trace, we'll do everything in a separate task + // Create and log a completion span non-blockingly if let Some(client) = &*BRAINTRUST_CLIENT { - let client_clone = client.clone(); + // Create a new empty span for completion + let completion_span = client.create_span( + "Trace Completion", + "completion", + None, + None + ).with_metadata("chat_id", session_id.to_string()); - // Fire and forget - spawn a task to handle the trace finishing - // This won't block the main thread at all - tokio::spawn(async move { - // Create a new empty span for completion - let completion_span = client_clone.create_span( - "Trace Completion", - "completion", - None, - None - ).with_metadata("chat_id", session_id.to_string()); - - if let Err(e) = client_clone.log_span(completion_span).await { - error!("Failed to log completion span: {}", e); - } - }); + // Log span non-blockingly (client handles the background processing) + if let Err(e) = client.log_span(completion_span).await { + error!("Failed to log completion span: {}", e); + } } // Return immediately, without waiting for any logging operations diff --git 
a/api/libs/braintrust/src/client.rs b/api/libs/braintrust/src/client.rs index 473bbbdfb..406625b24 100644 --- a/api/libs/braintrust/src/client.rs +++ b/api/libs/braintrust/src/client.rs @@ -120,13 +120,11 @@ impl BraintrustClient { // Clone the sender to avoid awaiting on the send operation let log_sender = self.log_sender.clone(); - // Fire and forget - spawn a task that won't block the caller - tokio::spawn(async move { - if let Err(e) = log_sender.send(span).await { - // Just log the error and continue, don't propagate it to the caller - error!("Failed to queue span for logging: {}", e); - } - }); + // Queue the span on the log channel directly (no spawned task); only the send itself is awaited + if let Err(e) = log_sender.send(span).await { + // Just log the error and continue, don't propagate it to the caller + error!("Failed to queue span for logging: {}", e); + } // Return immediately without awaiting the log operation Ok(()) diff --git a/api/libs/braintrust/src/trace.rs b/api/libs/braintrust/src/trace.rs index a592550c9..68256e2fe 100644 --- a/api/libs/braintrust/src/trace.rs +++ b/api/libs/braintrust/src/trace.rs @@ -32,6 +32,7 @@ impl TraceBuilder { } /// Immediately create and log a span, return it for further updates + /// Logging happens asynchronously in the background /// /// # Arguments /// * `name` - Name of the span @@ -42,11 +43,13 @@ impl TraceBuilder { pub async fn add_span(&self, name: &str, span_type: &str) -> Result { let span = Span::new(name, span_type, &self.root_span.span_id, Some(&self.root_span.span_id)); debug!("Adding span '{}' with ID: {} to trace", name, span.span_id()); + // Log span non-blockingly (client handles the background processing) self.client.log_span(span.clone()).await?; Ok(span) } /// Create a child span from a parent span + /// Logging happens asynchronously in the background /// /// # Arguments /// * `name` - Name of the span @@ -59,6 +62,7 @@ impl TraceBuilder { let span = Span::new(name, span_type, &self.root_span.span_id,
Some(parent_span.span_id())); debug!("Adding child span '{}' with ID: {} to parent: {}", name, span.span_id(), parent_span.span_id()); + // Log span non-blockingly (client handles the background processing) self.client.log_span(span.clone()).await?; Ok(span) } @@ -74,12 +78,14 @@ impl TraceBuilder { } /// Finish and log the root span + /// Logging happens asynchronously in the background /// /// # Returns - /// Result indicating success or failure + /// Result indicating success or failure of queuing the span (always Ok) pub async fn finish(self) -> Result<()> { debug!("Finishing trace with root span ID: {}", self.root_span.span_id); let finished_root = self.root_span.set_output(serde_json::json!("Trace completed")); + // Log span non-blockingly (client handles the background processing) self.client.log_span(finished_root).await?; Ok(()) }