diff --git a/backend/agent/run.py b/backend/agent/run.py index bdd0f77e..79120580 100644 --- a/backend/agent/run.py +++ b/backend/agent/run.py @@ -99,11 +99,9 @@ async def run_agent( iteration_count += 1 logger.info(f"🔄 Running iteration {iteration_count} of {max_iterations}...") - span = trace.span(name="billing_check") # Billing check on each iteration - still needed within the iterations can_run, message, subscription = await check_billing_status(client, account_id) if not can_run: - span.end(status_message="billing_limit_reached") error_msg = f"Billing limit reached: {message}" # Yield a special message to indicate billing limit reached yield { @@ -112,9 +110,6 @@ async def run_agent( "message": error_msg } break - span.end(status_message="billing_limit_not_reached") - - span = trace.span(name="get_latest_message") # Check if last message is from assistant using direct Supabase query latest_message = await client.table('messages').select('*').eq('thread_id', thread_id).in_('type', ['assistant', 'tool', 'user']).order('created_at', desc=True).limit(1).execute() if latest_message.data and len(latest_message.data) > 0: @@ -122,15 +117,12 @@ async def run_agent( if message_type == 'assistant': logger.info(f"Last message was from assistant, stopping execution") continue_execution = False - span.end(status_message="last_message_from_assistant") break - span.end(status_message="last_message_not_from_assistant") # ---- Temporary Message Handling (Browser State & Image Context) ---- temporary_message = None temp_message_content_list = [] # List to hold text/image blocks - span = trace.span(name="get_latest_browser_state_message") # Get the latest browser_state message latest_browser_state_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'browser_state').order('created_at', desc=True).limit(1).execute() if latest_browser_state_msg.data and len(latest_browser_state_msg.data) > 0: @@ -171,9 +163,7 @@ async def run_agent( except Exception as e: logger.error(f"Error parsing browser state: {e}") - span.end(status_message="get_latest_browser_state_message") - span = trace.span(name="get_latest_image_context_message") # Get the latest image_context message (NEW) latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute() if latest_image_context_msg.data and len(latest_image_context_msg.data) > 0: @@ -200,7 +190,6 @@ async def run_agent( await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute() except Exception as e: logger.error(f"Error parsing image context: {e}") - span.end(status_message="get_latest_image_context_message") # If we have any content, construct the temporary_message if temp_message_content_list: @@ -301,7 +290,7 @@ async def run_agent( # Check if we should stop based on the last tool call or error if error_detected: logger.info(f"Stopping due to error detected in response") - generation.end(output=full_response, status_message="error_detected") + generation.end(output=full_response, status_message="error_detected", level="ERROR") break if last_tool_call in ['ask', 'complete', 'web-browser-takeover']: @@ -313,6 +302,7 @@ async def run_agent( # Just log the error and re-raise to stop all iterations error_msg = f"Error during response streaming: {str(e)}" logger.error(f"Error: {error_msg}") + generation.end(output=full_response, status_message=error_msg, level="ERROR") yield { "type": "status", "status": "error", diff --git a/backend/agentpress/response_processor.py b/backend/agentpress/response_processor.py index bcc33590..42a3b9f3 100644 --- a/backend/agentpress/response_processor.py +++ b/backend/agentpress/response_processor.py @@ -1062,7 +1062,9 @@ class ResponseProcessor: # Tool execution methods async def _execute_tool(self, tool_call: Dict[str, Any], trace: Optional[StatefulTraceClient] = None) -> ToolResult: """Execute a single tool call and return the result.""" - span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"]) + span = None + if trace: + span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"]) try: function_name = tool_call["function_name"] arguments = tool_call["arguments"] @@ -1082,17 +1084,20 @@ class ResponseProcessor: tool_fn = available_functions.get(function_name) if not tool_fn: logger.error(f"Tool function '{function_name}' not found in registry") - span.end(status_message="tool_not_found") + if span: + span.end(status_message="tool_not_found", level="ERROR") return ToolResult(success=False, output=f"Tool function '{function_name}' not found") logger.debug(f"Found tool function for '{function_name}', executing...") result = await tool_fn(**arguments) logger.info(f"Tool execution complete: {function_name} -> {result}") - span.end(status_message="tool_executed", output=result) + if span: + span.end(status_message="tool_executed", output=result) return result except Exception as e: logger.error(f"Error executing tool {tool_call['function_name']}: {str(e)}", exc_info=True) - span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}") + if span: + span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}", level="ERROR") return ToolResult(success=False, output=f"Error executing tool: {str(e)}") async def _execute_tools( diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py index bf715f78..4cb52302 100644 --- a/backend/run_agent_background.py +++ b/backend/run_agent_background.py @@ -127,7 +127,7 @@ async def run_agent_background( if stop_signal_received: logger.info(f"Agent run {agent_run_id} stopped by signal.") final_status = "stopped" - trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped") + trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped", level="WARNING") break # Store response in Redis list and publish notification @@ -178,7 +178,7 @@ async def run_agent_background( duration = (datetime.now(timezone.utc) - start_time).total_seconds() logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})") final_status = "failed" - trace.span(name="agent_run_failed").end(status_message=error_message) + trace.span(name="agent_run_failed").end(status_message=error_message, level="ERROR") # Push error message to Redis list error_response = {"type": "status", "status": "error", "message": error_message}