From ff1670be90573e369c0b79ba2f4e70702afce009 Mon Sep 17 00:00:00 2001 From: sharath <29162020+tnfssc@users.noreply.github.com> Date: Wed, 21 May 2025 00:39:28 +0000 Subject: [PATCH 1/6] feat(backend): langfuse traces --- backend/.env.example | 4 +++ backend/agent/run.py | 34 +++++++++++++++++-- backend/agentpress/response_processor.py | 15 ++++++--- backend/agentpress/thread_manager.py | 26 ++++++++++++-- backend/poetry.lock | 43 ++++++++++++++++++++++-- backend/pyproject.toml | 1 + backend/requirements.txt | 1 + backend/run_agent_background.py | 9 ++++- backend/services/langfuse.py | 12 +++++++ backend/utils/config.py | 5 +++ docker-compose.yaml | 4 ++- 11 files changed, 141 insertions(+), 13 deletions(-) create mode 100644 backend/services/langfuse.py diff --git a/backend/.env.example b/backend/.env.example index 4588ab77..3cbff524 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -43,3 +43,7 @@ FIRECRAWL_URL= DAYTONA_API_KEY= DAYTONA_SERVER_URL= DAYTONA_TARGET= + +LANGFUSE_PUBLIC_KEY="pk-REDACTED" +LANGFUSE_SECRET_KEY="sk-REDACTED" +LANGFUSE_HOST="https://cloud.langfuse.com" diff --git a/backend/agent/run.py b/backend/agent/run.py index 59fd5e01..bdd0f77e 100644 --- a/backend/agent/run.py +++ b/backend/agent/run.py @@ -23,6 +23,8 @@ from utils.logger import logger from utils.auth_utils import get_account_id_from_thread from services.billing import check_billing_status from agent.tools.sb_vision_tool import SandboxVisionTool +from services.langfuse import langfuse +from langfuse.client import StatefulTraceClient load_dotenv() @@ -36,7 +38,8 @@ async def run_agent( model_name: str = "anthropic/claude-3-7-sonnet-latest", enable_thinking: Optional[bool] = False, reasoning_effort: Optional[str] = 'low', - enable_context_manager: bool = True + enable_context_manager: bool = True, + trace: Optional[StatefulTraceClient] = None ): """Run the development agent with specified configuration.""" logger.info(f"🚀 Starting agent with model: {model_name}") @@ -55,6 +58,10 @@ async def run_agent( if not project.data or len(project.data) == 0: raise ValueError(f"Project {project_id} not found") + if not trace: + logger.warning("No trace provided, creating a new one") + trace = langfuse.trace(name="agent_run", id=thread_id, session_id=thread_id, metadata={"project_id": project_id}) + project_data = project.data[0] sandbox_info = project_data.get('sandbox', {}) if not sandbox_info.get('id'): @@ -92,9 +99,11 @@ async def run_agent( iteration_count += 1 logger.info(f"🔄 Running iteration {iteration_count} of {max_iterations}...") + span = trace.span(name="billing_check") # Billing check on each iteration - still needed within the iterations can_run, message, subscription = await check_billing_status(client, account_id) if not can_run: + span.end(status_message="billing_limit_reached") error_msg = f"Billing limit reached: {message}" # Yield a special message to indicate billing limit reached yield { @@ -103,6 +112,9 @@ async def run_agent( "message": error_msg } break + span.end(status_message="billing_limit_not_reached") + + span = trace.span(name="get_latest_message") # Check if last message is from assistant using direct Supabase query latest_message = await client.table('messages').select('*').eq('thread_id', thread_id).in_('type', ['assistant', 'tool', 'user']).order('created_at', desc=True).limit(1).execute() if latest_message.data and len(latest_message.data) > 0: @@ -110,12 +122,15 @@ async def run_agent( if message_type == 'assistant': logger.info(f"Last message was from assistant, 
stopping execution") continue_execution = False + span.end(status_message="last_message_from_assistant") break + span.end(status_message="last_message_not_from_assistant") # ---- Temporary Message Handling (Browser State & Image Context) ---- temporary_message = None temp_message_content_list = [] # List to hold text/image blocks + span = trace.span(name="get_latest_browser_state_message") # Get the latest browser_state message latest_browser_state_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'browser_state').order('created_at', desc=True).limit(1).execute() if latest_browser_state_msg.data and len(latest_browser_state_msg.data) > 0: @@ -156,7 +171,9 @@ async def run_agent( except Exception as e: logger.error(f"Error parsing browser state: {e}") + span.end(status_message="get_latest_browser_state_message") + span = trace.span(name="get_latest_image_context_message") # Get the latest image_context message (NEW) latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute() if latest_image_context_msg.data and len(latest_image_context_msg.data) > 0: @@ -183,6 +200,7 @@ async def run_agent( await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute() except Exception as e: logger.error(f"Error parsing image context: {e}") + span.end(status_message="get_latest_image_context_message") # If we have any content, construct the temporary_message if temp_message_content_list: @@ -197,6 +215,7 @@ async def run_agent( elif "gpt-4" in model_name.lower(): max_tokens = 4096 + generation = trace.generation(name="thread_manager.run_thread") try: # Make the LLM call and process the response response = await thread_manager.run_thread( @@ -221,7 +240,9 @@ async def run_agent( include_xml_examples=True, enable_thinking=enable_thinking, reasoning_effort=reasoning_effort, - enable_context_manager=enable_context_manager + enable_context_manager=enable_context_manager, + generation=generation, + trace=trace ) if isinstance(response, dict) and "status" in response and response["status"] == "error": @@ -235,6 +256,7 @@ async def run_agent( # Process the response error_detected = False try: + full_response = "" async for chunk in response: # If we receive an error chunk, we should stop after this iteration if isinstance(chunk, dict) and chunk.get('type') == 'status' and chunk.get('status') == 'error': @@ -255,6 +277,7 @@ async def run_agent( # The actual text content is nested within assistant_text = assistant_content_json.get('content', '') + full_response += assistant_text if isinstance(assistant_text, str): # Ensure it's a string # Check for the closing tags as they signal the end of the tool usage if '' in assistant_text or '' in assistant_text or '' in assistant_text: @@ -278,11 +301,14 @@ async def run_agent( # Check if we should stop based on the last tool call or error if error_detected: logger.info(f"Stopping due to error detected in response") + generation.end(output=full_response, status_message="error_detected") break if last_tool_call in ['ask', 'complete', 'web-browser-takeover']: logger.info(f"Agent decided to stop with tool: {last_tool_call}") + generation.end(output=full_response, status_message="agent_stopped") continue_execution = False + except Exception as e: # Just log the error and re-raise to stop all iterations error_msg = f"Error during response streaming: {str(e)}" @@ -306,6 +332,10 @@ 
async def run_agent( } # Stop execution immediately on any error break + generation.end(output=full_response) + + langfuse.flush() # Flush Langfuse events at the end of the run + # # TESTING diff --git a/backend/agentpress/response_processor.py b/backend/agentpress/response_processor.py index 58cdaf83..bcc33590 100644 --- a/backend/agentpress/response_processor.py +++ b/backend/agentpress/response_processor.py @@ -21,6 +21,7 @@ from litellm import completion_cost from agentpress.tool import Tool, ToolResult from agentpress.tool_registry import ToolRegistry from utils.logger import logger +from langfuse.client import StatefulTraceClient # Type alias for XML result adding strategy XmlAddingStrategy = Literal["user_message", "assistant_message", "inline_edit"] @@ -99,6 +100,7 @@ class ResponseProcessor: prompt_messages: List[Dict[str, Any]], llm_model: str, config: ProcessorConfig = ProcessorConfig(), + trace: Optional[StatefulTraceClient] = None, ) -> AsyncGenerator[Dict[str, Any], None]: """Process a streaming LLM response, handling tool calls and execution. @@ -209,7 +211,7 @@ class ResponseProcessor: if started_msg_obj: yield started_msg_obj yielded_tool_indices.add(tool_index) # Mark status as yielded - execution_task = asyncio.create_task(self._execute_tool(tool_call)) + execution_task = asyncio.create_task(self._execute_tool(tool_call, trace)) pending_tool_executions.append({ "task": execution_task, "tool_call": tool_call, "tool_index": tool_index, "context": context @@ -587,7 +589,8 @@ class ResponseProcessor: thread_id: str, prompt_messages: List[Dict[str, Any]], llm_model: str, - config: ProcessorConfig = ProcessorConfig() + config: ProcessorConfig = ProcessorConfig(), + trace: Optional[StatefulTraceClient] = None, ) -> AsyncGenerator[Dict[str, Any], None]: """Process a non-streaming LLM response, handling tool calls and execution. 
@@ -1057,12 +1060,13 @@ class ResponseProcessor: return parsed_data # Tool execution methods - async def _execute_tool(self, tool_call: Dict[str, Any]) -> ToolResult: + async def _execute_tool(self, tool_call: Dict[str, Any], trace: Optional[StatefulTraceClient] = None) -> ToolResult: """Execute a single tool call and return the result.""" + span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"]) try: function_name = tool_call["function_name"] arguments = tool_call["arguments"] - + logger.info(f"Executing tool: {function_name} with arguments: {arguments}") if isinstance(arguments, str): @@ -1078,14 +1082,17 @@ class ResponseProcessor: tool_fn = available_functions.get(function_name) if not tool_fn: logger.error(f"Tool function '{function_name}' not found in registry") + span.end(status_message="tool_not_found") return ToolResult(success=False, output=f"Tool function '{function_name}' not found") logger.debug(f"Found tool function for '{function_name}', executing...") result = await tool_fn(**arguments) logger.info(f"Tool execution complete: {function_name} -> {result}") + span.end(status_message="tool_executed", output=result) return result except Exception as e: logger.error(f"Error executing tool {tool_call['function_name']}: {str(e)}", exc_info=True) + span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}") return ToolResult(success=False, output=f"Error executing tool: {str(e)}") async def _execute_tools( diff --git a/backend/agentpress/thread_manager.py b/backend/agentpress/thread_manager.py index be8b48a6..7aac40d6 100644 --- a/backend/agentpress/thread_manager.py +++ b/backend/agentpress/thread_manager.py @@ -22,6 +22,8 @@ from agentpress.response_processor import ( ) from services.supabase import DBConnection from utils.logger import logger +from langfuse.client import StatefulGenerationClient, StatefulTraceClient +import datetime # Type alias for tool choice ToolChoice = Literal["auto", "required", "none"] @@ -161,7 +163,9 @@ class ThreadManager: include_xml_examples: bool = False, enable_thinking: Optional[bool] = False, reasoning_effort: Optional[str] = 'low', - enable_context_manager: bool = True + enable_context_manager: bool = True, + generation: Optional[StatefulGenerationClient] = None, + trace: Optional[StatefulTraceClient] = None ) -> Union[Dict[str, Any], AsyncGenerator]: """Run a conversation thread with LLM integration and tool execution. @@ -322,6 +326,20 @@ Here are the XML tools available with examples: # 5. 
Make LLM API call logger.debug("Making LLM API call") try: + if generation: + generation.update( + input=prepared_messages, + start_time=datetime.datetime.now(datetime.timezone.utc), + model=llm_model, + model_parameters={ + "max_tokens": llm_max_tokens, + "temperature": llm_temperature, + "enable_thinking": enable_thinking, + "reasoning_effort": reasoning_effort, + "tool_choice": tool_choice, + "tools": openapi_tool_schemas, + } + ) llm_response = await make_llm_api_call( prepared_messages, # Pass the potentially modified messages llm_model, @@ -347,7 +365,8 @@ Here are the XML tools available with examples: thread_id=thread_id, config=processor_config, prompt_messages=prepared_messages, - llm_model=llm_model + llm_model=llm_model, + trace=trace ) return response_generator @@ -359,7 +378,8 @@ Here are the XML tools available with examples: thread_id=thread_id, config=processor_config, prompt_messages=prepared_messages, - llm_model=llm_model + llm_model=llm_model, + trace=trace ) return response_generator # Return the generator diff --git a/backend/poetry.lock b/backend/poetry.lock index 18bcaaf0..69613283 100644 --- a/backend/poetry.lock +++ b/backend/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -249,6 +249,18 @@ files = [ [package.extras] visualize = ["Twisted (>=16.1.1)", "graphviz (>0.5.1)"] +[[package]] +name = "backoff" +version = "2.2.1" +description = "Function decoration for backoff and retry" +optional = false +python-versions = ">=3.7,<4.0" +groups = ["main"] +files = [ + {file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"}, + {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"}, +] + [[package]] name = "boto3" version = "1.37.34" @@ -1217,6 +1229,33 @@ files = [ [package.dependencies] referencing = ">=0.31.0" +[[package]] +name = "langfuse" +version = "2.60.5" +description = "A client library for accessing langfuse" +optional = false +python-versions = "<4.0,>=3.9" +groups = ["main"] +files = [ + {file = "langfuse-2.60.5-py3-none-any.whl", hash = "sha256:fd27d52017f36d6fa5ca652615213a2535dc93dd88c3375eeb811af26384d285"}, + {file = "langfuse-2.60.5.tar.gz", hash = "sha256:a33ecddc98cf6d12289372e63071b77b72230e7bc8260ee349f1465d53bf425b"}, +] + +[package.dependencies] +anyio = ">=4.4.0,<5.0.0" +backoff = ">=1.10.0" +httpx = ">=0.15.4,<1.0" +idna = ">=3.7,<4.0" +packaging = ">=23.2,<25.0" +pydantic = ">=1.10.7,<3.0" +requests = ">=2,<3" +wrapt = ">=1.14,<2.0" + +[package.extras] +langchain = ["langchain (>=0.0.309)"] +llama-index = ["llama-index (>=0.10.12,<2.0.0)"] +openai = ["openai (>=0.27.8)"] + [[package]] name = "litellm" version = "1.66.1" @@ -3539,4 +3578,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "6163a36d6c3507a20552400544de78f7b48a92a98c8c68db7c98263465bf275a" +content-hash = "8bf5f2b60329678979d6eceb2c9860e92b9f2f68cad75651239fdacfc3964633" diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 5f4c4143..af8e5872 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -52,6 +52,7 @@ stripe = "^12.0.1" dramatiq = "^1.17.1" pika = "^1.3.2" prometheus-client = "^0.21.1" +langfuse = "^2.60.5" [tool.poetry.scripts] 
agentpress = "agentpress.cli:main" diff --git a/backend/requirements.txt b/backend/requirements.txt index 33846135..a7e2dfde 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -33,3 +33,4 @@ tavily-python>=0.5.4 pytesseract==0.3.13 stripe>=7.0.0 dramatiq[rabbitmq]>=1.17.1 +langfuse>=2.60.5 diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py index 927008c3..bf715f78 100644 --- a/backend/run_agent_background.py +++ b/backend/run_agent_background.py @@ -13,6 +13,7 @@ from services.supabase import DBConnection from services import redis from dramatiq.brokers.rabbitmq import RabbitmqBroker import os +from services.langfuse import langfuse rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq') rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672)) @@ -98,6 +99,7 @@ async def run_agent_background( logger.error(f"Error in stop signal checker for {agent_run_id}: {e}", exc_info=True) stop_signal_received = True # Stop the run if the checker fails + trace = langfuse.trace(name="agent_run", id=agent_run_id, session_id=thread_id, metadata={"project_id": project_id, "instance_id": instance_id}) try: # Setup Pub/Sub listener for control signals pubsub = await redis.create_pubsub() @@ -108,12 +110,14 @@ async def run_agent_background( # Ensure active run key exists and has TTL await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL) + # Initialize agent generator agent_gen = run_agent( thread_id=thread_id, project_id=project_id, stream=stream, thread_manager=thread_manager, model_name=model_name, enable_thinking=enable_thinking, reasoning_effort=reasoning_effort, - enable_context_manager=enable_context_manager + enable_context_manager=enable_context_manager, + trace=trace ) final_status = "running" @@ -123,6 +127,7 @@ async def run_agent_background( if stop_signal_received: logger.info(f"Agent run {agent_run_id} stopped by signal.") final_status = "stopped" + trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped") break # Store response in Redis list and publish notification @@ -147,6 +152,7 @@ async def run_agent_background( duration = (datetime.now(timezone.utc) - start_time).total_seconds() logger.info(f"Agent run {agent_run_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})") completion_message = {"type": "status", "status": "completed", "message": "Agent run completed successfully"} + trace.span(name="agent_run_completed").end(status_message="agent_run_completed") await redis.rpush(response_list_key, json.dumps(completion_message)) await redis.publish(response_channel, "new") # Notify about the completion message @@ -172,6 +178,7 @@ async def run_agent_background( duration = (datetime.now(timezone.utc) - start_time).total_seconds() logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})") final_status = "failed" + trace.span(name="agent_run_failed").end(status_message=error_message) # Push error message to Redis list error_response = {"type": "status", "status": "error", "message": error_message} diff --git a/backend/services/langfuse.py b/backend/services/langfuse.py new file mode 100644 index 00000000..cf624bf3 --- /dev/null +++ b/backend/services/langfuse.py @@ -0,0 +1,12 @@ +import os +from langfuse import Langfuse + +public_key = os.getenv("LANGFUSE_PUBLIC_KEY") +secret_key = os.getenv("LANGFUSE_SECRET_KEY") +host = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com") + +enabled = False +if public_key and 
secret_key: + enabled = True + +langfuse = Langfuse(enabled=enabled) diff --git a/backend/utils/config.py b/backend/utils/config.py index 085cf041..119a852f 100644 --- a/backend/utils/config.py +++ b/backend/utils/config.py @@ -162,6 +162,11 @@ class Configuration: SANDBOX_IMAGE_NAME = "kortix/suna:0.1.2.8" SANDBOX_ENTRYPOINT = "/usr/bin/supervisord -n -c /etc/supervisor/conf.d/supervisord.conf" + # LangFuse configuration + LANGFUSE_PUBLIC_KEY: Optional[str] = None + LANGFUSE_SECRET_KEY: Optional[str] = None + LANGFUSE_HOST: str = "https://cloud.langfuse.com" + @property def STRIPE_PRODUCT_ID(self) -> str: if self.ENV_MODE == EnvMode.STAGING: diff --git a/docker-compose.yaml b/docker-compose.yaml index 039a0cdd..1ce7d844 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,6 +1,8 @@ services: redis: image: redis:7-alpine + ports: + - "6379:6379" volumes: - redis_data:/data - ./backend/services/docker/redis.conf:/usr/local/etc/redis/redis.conf:ro @@ -14,7 +16,7 @@ services: rabbitmq: image: rabbitmq ports: - - "5672:5672" + - "5672:5672" - "15672:15672" volumes: - rabbitmq_data:/var/lib/rabbitmq From 8255c507a51acf3a2ed795f27de4ad65b14ca81e Mon Sep 17 00:00:00 2001 From: sharath <29162020+tnfssc@users.noreply.github.com> Date: Thu, 22 May 2025 08:36:58 +0000 Subject: [PATCH 2/6] fix(backend): enhance trace logging levels for error handling and execution status --- backend/agent/run.py | 14 ++------------ backend/agentpress/response_processor.py | 13 +++++++++---- backend/run_agent_background.py | 4 ++-- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/backend/agent/run.py b/backend/agent/run.py index bdd0f77e..79120580 100644 --- a/backend/agent/run.py +++ b/backend/agent/run.py @@ -99,11 +99,9 @@ async def run_agent( iteration_count += 1 logger.info(f"🔄 Running iteration {iteration_count} of {max_iterations}...") - span = trace.span(name="billing_check") # Billing check on each iteration - still needed within the iterations can_run, message, subscription = await check_billing_status(client, account_id) if not can_run: - span.end(status_message="billing_limit_reached") error_msg = f"Billing limit reached: {message}" # Yield a special message to indicate billing limit reached yield { @@ -112,9 +110,6 @@ async def run_agent( "message": error_msg } break - span.end(status_message="billing_limit_not_reached") - - span = trace.span(name="get_latest_message") # Check if last message is from assistant using direct Supabase query latest_message = await client.table('messages').select('*').eq('thread_id', thread_id).in_('type', ['assistant', 'tool', 'user']).order('created_at', desc=True).limit(1).execute() if latest_message.data and len(latest_message.data) > 0: @@ -122,15 +117,12 @@ async def run_agent( if message_type == 'assistant': logger.info(f"Last message was from assistant, stopping execution") continue_execution = False - span.end(status_message="last_message_from_assistant") break - span.end(status_message="last_message_not_from_assistant") # ---- Temporary Message Handling (Browser State & Image Context) ---- temporary_message = None temp_message_content_list = [] # List to hold text/image blocks - span = trace.span(name="get_latest_browser_state_message") # Get the latest browser_state message latest_browser_state_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'browser_state').order('created_at', desc=True).limit(1).execute() if latest_browser_state_msg.data and len(latest_browser_state_msg.data) > 0: @@ -171,9 +163,7 @@ 
async def run_agent( except Exception as e: logger.error(f"Error parsing browser state: {e}") - span.end(status_message="get_latest_browser_state_message") - span = trace.span(name="get_latest_image_context_message") # Get the latest image_context message (NEW) latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute() if latest_image_context_msg.data and len(latest_image_context_msg.data) > 0: @@ -200,7 +190,6 @@ async def run_agent( await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute() except Exception as e: logger.error(f"Error parsing image context: {e}") - span.end(status_message="get_latest_image_context_message") # If we have any content, construct the temporary_message if temp_message_content_list: @@ -301,7 +290,7 @@ async def run_agent( # Check if we should stop based on the last tool call or error if error_detected: logger.info(f"Stopping due to error detected in response") - generation.end(output=full_response, status_message="error_detected") + generation.end(output=full_response, status_message="error_detected", level="ERROR") break if last_tool_call in ['ask', 'complete', 'web-browser-takeover']: @@ -313,6 +302,7 @@ async def run_agent( # Just log the error and re-raise to stop all iterations error_msg = f"Error during response streaming: {str(e)}" logger.error(f"Error: {error_msg}") + generation.end(output=full_response, status_message=error_msg, level="ERROR") yield { "type": "status", "status": "error", diff --git a/backend/agentpress/response_processor.py b/backend/agentpress/response_processor.py index bcc33590..42a3b9f3 100644 --- a/backend/agentpress/response_processor.py +++ b/backend/agentpress/response_processor.py @@ -1062,7 +1062,9 @@ class ResponseProcessor: # Tool execution methods async def _execute_tool(self, tool_call: Dict[str, Any], trace: Optional[StatefulTraceClient] = None) -> ToolResult: """Execute a single tool call and return the result.""" - span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"]) + span = None + if trace: + span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"]) try: function_name = tool_call["function_name"] arguments = tool_call["arguments"] @@ -1082,17 +1084,20 @@ class ResponseProcessor: tool_fn = available_functions.get(function_name) if not tool_fn: logger.error(f"Tool function '{function_name}' not found in registry") - span.end(status_message="tool_not_found") + if span: + span.end(status_message="tool_not_found", level="ERROR") return ToolResult(success=False, output=f"Tool function '{function_name}' not found") logger.debug(f"Found tool function for '{function_name}', executing...") result = await tool_fn(**arguments) logger.info(f"Tool execution complete: {function_name} -> {result}") - span.end(status_message="tool_executed", output=result) + if span: + span.end(status_message="tool_executed", output=result) return result except Exception as e: logger.error(f"Error executing tool {tool_call['function_name']}: {str(e)}", exc_info=True) - span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}") + if span: + span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}", level="ERROR") return ToolResult(success=False, output=f"Error executing tool: {str(e)}") async def _execute_tools( diff --git 
a/backend/run_agent_background.py b/backend/run_agent_background.py
index bf715f78..4cb52302 100644
--- a/backend/run_agent_background.py
+++ b/backend/run_agent_background.py
@@ -127,7 +127,7 @@ async def run_agent_background(
            if stop_signal_received:
                logger.info(f"Agent run {agent_run_id} stopped by signal.")
                final_status = "stopped"
-               trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped")
+               trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped", level="WARNING")
                break

            # Store response in Redis list and publish notification
@@ -178,7 +178,7 @@ async def run_agent_background(
        duration = (datetime.now(timezone.utc) - start_time).total_seconds()
        logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})")
        final_status = "failed"
-       trace.span(name="agent_run_failed").end(status_message=error_message)
+       trace.span(name="agent_run_failed").end(status_message=error_message, level="ERROR")

        # Push error message to Redis list
        error_response = {"type": "status", "status": "error", "message": error_message}

From b647889a7d519cb777e612c3c0ccc4262c9c266c Mon Sep 17 00:00:00 2001
From: marko-kraemer
Date: Fri, 23 May 2025 11:41:14 +0200
Subject: [PATCH 3/6] Refactor multi-select: replace icon with checkbox on
 hover for minimal selection UI

---
 frontend/src/components/sidebar/nav-agents.tsx | 193 ++++++++----------
 1 file changed, 84 insertions(+), 109 deletions(-)

diff --git a/frontend/src/components/sidebar/nav-agents.tsx b/frontend/src/components/sidebar/nav-agents.tsx
index cf3c689d..5356993b 100644
--- a/frontend/src/components/sidebar/nav-agents.tsx
+++ b/frontend/src/components/sidebar/nav-agents.tsx
@@ -62,7 +62,6 @@ export function NavAgents() {
   const isPerformingActionRef = useRef(false);
   const queryClient = useQueryClient();

-  const [isMultiSelectActive, setIsMultiSelectActive] = useState(false);
   const [selectedThreads, setSelectedThreads] = useState<Set<string>>(new Set());
   const [deleteProgress, setDeleteProgress] = useState(0);
   const [totalToDelete, setTotalToDelete] = useState(0);
@@ -146,10 +145,9 @@ export function NavAgents() {
   // Function to handle thread click with loading state
   const handleThreadClick = (e: React.MouseEvent, threadId: string, url: string) => {
-    // If multi-select is active, prevent navigation and toggle selection
-    if (isMultiSelectActive) {
+    // If thread is selected, prevent navigation
+    if (selectedThreads.has(threadId)) {
       e.preventDefault();
-      toggleThreadSelection(threadId);
       return;
     }

   // Toggle thread selection for multi-select
-  const toggleThreadSelection = (threadId: string) => {
+  const toggleThreadSelection = (threadId: string, e?: React.MouseEvent) => {
+    if (e) {
+      e.preventDefault();
+      e.stopPropagation();
+    }
+
     setSelectedThreads(prev => {
       const newSelection = new Set(prev);
       if (newSelection.has(threadId)) {
@@ -171,15 +174,6 @@
     });
   };

-  // Toggle multi-select mode
-  const toggleMultiSelect = () => {
-    setIsMultiSelectActive(!isMultiSelectActive);
-    // Clear selections when toggling off
-    if (isMultiSelectActive) {
-      setSelectedThreads(new Set());
-    }
-  };
-
   // Select all threads
   const selectAllThreads = () => {
     const allThreadIds = combinedThreads.map(thread => thread.threadId);
@@ -309,7 +303,6 @@
       // Reset states
       setSelectedThreads(new Set());
-      setIsMultiSelectActive(false);
       setDeleteProgress(0);
       setTotalToDelete(0);
     },
@@ -331,7 +324,6 @@
export function NavAgents() {
       // Reset states
       setSelectedThreads(new Set());
-      setIsMultiSelectActive(false);
       setThreadToDelete(null);
       isPerformingActionRef.current = false;
       setDeleteProgress(0);
@@ -351,16 +343,15 @@ export function NavAgents() {
   return (
[The remaining hunks rewrite the sidebar's JSX render markup; the element tags did not survive extraction, so only the following changes are recoverable: the group label "Agents" is renamed to "Tasks"; the "Select Multiple" toggle button and its tooltip are removed, and the bulk-selection controls now render whenever selectedThreads.size > 0, alongside the "New Agent" button; in each thread row the MessagesSquare icon is hidden on hover and a checkbox appears on hover or when the row is selected ("Show checkbox on hover or when selected, otherwise show MessagesSquare"), wired to onClick={(e) => toggleThreadSelection(thread.threadId, e)} and showing a check mark while isSelected; the loading spinner still takes precedence while a thread is navigating; and the "More" dropdown trigger now renders only when state !== 'collapsed' && !isSelected.]

From 2d16fdec6f1fdf1f28b29869076bb9dcfaf30741 Mon Sep 17 00:00:00 2001
From: sharath <29162020+tnfssc@users.noreply.github.com>
Date: Fri, 23 May 2025 12:24:02 +0000
Subject: [PATCH 4/6] feat(billing): allow promocodes when purchasing

---
 backend/services/billing.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/backend/services/billing.py b/backend/services/billing.py
index 62a1404b..74076745 100644
--- a/backend/services/billing.py
+++ b/backend/services/billing.py
@@ -553,7 +553,8 @@ async def create_checkout_session(
             metadata={
                 'user_id': current_user_id,
                 'product_id': product_id
-            }
+            },
+            allow_promotion_codes=True
         )

         # Update customer status to potentially active (will be confirmed by webhook)

From 857158a1804baab3895606419fb2b94af3d2e439 Mon Sep 17 00:00:00 2001
From: sharath <29162020+tnfssc@users.noreply.github.com>
Date: Fri, 23 May 2025 12:45:14 +0000
Subject: [PATCH 5/6] fix(model): change to deepseek

---
 frontend/src/components/thread/chat-input/_use-model-selection.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/frontend/src/components/thread/chat-input/_use-model-selection.ts b/frontend/src/components/thread/chat-input/_use-model-selection.ts
index 08d91abc..bc3bef4b 100644
--- a/frontend/src/components/thread/chat-input/_use-model-selection.ts
+++ b/frontend/src/components/thread/chat-input/_use-model-selection.ts
@@ -7,7 +7,7 @@
 import { useAvailableModels } from '@/hooks/react-query/subscriptions/use-model'

 export const STORAGE_KEY_MODEL = 'suna-preferred-model';
 export const STORAGE_KEY_CUSTOM_MODELS = 'customModels';
-export const DEFAULT_FREE_MODEL_ID = 'gemini-flash-2.5';
+export const DEFAULT_FREE_MODEL_ID = 'deepseek';
 export const DEFAULT_PREMIUM_MODEL_ID = 'claude-sonnet-4';

 export type SubscriptionStatus = 'no_subscription' | 'active';
@@ -269,7 +269,7 @@ export const useModelSelection = () => {
       models = [
         {
           id: DEFAULT_FREE_MODEL_ID,
-          label: 'Gemini Flash 2.5',
+          label: 'DeepSeek',
           requiresSubscription: false,
           description: MODELS[DEFAULT_FREE_MODEL_ID]?.description || MODEL_TIERS.free.baseDescription,
           priority: MODELS[DEFAULT_FREE_MODEL_ID]?.priority || 50

From 20985f1f81616231da319747addbb3159e43f6f7 Mon Sep 17 00:00:00 2001
From: sharath <29162020+tnfssc@users.noreply.github.com>
Date: Fri, 23 May 2025 13:44:29 +0000
Subject: [PATCH 6/6] feat(db): add read-only access policies for internal users

---
 .../20250523133848_admin-view-access.sql | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)
 create mode 100644 backend/supabase/migrations/20250523133848_admin-view-access.sql

diff --git a/backend/supabase/migrations/20250523133848_admin-view-access.sql b/backend/supabase/migrations/20250523133848_admin-view-access.sql
new file mode 100644
index 00000000..64f62f49
--- /dev/null
+++ b/backend/supabase/migrations/20250523133848_admin-view-access.sql
@@ -0,0 +1,25 @@
+DROP POLICY IF EXISTS "Give read only access to internal users" ON threads;
+
+CREATE POLICY "Give read only access to internal users" ON threads
+FOR SELECT
+USING (
+    ((auth.jwt() ->> 'email'::text) ~~ '%@kortix.ai'::text)
+);
+
+
+DROP POLICY IF EXISTS "Give read only access to internal users" ON messages;
+
+CREATE POLICY "Give read only access to internal users" ON messages
+FOR SELECT
+USING (
+    ((auth.jwt() ->> 'email'::text) ~~ '%@kortix.ai'::text)
+);
+
+
+DROP POLICY IF EXISTS "Give read only access to internal users" ON projects;
+
+CREATE POLICY
"Give read only access to internal users" ON projects +FOR SELECT +USING ( + ((auth.jwt() ->> 'email'::text) ~~ '%@kortix.ai'::text) +);