diff --git a/backend/agent/api.py b/backend/agent/api.py index 5b828804..ea7fd098 100644 --- a/backend/agent/api.py +++ b/backend/agent/api.py @@ -8,7 +8,6 @@ import uuid from typing import Optional, List, Dict, Any from pydantic import BaseModel import os -from resumable_stream.runtime import create_resumable_stream_context, ResumableStreamContext from services.supabase import DBConnection from services import redis @@ -18,7 +17,7 @@ from services.billing import check_billing_status, can_use_model from utils.config import config from sandbox.sandbox import create_sandbox, delete_sandbox, get_or_start_sandbox from services.llm import make_llm_api_call -from agent.run_agent import run_agent_run_stream, update_agent_run_status, StreamBroadcaster +from agent.run_agent import run_agent_run_stream, update_agent_run_status, get_stream_context from utils.constants import MODEL_NAME_ALIASES from flags.flags import is_enabled @@ -30,16 +29,6 @@ instance_id = None # Global instance ID for this backend instance # TTL for Redis response lists (24 hours) REDIS_RESPONSE_LIST_TTL = 3600 * 24 -stream_context_global: Optional[ResumableStreamContext] = None - -async def get_stream_context(): - global stream_context_global - if stream_context_global: - return stream_context_global - r = await redis.initialize_async() - stream_context_global = create_resumable_stream_context(r, "resumable_stream") - return stream_context_global - class AgentStartRequest(BaseModel): model_name: Optional[str] = None # Will be set from config.MODEL_TO_USE in the endpoint enable_thinking: Optional[bool] = False @@ -675,7 +664,6 @@ async def stream_agent_run( logger.error(f"Failed to start sandbox for project {project_id}: {str(e)}") return - # Create the stream stream = await stream_context.resumable_stream(agent_run_id, lambda: run_agent_run_stream( agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id, project_id=project_id, @@ -687,15 +675,6 @@ async def stream_agent_run( target_agent_id=target_agent_id, request_id=request_id )) - - broadcaster = StreamBroadcaster(stream) - stream_q = broadcaster.add_consumer() - print_q = broadcaster.add_consumer() - - asyncio.create_task(broadcaster.start()) - asyncio.create_task(StreamBroadcaster.iterate_bg(print_q)) - - stream = StreamBroadcaster.queue_to_stream(stream_q) logger.info(f"Created new stream for agent run {agent_run_id}") else: @@ -703,7 +682,7 @@ async def stream_agent_run( if stream is None: logger.error(f"Failed to create or resume stream for agent run {agent_run_id}") - return + raise HTTPException(status_code=500, detail=f"Failed to create or resume stream for agent run {agent_run_id}") return StreamingResponse(stream, media_type="text/event-stream", headers={ "Cache-Control": "no-cache, no-transform", "Connection": "keep-alive", diff --git a/backend/agent/run_agent.py b/backend/agent/run_agent.py index 695936ca..1bc7460f 100644 --- a/backend/agent/run_agent.py +++ b/backend/agent/run_agent.py @@ -16,47 +16,22 @@ from services.langfuse import langfuse from utils.retry import retry from typing import AsyncGenerator import json +from resumable_stream.runtime import create_resumable_stream_context, ResumableStreamContext _initialized = False db = DBConnection() instance_id = "single" -# Create stream broadcaster for multiple consumers -class StreamBroadcaster: - def __init__(self, source: AsyncIterable[Any]): - self.source = source - self.queues: List[asyncio.Queue] = [] +stream_context_global: Optional[ResumableStreamContext] = None - def add_consumer(self) -> asyncio.Queue: - q: asyncio.Queue = asyncio.Queue() - self.queues.append(q) - return q - - async def start(self) -> None: - async for chunk in self.source: - for q in self.queues: - await q.put(chunk) - for q in self.queues: - await q.put(None) # Sentinel to close consumers - - # Consumer wrapper as an async generator - @staticmethod - async def queue_to_stream(queue: asyncio.Queue) -> AsyncIterable[Any]: - while True: - chunk = await queue.get() - if chunk is None: - break - yield chunk - - # Print consumer task - @staticmethod - async def iterate_bg(queue: asyncio.Queue) -> None: - while True: - chunk = await queue.get() - if chunk is None: - break - pass +async def get_stream_context(): + global stream_context_global + if stream_context_global: + return stream_context_global + r = await redis.initialize_async() + stream_context_global = create_resumable_stream_context(r, "resumable_stream") + return stream_context_global async def initialize(): diff --git a/backend/pyproject.toml b/backend/pyproject.toml index dc5d006a..a7751144 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -76,4 +76,4 @@ repository = "https://github.com/kortix-ai/suna" package = false [tool.uv.sources] -resumable-stream = { git = "https://github.com/kortix-ai/resumable-stream-python", tag = "v0.0.2" } +resumable-stream = { git = "https://github.com/kortix-ai/resumable-stream-python", tag = "v0.1.1" } diff --git a/backend/triggers/integration.py b/backend/triggers/integration.py index d9dc843d..27ba4377 100644 --- a/backend/triggers/integration.py +++ b/backend/triggers/integration.py @@ -10,6 +10,7 @@ from datetime import datetime, timezone from .core import TriggerResult, TriggerEvent from services.supabase import DBConnection from utils.logger import logger +from agent.run_agent import get_stream_context, run_agent_run_stream class AgentTriggerExecutor: """Handles execution of agents when triggered by external events.""" @@ -242,6 +243,8 @@ Please respond appropriately to this trigger event.""" ) -> str: """Start agent execution using the existing agent system.""" client = await self.db.client + + model_name = "anthropic/claude-sonnet-4-20250514" # Create agent run record agent_run_data = { @@ -251,7 +254,7 @@ Please respond appropriately to this trigger event.""" "status": "running", "started_at": datetime.now(timezone.utc).isoformat(), "metadata": { - "model_name": "anthropic/claude-sonnet-4-20250514", + "model_name": model_name, "enable_thinking": False, "reasoning_effort": "low", "enable_context_manager": True, @@ -268,8 +271,24 @@ Please respond appropriately to this trigger event.""" instance_key = f"active_run:{instance_id}:{agent_run_id}" try: from services import redis + stream_context = await get_stream_context() await redis.set(instance_key, "running", ex=redis.REDIS_KEY_TTL) - logger.info(f"Registered trigger agent run in Redis ({instance_key})") + + _ = await stream_context.resumable_stream(agent_run_id, lambda: run_agent_run_stream( + agent_run_id=agent_run_id, thread_id=thread_id, instance_id="trigger_executor", + project_id=project_id, + model_name=model_name, + enable_thinking=False, + reasoning_effort="low", + stream=False, + enable_context_manager=True, + agent_config=agent_config, + is_agent_builder=False, + target_agent_id=None, + request_id=None + )) + + logger.info(f"Started agent trigger execution ({instance_key})") except Exception as e: logger.warning(f"Failed to register trigger agent run in Redis ({instance_key}): {str(e)}") diff --git a/backend/uv.lock b/backend/uv.lock index b4d7ee8d..fb4d703d 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -2188,7 +2188,7 @@ wheels = [ [[package]] name = "resumable-stream" version = "0.0.1" -source = { git = "https://github.com/kortix-ai/resumable-stream-python?tag=v0.0.2#822c2518697c67c2f21480807474c1df1e7e2322" } +source = { git = "https://github.com/kortix-ai/resumable-stream-python?tag=v0.1.1#6fc86de52ebc6a3c764798464ebd1287c53e6d62" } [[package]] name = "rpds-py" @@ -2498,7 +2498,7 @@ requires-dist = [ { name = "questionary", specifier = "==2.0.1" }, { name = "redis", specifier = "==5.2.1" }, { name = "requests", specifier = "==2.32.3" }, - { name = "resumable-stream", git = "https://github.com/kortix-ai/resumable-stream-python?tag=v0.0.2" }, + { name = "resumable-stream", git = "https://github.com/kortix-ai/resumable-stream-python?tag=v0.1.1" }, { name = "sentry-sdk", extras = ["fastapi"], specifier = "==2.29.1" }, { name = "setuptools", specifier = "==75.3.0" }, { name = "stripe", specifier = "==12.0.1" },