fix(backend): enhance trace logging levels for error handling and execution status

This commit is contained in:
sharath 2025-05-22 08:36:58 +00:00
parent ff1670be90
commit 8255c507a5
No known key found for this signature in database
3 changed files with 13 additions and 18 deletions

View File

@ -99,11 +99,9 @@ async def run_agent(
iteration_count += 1 iteration_count += 1
logger.info(f"🔄 Running iteration {iteration_count} of {max_iterations}...") logger.info(f"🔄 Running iteration {iteration_count} of {max_iterations}...")
span = trace.span(name="billing_check")
# Billing check on each iteration - still needed within the iterations # Billing check on each iteration - still needed within the iterations
can_run, message, subscription = await check_billing_status(client, account_id) can_run, message, subscription = await check_billing_status(client, account_id)
if not can_run: if not can_run:
span.end(status_message="billing_limit_reached")
error_msg = f"Billing limit reached: {message}" error_msg = f"Billing limit reached: {message}"
# Yield a special message to indicate billing limit reached # Yield a special message to indicate billing limit reached
yield { yield {
@ -112,9 +110,6 @@ async def run_agent(
"message": error_msg "message": error_msg
} }
break break
span.end(status_message="billing_limit_not_reached")
span = trace.span(name="get_latest_message")
# Check if last message is from assistant using direct Supabase query # Check if last message is from assistant using direct Supabase query
latest_message = await client.table('messages').select('*').eq('thread_id', thread_id).in_('type', ['assistant', 'tool', 'user']).order('created_at', desc=True).limit(1).execute() latest_message = await client.table('messages').select('*').eq('thread_id', thread_id).in_('type', ['assistant', 'tool', 'user']).order('created_at', desc=True).limit(1).execute()
if latest_message.data and len(latest_message.data) > 0: if latest_message.data and len(latest_message.data) > 0:
@ -122,15 +117,12 @@ async def run_agent(
if message_type == 'assistant': if message_type == 'assistant':
logger.info(f"Last message was from assistant, stopping execution") logger.info(f"Last message was from assistant, stopping execution")
continue_execution = False continue_execution = False
span.end(status_message="last_message_from_assistant")
break break
span.end(status_message="last_message_not_from_assistant")
# ---- Temporary Message Handling (Browser State & Image Context) ---- # ---- Temporary Message Handling (Browser State & Image Context) ----
temporary_message = None temporary_message = None
temp_message_content_list = [] # List to hold text/image blocks temp_message_content_list = [] # List to hold text/image blocks
span = trace.span(name="get_latest_browser_state_message")
# Get the latest browser_state message # Get the latest browser_state message
latest_browser_state_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'browser_state').order('created_at', desc=True).limit(1).execute() latest_browser_state_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'browser_state').order('created_at', desc=True).limit(1).execute()
if latest_browser_state_msg.data and len(latest_browser_state_msg.data) > 0: if latest_browser_state_msg.data and len(latest_browser_state_msg.data) > 0:
@ -171,9 +163,7 @@ async def run_agent(
except Exception as e: except Exception as e:
logger.error(f"Error parsing browser state: {e}") logger.error(f"Error parsing browser state: {e}")
span.end(status_message="get_latest_browser_state_message")
span = trace.span(name="get_latest_image_context_message")
# Get the latest image_context message (NEW) # Get the latest image_context message (NEW)
latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute() latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute()
if latest_image_context_msg.data and len(latest_image_context_msg.data) > 0: if latest_image_context_msg.data and len(latest_image_context_msg.data) > 0:
@ -200,7 +190,6 @@ async def run_agent(
await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute() await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute()
except Exception as e: except Exception as e:
logger.error(f"Error parsing image context: {e}") logger.error(f"Error parsing image context: {e}")
span.end(status_message="get_latest_image_context_message")
# If we have any content, construct the temporary_message # If we have any content, construct the temporary_message
if temp_message_content_list: if temp_message_content_list:
@ -301,7 +290,7 @@ async def run_agent(
# Check if we should stop based on the last tool call or error # Check if we should stop based on the last tool call or error
if error_detected: if error_detected:
logger.info(f"Stopping due to error detected in response") logger.info(f"Stopping due to error detected in response")
generation.end(output=full_response, status_message="error_detected") generation.end(output=full_response, status_message="error_detected", level="ERROR")
break break
if last_tool_call in ['ask', 'complete', 'web-browser-takeover']: if last_tool_call in ['ask', 'complete', 'web-browser-takeover']:
@ -313,6 +302,7 @@ async def run_agent(
# Just log the error and re-raise to stop all iterations # Just log the error and re-raise to stop all iterations
error_msg = f"Error during response streaming: {str(e)}" error_msg = f"Error during response streaming: {str(e)}"
logger.error(f"Error: {error_msg}") logger.error(f"Error: {error_msg}")
generation.end(output=full_response, status_message=error_msg, level="ERROR")
yield { yield {
"type": "status", "type": "status",
"status": "error", "status": "error",

View File

@ -1062,7 +1062,9 @@ class ResponseProcessor:
# Tool execution methods # Tool execution methods
async def _execute_tool(self, tool_call: Dict[str, Any], trace: Optional[StatefulTraceClient] = None) -> ToolResult: async def _execute_tool(self, tool_call: Dict[str, Any], trace: Optional[StatefulTraceClient] = None) -> ToolResult:
"""Execute a single tool call and return the result.""" """Execute a single tool call and return the result."""
span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"]) span = None
if trace:
span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"])
try: try:
function_name = tool_call["function_name"] function_name = tool_call["function_name"]
arguments = tool_call["arguments"] arguments = tool_call["arguments"]
@ -1082,17 +1084,20 @@ class ResponseProcessor:
tool_fn = available_functions.get(function_name) tool_fn = available_functions.get(function_name)
if not tool_fn: if not tool_fn:
logger.error(f"Tool function '{function_name}' not found in registry") logger.error(f"Tool function '{function_name}' not found in registry")
span.end(status_message="tool_not_found") if span:
span.end(status_message="tool_not_found", level="ERROR")
return ToolResult(success=False, output=f"Tool function '{function_name}' not found") return ToolResult(success=False, output=f"Tool function '{function_name}' not found")
logger.debug(f"Found tool function for '{function_name}', executing...") logger.debug(f"Found tool function for '{function_name}', executing...")
result = await tool_fn(**arguments) result = await tool_fn(**arguments)
logger.info(f"Tool execution complete: {function_name} -> {result}") logger.info(f"Tool execution complete: {function_name} -> {result}")
span.end(status_message="tool_executed", output=result) if span:
span.end(status_message="tool_executed", output=result)
return result return result
except Exception as e: except Exception as e:
logger.error(f"Error executing tool {tool_call['function_name']}: {str(e)}", exc_info=True) logger.error(f"Error executing tool {tool_call['function_name']}: {str(e)}", exc_info=True)
span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}") if span:
span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}", level="ERROR")
return ToolResult(success=False, output=f"Error executing tool: {str(e)}") return ToolResult(success=False, output=f"Error executing tool: {str(e)}")
async def _execute_tools( async def _execute_tools(

View File

@ -127,7 +127,7 @@ async def run_agent_background(
if stop_signal_received: if stop_signal_received:
logger.info(f"Agent run {agent_run_id} stopped by signal.") logger.info(f"Agent run {agent_run_id} stopped by signal.")
final_status = "stopped" final_status = "stopped"
trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped") trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped", level="WARNING")
break break
# Store response in Redis list and publish notification # Store response in Redis list and publish notification
@ -178,7 +178,7 @@ async def run_agent_background(
duration = (datetime.now(timezone.utc) - start_time).total_seconds() duration = (datetime.now(timezone.utc) - start_time).total_seconds()
logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})") logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})")
final_status = "failed" final_status = "failed"
trace.span(name="agent_run_failed").end(status_message=error_message) trace.span(name="agent_run_failed").end(status_message=error_message, level="ERROR")
# Push error message to Redis list # Push error message to Redis list
error_response = {"type": "status", "status": "error", "message": error_message} error_response = {"type": "status", "status": "error", "message": error_message}