marko-kraemer 2025-07-07 00:32:25 +02:00
parent c1c3f6cce1
commit 587dfccd78
1 changed file with 114 additions and 85 deletions


@@ -319,7 +319,8 @@ async def run_agent(
         data = latest_user_message.data[0]['content']
         if isinstance(data, str):
             data = json.loads(data)
-        trace.update(input=data['content'])
+        if trace:
+            trace.update(input=data['content'])
     while continue_execution and iteration_count < max_iterations:
         iteration_count += 1
@@ -329,7 +330,8 @@ async def run_agent(
         can_run, message, subscription = await check_billing_status(client, account_id)
         if not can_run:
             error_msg = f"Billing limit reached: {message}"
-            trace.event(name="billing_limit_reached", level="ERROR", status_message=(f"{error_msg}"))
+            if trace:
+                trace.event(name="billing_limit_reached", level="ERROR", status_message=(f"{error_msg}"))
             # Yield a special message to indicate billing limit reached
             yield {
                 "type": "status",
@@ -343,7 +345,8 @@ async def run_agent(
             message_type = latest_message.data[0].get('type')
             if message_type == 'assistant':
                 logger.info(f"Last message was from assistant, stopping execution")
-                trace.event(name="last_message_from_assistant", level="DEFAULT", status_message=(f"Last message was from assistant, stopping execution"))
+                if trace:
+                    trace.event(name="last_message_from_assistant", level="DEFAULT", status_message=(f"Last message was from assistant, stopping execution"))
                 continue_execution = False
                 break
@@ -383,7 +386,8 @@ async def run_agent(
                                 "format": "image/jpeg"
                             }
                         })
-                        trace.event(name="screenshot_url_added_to_temporary_message", level="DEFAULT", status_message=(f"Screenshot URL added to temporary message."))
+                        if trace:
+                            trace.event(name="screenshot_url_added_to_temporary_message", level="DEFAULT", status_message=(f"Screenshot URL added to temporary message."))
                     elif screenshot_base64:
                         # Fallback to base64 if URL not available
                         temp_message_content_list.append({
@@ -392,17 +396,21 @@
                                 "url": f"data:image/jpeg;base64,{screenshot_base64}",
                             }
                         })
-                        trace.event(name="screenshot_base64_added_to_temporary_message", level="WARNING", status_message=(f"Screenshot base64 added to temporary message. Prefer screenshot_url if available."))
+                        if trace:
+                            trace.event(name="screenshot_base64_added_to_temporary_message", level="WARNING", status_message=(f"Screenshot base64 added to temporary message. Prefer screenshot_url if available."))
                     else:
                         logger.warning("Browser state found but no screenshot data.")
-                        trace.event(name="browser_state_found_but_no_screenshot_data", level="WARNING", status_message=(f"Browser state found but no screenshot data."))
+                        if trace:
+                            trace.event(name="browser_state_found_but_no_screenshot_data", level="WARNING", status_message=(f"Browser state found but no screenshot data."))
                 else:
                     logger.warning("Model is Gemini, Anthropic, or OpenAI, so not adding screenshot to temporary message.")
-                    trace.event(name="model_is_gemini_anthropic_or_openai", level="WARNING", status_message=(f"Model is Gemini, Anthropic, or OpenAI, so not adding screenshot to temporary message."))
+                    if trace:
+                        trace.event(name="model_is_gemini_anthropic_or_openai", level="WARNING", status_message=(f"Model is Gemini, Anthropic, or OpenAI, so not adding screenshot to temporary message."))
             except Exception as e:
                 logger.error(f"Error parsing browser state: {e}")
-                trace.event(name="error_parsing_browser_state", level="ERROR", status_message=(f"{e}"))
+                if trace:
+                    trace.event(name="error_parsing_browser_state", level="ERROR", status_message=(f"{e}"))
         # Get the latest image_context message (NEW)
         latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute()
@@ -430,7 +438,8 @@ async def run_agent(
                 await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute()
             except Exception as e:
                 logger.error(f"Error parsing image context: {e}")
-                trace.event(name="error_parsing_image_context", level="ERROR", status_message=(f"{e}"))
+                if trace:
+                    trace.event(name="error_parsing_image_context", level="ERROR", status_message=(f"{e}"))
         # If we have any content, construct the temporary_message
         if temp_message_content_list:
@@ -449,7 +458,7 @@
             # Gemini 2.5 Pro has 64k max output tokens
             max_tokens = 64000
-        generation = trace.generation(name="thread_manager.run_thread")
+        generation = trace.generation(name="thread_manager.run_thread") if trace else None
         try:
             # Make the LLM call and process the response
             response = await thread_manager.run_thread(
@@ -480,7 +489,8 @@ async def run_agent(
             if isinstance(response, dict) and "status" in response and response["status"] == "error":
                 logger.error(f"Error response from run_thread: {response.get('message', 'Unknown error')}")
-                trace.event(name="error_response_from_run_thread", level="ERROR", status_message=(f"{response.get('message', 'Unknown error')}"))
+                if trace:
+                    trace.event(name="error_response_from_run_thread", level="ERROR", status_message=(f"{response.get('message', 'Unknown error')}"))
                 yield response
                 break
@@ -490,97 +500,114 @@ async def run_agent(
             # Process the response
             error_detected = False
+            full_response = ""
             try:
-                full_response = ""
-                async for chunk in response:
-                    # If we receive an error chunk, we should stop after this iteration
-                    if isinstance(chunk, dict) and chunk.get('type') == 'status' and chunk.get('status') == 'error':
-                        logger.error(f"Error chunk detected: {chunk.get('message', 'Unknown error')}")
-                        trace.event(name="error_chunk_detected", level="ERROR", status_message=(f"{chunk.get('message', 'Unknown error')}"))
-                        error_detected = True
-                        yield chunk # Forward the error chunk
-                        continue # Continue processing other chunks but don't break yet
-                    # Check for termination signal in status messages
-                    if chunk.get('type') == 'status':
-                        try:
-                            # Parse the metadata to check for termination signal
-                            metadata = chunk.get('metadata', {})
-                            if isinstance(metadata, str):
-                                metadata = json.loads(metadata)
-                            if metadata.get('agent_should_terminate'):
-                                agent_should_terminate = True
-                                logger.info("Agent termination signal detected in status message")
-                                trace.event(name="agent_termination_signal_detected", level="DEFAULT", status_message="Agent termination signal detected in status message")
-                                # Extract the tool name from the status content if available
-                                content = chunk.get('content', {})
-                                if isinstance(content, str):
-                                    content = json.loads(content)
-                                if content.get('function_name'):
-                                    last_tool_call = content['function_name']
-                                elif content.get('xml_tag_name'):
-                                    last_tool_call = content['xml_tag_name']
-                        except Exception as e:
-                            logger.debug(f"Error parsing status message for termination check: {e}")
-                    # Check for XML versions like <ask>, <complete>, or <web-browser-takeover> in assistant content chunks
-                    if chunk.get('type') == 'assistant' and 'content' in chunk:
-                        try:
-                            # The content field might be a JSON string or object
-                            content = chunk.get('content', '{}')
-                            if isinstance(content, str):
-                                assistant_content_json = json.loads(content)
-                            else:
-                                assistant_content_json = content
-                            # The actual text content is nested within
-                            assistant_text = assistant_content_json.get('content', '')
-                            full_response += assistant_text
-                            if isinstance(assistant_text, str):
-                                if '</ask>' in assistant_text or '</complete>' in assistant_text or '</web-browser-takeover>' in assistant_text:
-                                    if '</ask>' in assistant_text:
-                                        xml_tool = 'ask'
-                                    elif '</complete>' in assistant_text:
-                                        xml_tool = 'complete'
-                                    elif '</web-browser-takeover>' in assistant_text:
-                                        xml_tool = 'web-browser-takeover'
-                                    last_tool_call = xml_tool
-                                    logger.info(f"Agent used XML tool: {xml_tool}")
-                                    trace.event(name="agent_used_xml_tool", level="DEFAULT", status_message=(f"Agent used XML tool: {xml_tool}"))
-                        except json.JSONDecodeError:
-                            # Handle cases where content might not be valid JSON
-                            logger.warning(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}")
-                            trace.event(name="warning_could_not_parse_assistant_content_json", level="WARNING", status_message=(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}"))
-                        except Exception as e:
-                            logger.error(f"Error processing assistant chunk: {e}")
-                            trace.event(name="error_processing_assistant_chunk", level="ERROR", status_message=(f"Error processing assistant chunk: {e}"))
-                    yield chunk
+                # Check if response is iterable (async generator) or a dict (error case)
+                if hasattr(response, '__aiter__') and not isinstance(response, dict):
+                    async for chunk in response:
+                        # If we receive an error chunk, we should stop after this iteration
+                        if isinstance(chunk, dict) and chunk.get('type') == 'status' and chunk.get('status') == 'error':
+                            logger.error(f"Error chunk detected: {chunk.get('message', 'Unknown error')}")
+                            if trace:
+                                trace.event(name="error_chunk_detected", level="ERROR", status_message=(f"{chunk.get('message', 'Unknown error')}"))
+                            error_detected = True
+                            yield chunk # Forward the error chunk
+                            continue # Continue processing other chunks but don't break yet
+                        # Check for termination signal in status messages
+                        if chunk.get('type') == 'status':
+                            try:
+                                # Parse the metadata to check for termination signal
+                                metadata = chunk.get('metadata', {})
+                                if isinstance(metadata, str):
+                                    metadata = json.loads(metadata)
+                                if metadata.get('agent_should_terminate'):
+                                    agent_should_terminate = True
+                                    logger.info("Agent termination signal detected in status message")
+                                    if trace:
+                                        trace.event(name="agent_termination_signal_detected", level="DEFAULT", status_message="Agent termination signal detected in status message")
+                                    # Extract the tool name from the status content if available
+                                    content = chunk.get('content', {})
+                                    if isinstance(content, str):
+                                        content = json.loads(content)
+                                    if content.get('function_name'):
+                                        last_tool_call = content['function_name']
+                                    elif content.get('xml_tag_name'):
+                                        last_tool_call = content['xml_tag_name']
+                            except Exception as e:
+                                logger.debug(f"Error parsing status message for termination check: {e}")
+                        # Check for XML versions like <ask>, <complete>, or <web-browser-takeover> in assistant content chunks
+                        if chunk.get('type') == 'assistant' and 'content' in chunk:
+                            try:
+                                # The content field might be a JSON string or object
+                                content = chunk.get('content', '{}')
+                                if isinstance(content, str):
+                                    assistant_content_json = json.loads(content)
+                                else:
+                                    assistant_content_json = content
+                                # The actual text content is nested within
+                                assistant_text = assistant_content_json.get('content', '')
+                                full_response += assistant_text
+                                if isinstance(assistant_text, str):
+                                    if '</ask>' in assistant_text or '</complete>' in assistant_text or '</web-browser-takeover>' in assistant_text:
+                                        if '</ask>' in assistant_text:
+                                            xml_tool = 'ask'
+                                        elif '</complete>' in assistant_text:
+                                            xml_tool = 'complete'
+                                        elif '</web-browser-takeover>' in assistant_text:
+                                            xml_tool = 'web-browser-takeover'
+                                        last_tool_call = xml_tool
+                                        logger.info(f"Agent used XML tool: {xml_tool}")
+                                        if trace:
+                                            trace.event(name="agent_used_xml_tool", level="DEFAULT", status_message=(f"Agent used XML tool: {xml_tool}"))
+                            except json.JSONDecodeError:
+                                # Handle cases where content might not be valid JSON
+                                logger.warning(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}")
+                                if trace:
+                                    trace.event(name="warning_could_not_parse_assistant_content_json", level="WARNING", status_message=(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}"))
+                            except Exception as e:
+                                logger.error(f"Error processing assistant chunk: {e}")
+                                if trace:
+                                    trace.event(name="error_processing_assistant_chunk", level="ERROR", status_message=(f"Error processing assistant chunk: {e}"))
+                        yield chunk
+                else:
+                    # Response is not iterable, likely an error dict
+                    logger.error(f"Response is not iterable: {response}")
+                    error_detected = True
                 # Check if we should stop based on the last tool call or error
                 if error_detected:
                     logger.info(f"Stopping due to error detected in response")
-                    trace.event(name="stopping_due_to_error_detected_in_response", level="DEFAULT", status_message=(f"Stopping due to error detected in response"))
-                    generation.end(output=full_response, status_message="error_detected", level="ERROR")
+                    if trace:
+                        trace.event(name="stopping_due_to_error_detected_in_response", level="DEFAULT", status_message=(f"Stopping due to error detected in response"))
+                    if generation:
+                        generation.end(output=full_response, status_message="error_detected", level="ERROR")
                     break
                 if agent_should_terminate or last_tool_call in ['ask', 'complete', 'web-browser-takeover']:
                     logger.info(f"Agent decided to stop with tool: {last_tool_call}")
-                    trace.event(name="agent_decided_to_stop_with_tool", level="DEFAULT", status_message=(f"Agent decided to stop with tool: {last_tool_call}"))
-                    generation.end(output=full_response, status_message="agent_stopped")
+                    if trace:
+                        trace.event(name="agent_decided_to_stop_with_tool", level="DEFAULT", status_message=(f"Agent decided to stop with tool: {last_tool_call}"))
+                    if generation:
+                        generation.end(output=full_response, status_message="agent_stopped")
                     continue_execution = False
             except Exception as e:
                 # Just log the error and re-raise to stop all iterations
                 error_msg = f"Error during response streaming: {str(e)}"
                 logger.error(f"Error: {error_msg}")
-                trace.event(name="error_during_response_streaming", level="ERROR", status_message=(f"Error during response streaming: {str(e)}"))
-                generation.end(output=full_response, status_message=error_msg, level="ERROR")
+                if trace:
+                    trace.event(name="error_during_response_streaming", level="ERROR", status_message=(f"Error during response streaming: {str(e)}"))
+                if generation:
+                    generation.end(output=full_response, status_message=error_msg, level="ERROR")
                 yield {
                     "type": "status",
                     "status": "error",
@@ -593,7 +620,8 @@ async def run_agent(
             # Just log the error and re-raise to stop all iterations
             error_msg = f"Error running thread: {str(e)}"
             logger.error(f"Error: {error_msg}")
-            trace.event(name="error_running_thread", level="ERROR", status_message=(f"Error running thread: {str(e)}"))
+            if trace:
+                trace.event(name="error_running_thread", level="ERROR", status_message=(f"Error running thread: {str(e)}"))
             yield {
                 "type": "status",
                 "status": "error",
@@ -601,6 +629,7 @@
             }
             # Stop execution immediately on any error
             break
-        generation.end(output=full_response)
+        if generation:
+            generation.end(output=full_response)
     langfuse.flush()
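
Taken together, the diff applies one defensive pattern throughout run_agent: the Langfuse trace (and the per-iteration generation span derived from it) may be None, so every call on them is wrapped in an `if trace:` / `if generation:` guard, and the streamed LLM response is only iterated when it is actually an async generator rather than an error dict. Below is a minimal sketch of that pattern; `process_response` and the event names used in the else branch are hypothetical stand-ins for the real streaming loop, while the trace/generation calls mirror the ones visible in the diff.

    import json
    from typing import Any, Optional

    async def process_response(response: Any, trace: Optional[Any] = None) -> str:
        """Hypothetical helper illustrating the guarded-tracing pattern from this commit."""
        full_response = ""
        # Only open a generation span when tracing is enabled; otherwise it stays None.
        generation = trace.generation(name="thread_manager.run_thread") if trace else None
        if hasattr(response, '__aiter__') and not isinstance(response, dict):
            async for chunk in response:
                if isinstance(chunk, dict) and chunk.get('type') == 'assistant':
                    content = chunk.get('content', '{}')
                    if isinstance(content, str):
                        content = json.loads(content)
                    text = content.get('content', '')
                    if isinstance(text, str):
                        full_response += text
        else:
            # An error dict is not async-iterable; record it only if tracing is enabled.
            if trace:
                trace.event(name="response_not_iterable", level="ERROR", status_message=str(response))
        if generation:
            generation.end(output=full_response)
        return full_response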