From 587dfccd78913c2900ba9aed36e84a480b26e3e5 Mon Sep 17 00:00:00 2001 From: marko-kraemer Date: Mon, 7 Jul 2025 00:32:25 +0200 Subject: [PATCH] lint --- backend/agent/run.py | 199 +++++++++++++++++++++++++------------------ 1 file changed, 114 insertions(+), 85 deletions(-) diff --git a/backend/agent/run.py b/backend/agent/run.py index 12838579..1190e30d 100644 --- a/backend/agent/run.py +++ b/backend/agent/run.py @@ -319,7 +319,8 @@ async def run_agent( data = latest_user_message.data[0]['content'] if isinstance(data, str): data = json.loads(data) - trace.update(input=data['content']) + if trace: + trace.update(input=data['content']) while continue_execution and iteration_count < max_iterations: iteration_count += 1 @@ -329,7 +330,8 @@ async def run_agent( can_run, message, subscription = await check_billing_status(client, account_id) if not can_run: error_msg = f"Billing limit reached: {message}" - trace.event(name="billing_limit_reached", level="ERROR", status_message=(f"{error_msg}")) + if trace: + trace.event(name="billing_limit_reached", level="ERROR", status_message=(f"{error_msg}")) # Yield a special message to indicate billing limit reached yield { "type": "status", @@ -343,7 +345,8 @@ async def run_agent( message_type = latest_message.data[0].get('type') if message_type == 'assistant': logger.info(f"Last message was from assistant, stopping execution") - trace.event(name="last_message_from_assistant", level="DEFAULT", status_message=(f"Last message was from assistant, stopping execution")) + if trace: + trace.event(name="last_message_from_assistant", level="DEFAULT", status_message=(f"Last message was from assistant, stopping execution")) continue_execution = False break @@ -383,7 +386,8 @@ async def run_agent( "format": "image/jpeg" } }) - trace.event(name="screenshot_url_added_to_temporary_message", level="DEFAULT", status_message=(f"Screenshot URL added to temporary message.")) + if trace: + trace.event(name="screenshot_url_added_to_temporary_message", level="DEFAULT", status_message=(f"Screenshot URL added to temporary message.")) elif screenshot_base64: # Fallback to base64 if URL not available temp_message_content_list.append({ @@ -392,17 +396,21 @@ async def run_agent( "url": f"data:image/jpeg;base64,{screenshot_base64}", } }) - trace.event(name="screenshot_base64_added_to_temporary_message", level="WARNING", status_message=(f"Screenshot base64 added to temporary message. Prefer screenshot_url if available.")) + if trace: + trace.event(name="screenshot_base64_added_to_temporary_message", level="WARNING", status_message=(f"Screenshot base64 added to temporary message. Prefer screenshot_url if available.")) else: logger.warning("Browser state found but no screenshot data.") - trace.event(name="browser_state_found_but_no_screenshot_data", level="WARNING", status_message=(f"Browser state found but no screenshot data.")) + if trace: + trace.event(name="browser_state_found_but_no_screenshot_data", level="WARNING", status_message=(f"Browser state found but no screenshot data.")) else: logger.warning("Model is Gemini, Anthropic, or OpenAI, so not adding screenshot to temporary message.") - trace.event(name="model_is_gemini_anthropic_or_openai", level="WARNING", status_message=(f"Model is Gemini, Anthropic, or OpenAI, so not adding screenshot to temporary message.")) + if trace: + trace.event(name="model_is_gemini_anthropic_or_openai", level="WARNING", status_message=(f"Model is Gemini, Anthropic, or OpenAI, so not adding screenshot to temporary message.")) except Exception as e: logger.error(f"Error parsing browser state: {e}") - trace.event(name="error_parsing_browser_state", level="ERROR", status_message=(f"{e}")) + if trace: + trace.event(name="error_parsing_browser_state", level="ERROR", status_message=(f"{e}")) # Get the latest image_context message (NEW) latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute() @@ -430,7 +438,8 @@ async def run_agent( await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute() except Exception as e: logger.error(f"Error parsing image context: {e}") - trace.event(name="error_parsing_image_context", level="ERROR", status_message=(f"{e}")) + if trace: + trace.event(name="error_parsing_image_context", level="ERROR", status_message=(f"{e}")) # If we have any content, construct the temporary_message if temp_message_content_list: @@ -449,7 +458,7 @@ async def run_agent( # Gemini 2.5 Pro has 64k max output tokens max_tokens = 64000 - generation = trace.generation(name="thread_manager.run_thread") + generation = trace.generation(name="thread_manager.run_thread") if trace else None try: # Make the LLM call and process the response response = await thread_manager.run_thread( @@ -480,7 +489,8 @@ async def run_agent( if isinstance(response, dict) and "status" in response and response["status"] == "error": logger.error(f"Error response from run_thread: {response.get('message', 'Unknown error')}") - trace.event(name="error_response_from_run_thread", level="ERROR", status_message=(f"{response.get('message', 'Unknown error')}")) + if trace: + trace.event(name="error_response_from_run_thread", level="ERROR", status_message=(f"{response.get('message', 'Unknown error')}")) yield response break @@ -490,97 +500,114 @@ async def run_agent( # Process the response error_detected = False + full_response = "" try: - full_response = "" - async for chunk in response: - # If we receive an error chunk, we should stop after this iteration - if isinstance(chunk, dict) and chunk.get('type') == 'status' and chunk.get('status') == 'error': - logger.error(f"Error chunk detected: {chunk.get('message', 'Unknown error')}") - trace.event(name="error_chunk_detected", level="ERROR", status_message=(f"{chunk.get('message', 'Unknown error')}")) - error_detected = True - yield chunk # Forward the error chunk - continue # Continue processing other chunks but don't break yet - - # Check for termination signal in status messages - if chunk.get('type') == 'status': - try: - # Parse the metadata to check for termination signal - metadata = chunk.get('metadata', {}) - if isinstance(metadata, str): - metadata = json.loads(metadata) - - if metadata.get('agent_should_terminate'): - agent_should_terminate = True - logger.info("Agent termination signal detected in status message") - trace.event(name="agent_termination_signal_detected", level="DEFAULT", status_message="Agent termination signal detected in status message") - - # Extract the tool name from the status content if available - content = chunk.get('content', {}) - if isinstance(content, str): - content = json.loads(content) - - if content.get('function_name'): - last_tool_call = content['function_name'] - elif content.get('xml_tag_name'): - last_tool_call = content['xml_tag_name'] - - except Exception as e: - logger.debug(f"Error parsing status message for termination check: {e}") + # Check if response is iterable (async generator) or a dict (error case) + if hasattr(response, '__aiter__') and not isinstance(response, dict): + async for chunk in response: + # If we receive an error chunk, we should stop after this iteration + if isinstance(chunk, dict) and chunk.get('type') == 'status' and chunk.get('status') == 'error': + logger.error(f"Error chunk detected: {chunk.get('message', 'Unknown error')}") + if trace: + trace.event(name="error_chunk_detected", level="ERROR", status_message=(f"{chunk.get('message', 'Unknown error')}")) + error_detected = True + yield chunk # Forward the error chunk + continue # Continue processing other chunks but don't break yet - # Check for XML versions like , , or in assistant content chunks - if chunk.get('type') == 'assistant' and 'content' in chunk: - try: - # The content field might be a JSON string or object - content = chunk.get('content', '{}') - if isinstance(content, str): - assistant_content_json = json.loads(content) - else: - assistant_content_json = content + # Check for termination signal in status messages + if chunk.get('type') == 'status': + try: + # Parse the metadata to check for termination signal + metadata = chunk.get('metadata', {}) + if isinstance(metadata, str): + metadata = json.loads(metadata) + + if metadata.get('agent_should_terminate'): + agent_should_terminate = True + logger.info("Agent termination signal detected in status message") + if trace: + trace.event(name="agent_termination_signal_detected", level="DEFAULT", status_message="Agent termination signal detected in status message") + + # Extract the tool name from the status content if available + content = chunk.get('content', {}) + if isinstance(content, str): + content = json.loads(content) + + if content.get('function_name'): + last_tool_call = content['function_name'] + elif content.get('xml_tag_name'): + last_tool_call = content['xml_tag_name'] + + except Exception as e: + logger.debug(f"Error parsing status message for termination check: {e}") + + # Check for XML versions like , , or in assistant content chunks + if chunk.get('type') == 'assistant' and 'content' in chunk: + try: + # The content field might be a JSON string or object + content = chunk.get('content', '{}') + if isinstance(content, str): + assistant_content_json = json.loads(content) + else: + assistant_content_json = content - # The actual text content is nested within - assistant_text = assistant_content_json.get('content', '') - full_response += assistant_text - if isinstance(assistant_text, str): - if '' in assistant_text or '' in assistant_text or '' in assistant_text: - if '' in assistant_text: - xml_tool = 'ask' - elif '' in assistant_text: - xml_tool = 'complete' - elif '' in assistant_text: - xml_tool = 'web-browser-takeover' + # The actual text content is nested within + assistant_text = assistant_content_json.get('content', '') + full_response += assistant_text + if isinstance(assistant_text, str): + if '' in assistant_text or '' in assistant_text or '' in assistant_text: + if '' in assistant_text: + xml_tool = 'ask' + elif '' in assistant_text: + xml_tool = 'complete' + elif '' in assistant_text: + xml_tool = 'web-browser-takeover' - last_tool_call = xml_tool - logger.info(f"Agent used XML tool: {xml_tool}") - trace.event(name="agent_used_xml_tool", level="DEFAULT", status_message=(f"Agent used XML tool: {xml_tool}")) - except json.JSONDecodeError: - # Handle cases where content might not be valid JSON - logger.warning(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}") - trace.event(name="warning_could_not_parse_assistant_content_json", level="WARNING", status_message=(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}")) - except Exception as e: - logger.error(f"Error processing assistant chunk: {e}") - trace.event(name="error_processing_assistant_chunk", level="ERROR", status_message=(f"Error processing assistant chunk: {e}")) + last_tool_call = xml_tool + logger.info(f"Agent used XML tool: {xml_tool}") + if trace: + trace.event(name="agent_used_xml_tool", level="DEFAULT", status_message=(f"Agent used XML tool: {xml_tool}")) + except json.JSONDecodeError: + # Handle cases where content might not be valid JSON + logger.warning(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}") + if trace: + trace.event(name="warning_could_not_parse_assistant_content_json", level="WARNING", status_message=(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}")) + except Exception as e: + logger.error(f"Error processing assistant chunk: {e}") + if trace: + trace.event(name="error_processing_assistant_chunk", level="ERROR", status_message=(f"Error processing assistant chunk: {e}")) - yield chunk + yield chunk + else: + # Response is not iterable, likely an error dict + logger.error(f"Response is not iterable: {response}") + error_detected = True # Check if we should stop based on the last tool call or error if error_detected: logger.info(f"Stopping due to error detected in response") - trace.event(name="stopping_due_to_error_detected_in_response", level="DEFAULT", status_message=(f"Stopping due to error detected in response")) - generation.end(output=full_response, status_message="error_detected", level="ERROR") + if trace: + trace.event(name="stopping_due_to_error_detected_in_response", level="DEFAULT", status_message=(f"Stopping due to error detected in response")) + if generation: + generation.end(output=full_response, status_message="error_detected", level="ERROR") break if agent_should_terminate or last_tool_call in ['ask', 'complete', 'web-browser-takeover']: logger.info(f"Agent decided to stop with tool: {last_tool_call}") - trace.event(name="agent_decided_to_stop_with_tool", level="DEFAULT", status_message=(f"Agent decided to stop with tool: {last_tool_call}")) - generation.end(output=full_response, status_message="agent_stopped") + if trace: + trace.event(name="agent_decided_to_stop_with_tool", level="DEFAULT", status_message=(f"Agent decided to stop with tool: {last_tool_call}")) + if generation: + generation.end(output=full_response, status_message="agent_stopped") continue_execution = False except Exception as e: # Just log the error and re-raise to stop all iterations error_msg = f"Error during response streaming: {str(e)}" logger.error(f"Error: {error_msg}") - trace.event(name="error_during_response_streaming", level="ERROR", status_message=(f"Error during response streaming: {str(e)}")) - generation.end(output=full_response, status_message=error_msg, level="ERROR") + if trace: + trace.event(name="error_during_response_streaming", level="ERROR", status_message=(f"Error during response streaming: {str(e)}")) + if generation: + generation.end(output=full_response, status_message=error_msg, level="ERROR") yield { "type": "status", "status": "error", @@ -593,7 +620,8 @@ async def run_agent( # Just log the error and re-raise to stop all iterations error_msg = f"Error running thread: {str(e)}" logger.error(f"Error: {error_msg}") - trace.event(name="error_running_thread", level="ERROR", status_message=(f"Error running thread: {str(e)}")) + if trace: + trace.event(name="error_running_thread", level="ERROR", status_message=(f"Error running thread: {str(e)}")) yield { "type": "status", "status": "error", @@ -601,6 +629,7 @@ async def run_agent( } # Stop execution immediately on any error break - generation.end(output=full_response) + if generation: + generation.end(output=full_response) langfuse.flush() \ No newline at end of file