feat(backend): langfuse traces

sharath 2025-05-21 00:39:28 +00:00
parent 3408906ad7
commit ff1670be90
No known key found for this signature in database
11 changed files with 141 additions and 13 deletions

View File

@@ -43,3 +43,7 @@ FIRECRAWL_URL=
DAYTONA_API_KEY=
DAYTONA_SERVER_URL=
DAYTONA_TARGET=
LANGFUSE_PUBLIC_KEY="pk-REDACTED"
LANGFUSE_SECRET_KEY="sk-REDACTED"
LANGFUSE_HOST="https://cloud.langfuse.com"
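
For reference: the Langfuse Python SDK also reads these three variables from the environment on its own, so a client built with no explicit arguments picks them up automatically. A minimal sketch (not part of this diff):

import os
from langfuse import Langfuse

# Equivalent explicit construction; Langfuse() with no arguments falls back
# to LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST.
client = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)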

View File

@@ -23,6 +23,8 @@ from utils.logger import logger
from utils.auth_utils import get_account_id_from_thread
from services.billing import check_billing_status
from agent.tools.sb_vision_tool import SandboxVisionTool
from services.langfuse import langfuse
from langfuse.client import StatefulTraceClient
load_dotenv()
@@ -36,7 +38,8 @@ async def run_agent(
model_name: str = "anthropic/claude-3-7-sonnet-latest",
enable_thinking: Optional[bool] = False,
reasoning_effort: Optional[str] = 'low',
enable_context_manager: bool = True
enable_context_manager: bool = True,
trace: Optional[StatefulTraceClient] = None
):
"""Run the development agent with specified configuration."""
logger.info(f"🚀 Starting agent with model: {model_name}")
@@ -55,6 +58,10 @@ async def run_agent(
if not project.data or len(project.data) == 0:
raise ValueError(f"Project {project_id} not found")
if not trace:
logger.warning("No trace provided, creating a new one")
trace = langfuse.trace(name="agent_run", id=thread_id, session_id=thread_id, metadata={"project_id": project_id})
project_data = project.data[0]
sandbox_info = project_data.get('sandbox', {})
if not sandbox_info.get('id'):
@@ -92,9 +99,11 @@
iteration_count += 1
logger.info(f"🔄 Running iteration {iteration_count} of {max_iterations}...")
span = trace.span(name="billing_check")
# Billing check on each iteration - still needed within the iterations
can_run, message, subscription = await check_billing_status(client, account_id)
if not can_run:
span.end(status_message="billing_limit_reached")
error_msg = f"Billing limit reached: {message}"
# Yield a special message to indicate billing limit reached
yield {
@@ -103,6 +112,9 @@
"message": error_msg
}
break
span.end(status_message="billing_limit_not_reached")
span = trace.span(name="get_latest_message")
# Check if last message is from assistant using direct Supabase query
latest_message = await client.table('messages').select('*').eq('thread_id', thread_id).in_('type', ['assistant', 'tool', 'user']).order('created_at', desc=True).limit(1).execute()
if latest_message.data and len(latest_message.data) > 0:
@@ -110,12 +122,15 @@
if message_type == 'assistant':
logger.info(f"Last message was from assistant, stopping execution")
continue_execution = False
span.end(status_message="last_message_from_assistant")
break
span.end(status_message="last_message_not_from_assistant")
# ---- Temporary Message Handling (Browser State & Image Context) ----
temporary_message = None
temp_message_content_list = [] # List to hold text/image blocks
span = trace.span(name="get_latest_browser_state_message")
# Get the latest browser_state message
latest_browser_state_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'browser_state').order('created_at', desc=True).limit(1).execute()
if latest_browser_state_msg.data and len(latest_browser_state_msg.data) > 0:
@@ -156,7 +171,9 @@
except Exception as e:
logger.error(f"Error parsing browser state: {e}")
span.end(status_message="get_latest_browser_state_message")
span = trace.span(name="get_latest_image_context_message")
# Get the latest image_context message (NEW)
latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute()
if latest_image_context_msg.data and len(latest_image_context_msg.data) > 0:
@@ -183,6 +200,7 @@
await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute()
except Exception as e:
logger.error(f"Error parsing image context: {e}")
span.end(status_message="get_latest_image_context_message")
# If we have any content, construct the temporary_message
if temp_message_content_list:
@@ -197,6 +215,7 @@
elif "gpt-4" in model_name.lower():
max_tokens = 4096
generation = trace.generation(name="thread_manager.run_thread")
try:
# Make the LLM call and process the response
response = await thread_manager.run_thread(
@@ -221,7 +240,9 @@
include_xml_examples=True,
enable_thinking=enable_thinking,
reasoning_effort=reasoning_effort,
enable_context_manager=enable_context_manager
enable_context_manager=enable_context_manager,
generation=generation,
trace=trace
)
if isinstance(response, dict) and "status" in response and response["status"] == "error":
@@ -235,6 +256,7 @@
# Process the response
error_detected = False
try:
full_response = ""
async for chunk in response:
# If we receive an error chunk, we should stop after this iteration
if isinstance(chunk, dict) and chunk.get('type') == 'status' and chunk.get('status') == 'error':
@@ -255,6 +277,7 @@
# The actual text content is nested within
assistant_text = assistant_content_json.get('content', '')
full_response += assistant_text
if isinstance(assistant_text, str): # Ensure it's a string
# Check for the closing tags as they signal the end of the tool usage
if '</ask>' in assistant_text or '</complete>' in assistant_text or '</web-browser-takeover>' in assistant_text:
@@ -278,11 +301,14 @@
# Check if we should stop based on the last tool call or error
if error_detected:
logger.info(f"Stopping due to error detected in response")
generation.end(output=full_response, status_message="error_detected")
break
if last_tool_call in ['ask', 'complete', 'web-browser-takeover']:
logger.info(f"Agent decided to stop with tool: {last_tool_call}")
generation.end(output=full_response, status_message="agent_stopped")
continue_execution = False
except Exception as e:
# Just log the error and re-raise to stop all iterations
error_msg = f"Error during response streaming: {str(e)}"
@@ -306,6 +332,10 @@
}
# Stop execution immediately on any error
break
generation.end(output=full_response)
langfuse.flush() # Flush Langfuse events at the end of the run
# # TESTING
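
Condensed, the lifecycle this file adds is one trace per agent run, a short-lived span around each checkpoint, and one generation per LLM call, flushed at the end. A sketch of the pattern in isolation (all names taken from the diff above; not additional code in this commit):

from services.langfuse import langfuse

trace = langfuse.trace(name="agent_run", id=thread_id, session_id=thread_id)

span = trace.span(name="billing_check")          # open a span around a step
span.end(status_message="billing_limit_not_reached")

generation = trace.generation(name="thread_manager.run_thread")
# ... stream the LLM response, accumulating full_response ...
generation.end(output=full_response)

langfuse.flush()  # events are buffered in the background; flush before returning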

View File

@@ -21,6 +21,7 @@ from litellm import completion_cost
from agentpress.tool import Tool, ToolResult
from agentpress.tool_registry import ToolRegistry
from utils.logger import logger
from langfuse.client import StatefulTraceClient
# Type alias for XML result adding strategy
XmlAddingStrategy = Literal["user_message", "assistant_message", "inline_edit"]
@@ -99,6 +100,7 @@ class ResponseProcessor:
prompt_messages: List[Dict[str, Any]],
llm_model: str,
config: ProcessorConfig = ProcessorConfig(),
trace: Optional[StatefulTraceClient] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
"""Process a streaming LLM response, handling tool calls and execution.
@@ -209,7 +211,7 @@
if started_msg_obj: yield started_msg_obj
yielded_tool_indices.add(tool_index) # Mark status as yielded
execution_task = asyncio.create_task(self._execute_tool(tool_call))
execution_task = asyncio.create_task(self._execute_tool(tool_call, trace))
pending_tool_executions.append({
"task": execution_task, "tool_call": tool_call,
"tool_index": tool_index, "context": context
@@ -587,7 +589,8 @@
thread_id: str,
prompt_messages: List[Dict[str, Any]],
llm_model: str,
config: ProcessorConfig = ProcessorConfig()
config: ProcessorConfig = ProcessorConfig(),
trace: Optional[StatefulTraceClient] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
"""Process a non-streaming LLM response, handling tool calls and execution.
@@ -1057,12 +1060,13 @@
return parsed_data
# Tool execution methods
async def _execute_tool(self, tool_call: Dict[str, Any]) -> ToolResult:
async def _execute_tool(self, tool_call: Dict[str, Any], trace: Optional[StatefulTraceClient] = None) -> ToolResult:
"""Execute a single tool call and return the result."""
span = trace.span(name=f"execute_tool.{tool_call['function_name']}", input=tool_call["arguments"])
try:
function_name = tool_call["function_name"]
arguments = tool_call["arguments"]
logger.info(f"Executing tool: {function_name} with arguments: {arguments}")
if isinstance(arguments, str):
@@ -1078,14 +1082,17 @@
tool_fn = available_functions.get(function_name)
if not tool_fn:
logger.error(f"Tool function '{function_name}' not found in registry")
span.end(status_message="tool_not_found")
return ToolResult(success=False, output=f"Tool function '{function_name}' not found")
logger.debug(f"Found tool function for '{function_name}', executing...")
result = await tool_fn(**arguments)
logger.info(f"Tool execution complete: {function_name} -> {result}")
span.end(status_message="tool_executed", output=result)
return result
except Exception as e:
logger.error(f"Error executing tool {tool_call['function_name']}: {str(e)}", exc_info=True)
span.end(status_message="tool_execution_error", output=f"Error executing tool: {str(e)}")
return ToolResult(success=False, output=f"Error executing tool: {str(e)}")
async def _execute_tools(
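
One caveat: trace is typed Optional in _execute_tool but dereferenced unconditionally, so calling the method without a trace raises AttributeError. A defensive variant of the method body (a sketch, not what this commit ships):

# Guard the Optional trace before opening a span.
span = trace.span(name=f"execute_tool.{tool_call['function_name']}",
                  input=tool_call["arguments"]) if trace else None
try:
    result = await tool_fn(**arguments)
    if span:
        span.end(status_message="tool_executed", output=result)
    return result
except Exception as e:
    if span:
        span.end(status_message="tool_execution_error", output=str(e))
    return ToolResult(success=False, output=f"Error executing tool: {str(e)}")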

View File

@@ -22,6 +22,8 @@ from agentpress.response_processor import (
)
from services.supabase import DBConnection
from utils.logger import logger
from langfuse.client import StatefulGenerationClient, StatefulTraceClient
import datetime
# Type alias for tool choice
ToolChoice = Literal["auto", "required", "none"]
@@ -161,7 +163,9 @@
include_xml_examples: bool = False,
enable_thinking: Optional[bool] = False,
reasoning_effort: Optional[str] = 'low',
enable_context_manager: bool = True
enable_context_manager: bool = True,
generation: Optional[StatefulGenerationClient] = None,
trace: Optional[StatefulTraceClient] = None
) -> Union[Dict[str, Any], AsyncGenerator]:
"""Run a conversation thread with LLM integration and tool execution.
@@ -322,6 +326,20 @@
# 5. Make LLM API call
logger.debug("Making LLM API call")
try:
if generation:
generation.update(
input=prepared_messages,
start_time=datetime.datetime.now(datetime.timezone.utc),
model=llm_model,
model_parameters={
"max_tokens": llm_max_tokens,
"temperature": llm_temperature,
"enable_thinking": enable_thinking,
"reasoning_effort": reasoning_effort,
"tool_choice": tool_choice,
"tools": openapi_tool_schemas,
}
)
llm_response = await make_llm_api_call(
prepared_messages, # Pass the potentially modified messages
llm_model,
@@ -347,7 +365,8 @@
thread_id=thread_id,
config=processor_config,
prompt_messages=prepared_messages,
llm_model=llm_model
llm_model=llm_model,
trace=trace
)
return response_generator
@@ -359,7 +378,8 @@
thread_id=thread_id,
config=processor_config,
prompt_messages=prepared_messages,
llm_model=llm_model
llm_model=llm_model,
trace=trace
)
return response_generator # Return the generator
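
Taken together with agent/run.py, the generation object makes one round trip: created in run_agent, enriched here with the exact model parameters just before the LLM call, then closed in run_agent with the streamed output. Condensed (fragments, all names from this diff):

generation = trace.generation(name="thread_manager.run_thread")    # agent/run.py
# ... generation passed into ThreadManager.run_thread ...
generation.update(input=prepared_messages, model=llm_model,
                  model_parameters={"max_tokens": llm_max_tokens})  # thread_manager.py
# ... response streamed back to the caller ...
generation.end(output=full_response)                                # agent/run.py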

backend/poetry.lock (generated)
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -249,6 +249,18 @@ files = [
[package.extras]
visualize = ["Twisted (>=16.1.1)", "graphviz (>0.5.1)"]
[[package]]
name = "backoff"
version = "2.2.1"
description = "Function decoration for backoff and retry"
optional = false
python-versions = ">=3.7,<4.0"
groups = ["main"]
files = [
{file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"},
{file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"},
]
[[package]]
name = "boto3"
version = "1.37.34"
@@ -1217,6 +1229,33 @@ files = [
[package.dependencies]
referencing = ">=0.31.0"
[[package]]
name = "langfuse"
version = "2.60.5"
description = "A client library for accessing langfuse"
optional = false
python-versions = "<4.0,>=3.9"
groups = ["main"]
files = [
{file = "langfuse-2.60.5-py3-none-any.whl", hash = "sha256:fd27d52017f36d6fa5ca652615213a2535dc93dd88c3375eeb811af26384d285"},
{file = "langfuse-2.60.5.tar.gz", hash = "sha256:a33ecddc98cf6d12289372e63071b77b72230e7bc8260ee349f1465d53bf425b"},
]
[package.dependencies]
anyio = ">=4.4.0,<5.0.0"
backoff = ">=1.10.0"
httpx = ">=0.15.4,<1.0"
idna = ">=3.7,<4.0"
packaging = ">=23.2,<25.0"
pydantic = ">=1.10.7,<3.0"
requests = ">=2,<3"
wrapt = ">=1.14,<2.0"
[package.extras]
langchain = ["langchain (>=0.0.309)"]
llama-index = ["llama-index (>=0.10.12,<2.0.0)"]
openai = ["openai (>=0.27.8)"]
[[package]]
name = "litellm"
version = "1.66.1"
@@ -3539,4 +3578,4 @@
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "6163a36d6c3507a20552400544de78f7b48a92a98c8c68db7c98263465bf275a"
content-hash = "8bf5f2b60329678979d6eceb2c9860e92b9f2f68cad75651239fdacfc3964633"

View File

@@ -52,6 +52,7 @@ stripe = "^12.0.1"
dramatiq = "^1.17.1"
pika = "^1.3.2"
prometheus-client = "^0.21.1"
langfuse = "^2.60.5"
[tool.poetry.scripts]
agentpress = "agentpress.cli:main"

View File

@@ -33,3 +33,4 @@ tavily-python>=0.5.4
pytesseract==0.3.13
stripe>=7.0.0
dramatiq[rabbitmq]>=1.17.1
langfuse>=2.60.5

View File

@@ -13,6 +13,7 @@ from services.supabase import DBConnection
from services import redis
from dramatiq.brokers.rabbitmq import RabbitmqBroker
import os
from services.langfuse import langfuse
rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq')
rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672))
@@ -98,6 +99,7 @@
logger.error(f"Error in stop signal checker for {agent_run_id}: {e}", exc_info=True)
stop_signal_received = True # Stop the run if the checker fails
trace = langfuse.trace(name="agent_run", id=agent_run_id, session_id=thread_id, metadata={"project_id": project_id, "instance_id": instance_id})
try:
# Setup Pub/Sub listener for control signals
pubsub = await redis.create_pubsub()
@@ -108,12 +110,14 @@
# Ensure active run key exists and has TTL
await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL)
# Initialize agent generator
agent_gen = run_agent(
thread_id=thread_id, project_id=project_id, stream=stream,
thread_manager=thread_manager, model_name=model_name,
enable_thinking=enable_thinking, reasoning_effort=reasoning_effort,
enable_context_manager=enable_context_manager
enable_context_manager=enable_context_manager,
trace=trace
)
final_status = "running"
@@ -123,6 +127,7 @@
if stop_signal_received:
logger.info(f"Agent run {agent_run_id} stopped by signal.")
final_status = "stopped"
trace.span(name="agent_run_stopped").end(status_message="agent_run_stopped")
break
# Store response in Redis list and publish notification
@@ -147,6 +152,7 @@
duration = (datetime.now(timezone.utc) - start_time).total_seconds()
logger.info(f"Agent run {agent_run_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})")
completion_message = {"type": "status", "status": "completed", "message": "Agent run completed successfully"}
trace.span(name="agent_run_completed").end(status_message="agent_run_completed")
await redis.rpush(response_list_key, json.dumps(completion_message))
await redis.publish(response_channel, "new") # Notify about the completion message
@@ -172,6 +178,7 @@
duration = (datetime.now(timezone.utc) - start_time).total_seconds()
logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})")
final_status = "failed"
trace.span(name="agent_run_failed").end(status_message=error_message)
# Push error message to Redis list
error_response = {"type": "status", "status": "error", "message": error_message}
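
The terminal state of each run is recorded as a named span on the trace. Note that only agent/run.py calls langfuse.flush(); if the worker process can exit between runs, a shutdown hook is a cheap safety net. A hypothetical addition (not in this commit; atexit is standard library):

import atexit
from services.langfuse import langfuse

# Flush any Langfuse events still buffered when the worker process exits.
atexit.register(langfuse.flush)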

View File

@@ -0,0 +1,12 @@
import os
from langfuse import Langfuse
public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
secret_key = os.getenv("LANGFUSE_SECRET_KEY")
host = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
enabled = False
if public_key and secret_key:
enabled = True
langfuse = Langfuse(enabled=enabled)
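
Two notes on this module: it is the single shared client the rest of the backend imports, and the keys read here are used only to compute enabled; Langfuse() re-reads the same environment variables for its actual credentials. When enabled is False the client still hands back trace/span objects but stops sending events, so callers need no feature checks. Typical usage, as in run_agent_background.py above:

from services.langfuse import langfuse

trace = langfuse.trace(name="agent_run", id=agent_run_id, session_id=thread_id)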

View File

@@ -162,6 +162,11 @@ class Configuration:
SANDBOX_IMAGE_NAME = "kortix/suna:0.1.2.8"
SANDBOX_ENTRYPOINT = "/usr/bin/supervisord -n -c /etc/supervisor/conf.d/supervisord.conf"
# LangFuse configuration
LANGFUSE_PUBLIC_KEY: Optional[str] = None
LANGFUSE_SECRET_KEY: Optional[str] = None
LANGFUSE_HOST: str = "https://cloud.langfuse.com"
@property
def STRIPE_PRODUCT_ID(self) -> str:
if self.ENV_MODE == EnvMode.STAGING:

View File

@@ -1,6 +1,8 @@ services:
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
- ./backend/services/docker/redis.conf:/usr/local/etc/redis/redis.conf:ro
@@ -14,7 +16,7 @@ services:
rabbitmq:
image: rabbitmq
ports:
- "5672:5672"
- "5672:5672"
- "15672:15672"
volumes:
- rabbitmq_data:/var/lib/rabbitmq