diff --git a/backend/.env.example b/backend/.env.example
index 4588ab77..3cbff524 100644
--- a/backend/.env.example
+++ b/backend/.env.example
@@ -43,3 +43,7 @@ FIRECRAWL_URL=
 DAYTONA_API_KEY=
 DAYTONA_SERVER_URL=
 DAYTONA_TARGET=
+
+LANGFUSE_PUBLIC_KEY="pk-REDACTED"
+LANGFUSE_SECRET_KEY="sk-REDACTED"
+LANGFUSE_HOST="https://cloud.langfuse.com"
diff --git a/backend/agent/run.py b/backend/agent/run.py
index 2f5d2991..bda5b147 100644
--- a/backend/agent/run.py
+++ b/backend/agent/run.py
@@ -23,7 +23,10 @@ from utils.logger import logger
 from utils.auth_utils import get_account_id_from_thread
 from services.billing import check_billing_status
 from agent.tools.sb_vision_tool import SandboxVisionTool
+from services.langfuse import langfuse
+from langfuse.client import StatefulTraceClient
 from agent.gemini_prompt import get_gemini_system_prompt
+
 load_dotenv()
 
 async def run_agent(
@@ -36,7 +39,8 @@
     model_name: str = "anthropic/claude-3-7-sonnet-latest",
     enable_thinking: Optional[bool] = False,
     reasoning_effort: Optional[str] = 'low',
-    enable_context_manager: bool = True
+    enable_context_manager: bool = True,
+    trace: Optional[StatefulTraceClient] = None
 ):
     """Run the development agent with specified configuration."""
     logger.info(f"🚀 Starting agent with model: {model_name}")
@@ -55,6 +59,10 @@
     if not project.data or len(project.data) == 0:
         raise ValueError(f"Project {project_id} not found")
 
+    if not trace:
+        logger.warning("No trace provided, creating a new one")
+        trace = langfuse.trace(name="agent_run", id=thread_id, session_id=thread_id, metadata={"project_id": project_id})
+
     project_data = project.data[0]
     sandbox_info = project_data.get('sandbox', {})
     if not sandbox_info.get('id'):
@@ -199,6 +207,7 @@
         elif "gpt-4" in model_name.lower():
             max_tokens = 4096
 
+        generation = trace.generation(name="thread_manager.run_thread")
         try:
             # Make the LLM call and process the response
             response = await thread_manager.run_thread(
@@ -223,7 +232,9 @@
                 include_xml_examples=True,
                 enable_thinking=enable_thinking,
                 reasoning_effort=reasoning_effort,
-                enable_context_manager=enable_context_manager
+                enable_context_manager=enable_context_manager,
+                generation=generation,
+                trace=trace
             )
 
             if isinstance(response, dict) and "status" in response and response["status"] == "error":
@@ -237,6 +248,7 @@
             # Process the response
             error_detected = False
             try:
+                full_response = ""
                 async for chunk in response:
                     # If we receive an error chunk, we should stop after this iteration
                     if isinstance(chunk, dict) and chunk.get('type') == 'status' and chunk.get('status') == 'error':
@@ -257,6 +269,7 @@
 
                             # The actual text content is nested within
                             assistant_text = assistant_content_json.get('content', '')
+                            full_response += assistant_text
                             if isinstance(assistant_text, str): # Ensure it's a string
                                 # Check for the closing tags as they signal the end of the tool usage
                                 if '</ask>' in assistant_text or '</complete>' in assistant_text or '</web-browser-takeover>' in assistant_text:
@@ -280,15 +293,19 @@
                 # Check if we should stop based on the last tool call or error
                 if error_detected:
                     logger.info(f"Stopping due to error detected in response")
+                    generation.end(output=full_response, status_message="error_detected", level="ERROR")
                     break
 
                 if last_tool_call in ['ask', 'complete', 'web-browser-takeover']:
                     logger.info(f"Agent decided to stop with tool: {last_tool_call}")
+                    generation.end(output=full_response, status_message="agent_stopped")
                     continue_execution = False
+
             except Exception as e:
                 # Just log the error and re-raise to stop all iterations
                 error_msg = f"Error during response streaming: {str(e)}"
                 logger.error(f"Error: {error_msg}")
+                generation.end(output=full_response, status_message=error_msg, level="ERROR")
                 yield {
                     "type": "status",
                     "status": "error",
@@ -308,6 +325,10 @@
                 }
                 # Stop execution immediately on any error
                 break
+        generation.end(output=full_response)
+
+    langfuse.flush() # Flush Langfuse events at the end of the run
+
 
 
 # # TESTING
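Reviewer note: for anyone new to the Langfuse v2 low-level SDK used in run.py above, here is a minimal, self-contained sketch of the trace → generation → flush flow this change follows. It assumes LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY are set in the environment; fake_llm_call and the "demo-*" IDs are illustrative stand-ins, not part of this change.

```python
import os
from langfuse import Langfuse

# Standalone sketch of the tracing pattern wired into agent/run.py:
# one trace per agent run, one generation per thread_manager.run_thread call.
langfuse = Langfuse(enabled=bool(os.getenv("LANGFUSE_PUBLIC_KEY") and os.getenv("LANGFUSE_SECRET_KEY")))

def fake_llm_call(prompt: str) -> str:
    # Stand-in for make_llm_api_call; illustrative only.
    return f"echo: {prompt}"

trace = langfuse.trace(name="agent_run", id="demo-thread", session_id="demo-thread",
                       metadata={"project_id": "demo-project"})
generation = trace.generation(name="thread_manager.run_thread")
try:
    output = fake_llm_call("hello")
    generation.end(output=output, status_message="agent_stopped")
except Exception as exc:
    generation.end(output=None, status_message=str(exc), level="ERROR")

langfuse.flush()  # Same final flush as at the end of run_agent
```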
diff --git a/backend/agentpress/response_processor.py b/backend/agentpress/response_processor.py
index 58cdaf83..42a3b9f3 100644
--- a/backend/agentpress/response_processor.py
+++ b/backend/agentpress/response_processor.py
@@ -21,6 +21,7 @@ from litellm import completion_cost
 from agentpress.tool import Tool, ToolResult
 from agentpress.tool_registry import ToolRegistry
 from utils.logger import logger
+from langfuse.client import StatefulTraceClient
 
 # Type alias for XML result adding strategy
 XmlAddingStrategy = Literal["user_message", "assistant_message", "inline_edit"]
@@ -99,6 +100,7 @@ class ResponseProcessor:
         prompt_messages: List[Dict[str, Any]],
         llm_model: str,
         config: ProcessorConfig = ProcessorConfig(),
+        trace: Optional[StatefulTraceClient] = None,
     ) -> AsyncGenerator[Dict[str, Any], None]:
         """Process a streaming LLM response, handling tool calls and execution.
 
@@ -209,7 +211,7 @@ class ResponseProcessor:
 
                         if started_msg_obj: yield started_msg_obj
                         yielded_tool_indices.add(tool_index) # Mark status as yielded
-                        execution_task = asyncio.create_task(self._execute_tool(tool_call))
+                        execution_task = asyncio.create_task(self._execute_tool(tool_call, trace))
                         pending_tool_executions.append({
                             "task": execution_task, "tool_call": tool_call,
                             "tool_index": tool_index, "context": context
@@ -587,7 +589,8 @@ class ResponseProcessor:
         thread_id: str,
         prompt_messages: List[Dict[str, Any]],
         llm_model: str,
-        config: ProcessorConfig = ProcessorConfig()
+        config: ProcessorConfig = ProcessorConfig(),
+        trace: Optional[StatefulTraceClient] = None,
     ) -> AsyncGenerator[Dict[str, Any], None]:
         """Process a non-streaming LLM response, handling tool calls and execution.
 
@@ -1057,12 +1060,15 @@ class ResponseProcessor:
         return parsed_data
 
     # Tool execution methods
-    async def _execute_tool(self, tool_call: Dict[str, Any]) -> ToolResult:
+    async def _execute_tool(self, tool_call: Dict[str, Any], trace: Optional[StatefulTraceClient] = None) -> ToolResult:
         """Execute a single tool call and return the result."""
+        span = None
+        if trace:
+            span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"])
         try:
             function_name = tool_call["function_name"]
             arguments = tool_call["arguments"]
-            
+
             logger.info(f"Executing tool: {function_name} with arguments: {arguments}")
 
             if isinstance(arguments, str):
@@ -1078,14 +1084,20 @@ class ResponseProcessor:
             tool_fn = available_functions.get(function_name)
             if not tool_fn:
                 logger.error(f"Tool function '{function_name}' not found in registry")
+                if span:
+                    span.end(status_message="tool_not_found", level="ERROR")
                 return ToolResult(success=False, output=f"Tool function '{function_name}' not found")
 
             logger.debug(f"Found tool function for '{function_name}', executing...")
             result = await tool_fn(**arguments)
             logger.info(f"Tool execution complete: {function_name} -> {result}")
+            if span:
+                span.end(status_message="tool_executed", output=result)
             return result
         except Exception as e:
             logger.error(f"Error executing tool {tool_call['function_name']}: {str(e)}", exc_info=True)
+            if span:
+                span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}", level="ERROR")
             return ToolResult(success=False, output=f"Error executing tool: {str(e)}")
 
     async def _execute_tools(
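Reviewer note: the per-tool span added to `_execute_tool` follows a guard-and-end pattern — create the span only when a trace is provided, and end it with a status on every exit path. A rough, generic sketch of that pattern; `run_tool` and the `"fn"` key are made-up stand-ins for the registry lookup and call, not part of this change:

```python
from typing import Any, Dict, Optional
from langfuse.client import StatefulTraceClient

async def run_tool(tool_call: Dict[str, Any], trace: Optional[StatefulTraceClient] = None) -> Any:
    """Illustrative wrapper: one Langfuse span per tool execution, ended on every path."""
    span = None
    if trace:
        span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"])
    try:
        result = await tool_call["fn"](**tool_call["arguments"])  # hypothetical resolved callable
        if span:
            span.end(output=result, status_message="tool_executed")
        return result
    except Exception as e:
        if span:
            span.end(output=str(e), status_message="tool_execution_error", level="ERROR")
        raise
```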
diff --git a/backend/agentpress/thread_manager.py b/backend/agentpress/thread_manager.py
index be8b48a6..7aac40d6 100644
--- a/backend/agentpress/thread_manager.py
+++ b/backend/agentpress/thread_manager.py
@@ -22,6 +22,8 @@ from agentpress.response_processor import (
 )
 from services.supabase import DBConnection
 from utils.logger import logger
+from langfuse.client import StatefulGenerationClient, StatefulTraceClient
+import datetime
 
 # Type alias for tool choice
 ToolChoice = Literal["auto", "required", "none"]
@@ -161,7 +163,9 @@
         include_xml_examples: bool = False,
         enable_thinking: Optional[bool] = False,
         reasoning_effort: Optional[str] = 'low',
-        enable_context_manager: bool = True
+        enable_context_manager: bool = True,
+        generation: Optional[StatefulGenerationClient] = None,
+        trace: Optional[StatefulTraceClient] = None
     ) -> Union[Dict[str, Any], AsyncGenerator]:
         """Run a conversation thread with LLM integration and tool execution.
 
@@ -322,6 +326,20 @@ Here are the XML tools available with examples:
             # 5. Make LLM API call
             logger.debug("Making LLM API call")
             try:
+                if generation:
+                    generation.update(
+                        input=prepared_messages,
+                        start_time=datetime.datetime.now(datetime.timezone.utc),
+                        model=llm_model,
+                        model_parameters={
+                            "max_tokens": llm_max_tokens,
+                            "temperature": llm_temperature,
+                            "enable_thinking": enable_thinking,
+                            "reasoning_effort": reasoning_effort,
+                            "tool_choice": tool_choice,
+                            "tools": openapi_tool_schemas,
+                        }
+                    )
                 llm_response = await make_llm_api_call(
                     prepared_messages, # Pass the potentially modified messages
                     llm_model,
@@ -347,7 +365,8 @@ Here are the XML tools available with examples:
                     thread_id=thread_id,
                     config=processor_config,
                     prompt_messages=prepared_messages,
-                    llm_model=llm_model
+                    llm_model=llm_model,
+                    trace=trace
                 )
                 return response_generator
 
@@ -359,7 +378,8 @@ Here are the XML tools available with examples:
                     thread_id=thread_id,
                     config=processor_config,
                     prompt_messages=prepared_messages,
-                    llm_model=llm_model
+                    llm_model=llm_model,
+                    trace=trace
                 )
                 return response_generator # Return the generator
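Reviewer note: in thread_manager the generation is updated with the prepared prompt and model parameters just before the LLM call, and ended later in run.py with the accumulated output. A compact sketch of that update/end pairing under the same Langfuse v2 client objects; `call_model` and `traced_call` are placeholders, not names from this change:

```python
import datetime
from typing import Optional
from langfuse.client import StatefulGenerationClient

def call_model(messages, model, **params) -> str:
    # Placeholder for make_llm_api_call; illustrative only.
    return "model output"

def traced_call(messages, model, generation: Optional[StatefulGenerationClient] = None, **params) -> str:
    # Record the exact input and parameters before the call...
    if generation:
        generation.update(
            input=messages,
            start_time=datetime.datetime.now(datetime.timezone.utc),
            model=model,
            model_parameters=params,
        )
    output = call_model(messages, model, **params)
    # ...and attach the output once the (possibly streamed) response is complete.
    if generation:
        generation.end(output=output)
    return output
```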
diff --git a/backend/poetry.lock b/backend/poetry.lock
index 18bcaaf0..69613283 100644
--- a/backend/poetry.lock
+++ b/backend/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
 
 [[package]]
 name = "aiohappyeyeballs"
@@ -249,6 +249,18 @@ files = [
 [package.extras]
 visualize = ["Twisted (>=16.1.1)", "graphviz (>0.5.1)"]
 
+[[package]]
+name = "backoff"
+version = "2.2.1"
+description = "Function decoration for backoff and retry"
+optional = false
+python-versions = ">=3.7,<4.0"
+groups = ["main"]
+files = [
+    {file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"},
+    {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"},
+]
+
 [[package]]
 name = "boto3"
 version = "1.37.34"
@@ -1217,6 +1229,33 @@ files = [
 [package.dependencies]
 referencing = ">=0.31.0"
 
+[[package]]
+name = "langfuse"
+version = "2.60.5"
+description = "A client library for accessing langfuse"
+optional = false
+python-versions = "<4.0,>=3.9"
+groups = ["main"]
+files = [
+    {file = "langfuse-2.60.5-py3-none-any.whl", hash = "sha256:fd27d52017f36d6fa5ca652615213a2535dc93dd88c3375eeb811af26384d285"},
+    {file = "langfuse-2.60.5.tar.gz", hash = "sha256:a33ecddc98cf6d12289372e63071b77b72230e7bc8260ee349f1465d53bf425b"},
+]
+
+[package.dependencies]
+anyio = ">=4.4.0,<5.0.0"
+backoff = ">=1.10.0"
+httpx = ">=0.15.4,<1.0"
+idna = ">=3.7,<4.0"
+packaging = ">=23.2,<25.0"
+pydantic = ">=1.10.7,<3.0"
+requests = ">=2,<3"
+wrapt = ">=1.14,<2.0"
+
+[package.extras]
+langchain = ["langchain (>=0.0.309)"]
+llama-index = ["llama-index (>=0.10.12,<2.0.0)"]
+openai = ["openai (>=0.27.8)"]
+
 [[package]]
 name = "litellm"
 version = "1.66.1"
@@ -3539,4 +3578,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"]
 [metadata]
 lock-version = "2.1"
 python-versions = "^3.11"
-content-hash = "6163a36d6c3507a20552400544de78f7b48a92a98c8c68db7c98263465bf275a"
+content-hash = "8bf5f2b60329678979d6eceb2c9860e92b9f2f68cad75651239fdacfc3964633"
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 5f4c4143..af8e5872 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -52,6 +52,7 @@ stripe = "^12.0.1"
 dramatiq = "^1.17.1"
 pika = "^1.3.2"
 prometheus-client = "^0.21.1"
+langfuse = "^2.60.5"
 
 [tool.poetry.scripts]
 agentpress = "agentpress.cli:main"
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 06901056..b1b0e9ff 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -32,3 +32,4 @@ stripe>=12.0.1
 dramatiq>=1.17.1
 pika>=1.3.2
 prometheus-client>=0.21.1
+langfuse>=2.60.5
diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py
index 927008c3..4cb52302 100644
--- a/backend/run_agent_background.py
+++ b/backend/run_agent_background.py
@@ -13,6 +13,7 @@ from services.supabase import DBConnection
 from services import redis
 from dramatiq.brokers.rabbitmq import RabbitmqBroker
 import os
+from services.langfuse import langfuse
 
 rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq')
 rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672))
@@ -98,6 +99,7 @@
             logger.error(f"Error in stop signal checker for {agent_run_id}: {e}", exc_info=True)
             stop_signal_received = True # Stop the run if the checker fails
 
+    trace = langfuse.trace(name="agent_run", id=agent_run_id, session_id=thread_id, metadata={"project_id": project_id, "instance_id": instance_id})
     try:
         # Setup Pub/Sub listener for control signals
         pubsub = await redis.create_pubsub()
@@ -108,12 +110,14 @@
         # Ensure active run key exists and has TTL
         await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL)
 
+        # Initialize agent generator
         agent_gen = run_agent(
             thread_id=thread_id, project_id=project_id, stream=stream,
             thread_manager=thread_manager, model_name=model_name,
             enable_thinking=enable_thinking, reasoning_effort=reasoning_effort,
-            enable_context_manager=enable_context_manager
+            enable_context_manager=enable_context_manager,
+            trace=trace
         )
 
         final_status = "running"
@@ -123,6 +127,7 @@
             if stop_signal_received:
                 logger.info(f"Agent run {agent_run_id} stopped by signal.")
                 final_status = "stopped"
+                trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped", level="WARNING")
                 break
 
             # Store response in Redis list and publish notification
@@ -147,6 +152,7 @@
             duration = (datetime.now(timezone.utc) - start_time).total_seconds()
             logger.info(f"Agent run {agent_run_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})")
             completion_message = {"type": "status", "status": "completed", "message": "Agent run completed successfully"}
+            trace.span(name="agent_run_completed").end(status_message="agent_run_completed")
             await redis.rpush(response_list_key, json.dumps(completion_message))
             await redis.publish(response_channel, "new") # Notify about the completion message
 
@@ -172,6 +178,7 @@
         duration = (datetime.now(timezone.utc) - start_time).total_seconds()
         logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})")
         final_status = "failed"
+        trace.span(name="agent_run_failed").end(status_message=error_message, level="ERROR")
 
         # Push error message to Redis list
         error_response = {"type": "status", "status": "error", "message": error_message}
diff --git a/backend/services/langfuse.py b/backend/services/langfuse.py
new file mode 100644
index 00000000..cf624bf3
--- /dev/null
+++ b/backend/services/langfuse.py
@@ -0,0 +1,12 @@
+import os
+from langfuse import Langfuse
+
+public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
+secret_key = os.getenv("LANGFUSE_SECRET_KEY")
+host = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
+
+enabled = False
+if public_key and secret_key:
+    enabled = True
+
+langfuse = Langfuse(enabled=enabled)
diff --git a/backend/utils/config.py b/backend/utils/config.py
index 085cf041..119a852f 100644
--- a/backend/utils/config.py
+++ b/backend/utils/config.py
@@ -162,6 +162,11 @@
     SANDBOX_IMAGE_NAME = "kortix/suna:0.1.2.8"
     SANDBOX_ENTRYPOINT = "/usr/bin/supervisord -n -c /etc/supervisor/conf.d/supervisord.conf"
 
+    # LangFuse configuration
+    LANGFUSE_PUBLIC_KEY: Optional[str] = None
+    LANGFUSE_SECRET_KEY: Optional[str] = None
+    LANGFUSE_HOST: str = "https://cloud.langfuse.com"
+
     @property
     def STRIPE_PRODUCT_ID(self) -> str:
         if self.ENV_MODE == EnvMode.STAGING:
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 039a0cdd..1ce7d844 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,6 +1,8 @@
 services:
   redis:
     image: redis:7-alpine
+    ports:
+      - "6379:6379"
     volumes:
       - redis_data:/data
       - ./backend/services/docker/redis.conf:/usr/local/etc/redis/redis.conf:ro
@@ -14,7 +16,7 @@
   rabbitmq:
     image: rabbitmq
     ports:
-      - "5672:5672"
+      - "5672:5672"
       - "15672:15672"
     volumes:
       - rabbitmq_data:/var/lib/rabbitmq
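Deployment note: with this wiring, tracing stays off unless both keys are configured, since services/langfuse.py only enables the client when LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY are set; the Langfuse SDK itself reads the keys and LANGFUSE_HOST from the environment when enabled. A quick way to sanity-check a deployment before running an agent, assuming the langfuse v2 Python SDK's auth_check helper:

```python
import os
from langfuse import Langfuse

# Mirrors backend/services/langfuse.py: disabled unless both keys are configured.
enabled = bool(os.getenv("LANGFUSE_PUBLIC_KEY") and os.getenv("LANGFUSE_SECRET_KEY"))
client = Langfuse(enabled=enabled)

if enabled:
    # auth_check() makes a round trip to LANGFUSE_HOST and returns True on valid credentials.
    print("Langfuse credentials valid:", client.auth_check())
else:
    print("Langfuse disabled: LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY not set")
```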