chore(dependencies): update resumable-stream dependency to version 0.1.1 and refactor stream context management in agent module

sharath 2025-07-08 15:03:37 +00:00
parent 3bb7219bef
commit 9fa595c772
5 changed files with 35 additions and 62 deletions

View File

@@ -8,7 +8,6 @@ import uuid
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
import os
from resumable_stream.runtime import create_resumable_stream_context, ResumableStreamContext
from services.supabase import DBConnection
from services import redis
@@ -18,7 +17,7 @@ from services.billing import check_billing_status, can_use_model
from utils.config import config
from sandbox.sandbox import create_sandbox, delete_sandbox, get_or_start_sandbox
from services.llm import make_llm_api_call
from agent.run_agent import run_agent_run_stream, update_agent_run_status, StreamBroadcaster
from agent.run_agent import run_agent_run_stream, update_agent_run_status, get_stream_context
from utils.constants import MODEL_NAME_ALIASES
from flags.flags import is_enabled
@@ -30,16 +29,6 @@ instance_id = None # Global instance ID for this backend instance
# TTL for Redis response lists (24 hours)
REDIS_RESPONSE_LIST_TTL = 3600 * 24
stream_context_global: Optional[ResumableStreamContext] = None
async def get_stream_context():
global stream_context_global
if stream_context_global:
return stream_context_global
r = await redis.initialize_async()
stream_context_global = create_resumable_stream_context(r, "resumable_stream")
return stream_context_global
class AgentStartRequest(BaseModel):
model_name: Optional[str] = None # Will be set from config.MODEL_TO_USE in the endpoint
enable_thinking: Optional[bool] = False
@@ -675,7 +664,6 @@ async def stream_agent_run(
logger.error(f"Failed to start sandbox for project {project_id}: {str(e)}")
return
# Create the stream
stream = await stream_context.resumable_stream(agent_run_id, lambda: run_agent_run_stream(
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
project_id=project_id,
@@ -687,15 +675,6 @@
target_agent_id=target_agent_id,
request_id=request_id
))
broadcaster = StreamBroadcaster(stream)
stream_q = broadcaster.add_consumer()
print_q = broadcaster.add_consumer()
asyncio.create_task(broadcaster.start())
asyncio.create_task(StreamBroadcaster.iterate_bg(print_q))
stream = StreamBroadcaster.queue_to_stream(stream_q)
logger.info(f"Created new stream for agent run {agent_run_id}")
else:
@@ -703,7 +682,7 @@ async def stream_agent_run(
if stream is None:
logger.error(f"Failed to create or resume stream for agent run {agent_run_id}")
return
raise HTTPException(status_code=500, detail=f"Failed to create or resume stream for agent run {agent_run_id}")
return StreamingResponse(stream, media_type="text/event-stream", headers={
"Cache-Control": "no-cache, no-transform", "Connection": "keep-alive",

View File

@@ -16,47 +16,22 @@ from services.langfuse import langfuse
from utils.retry import retry
from typing import AsyncGenerator
import json
from resumable_stream.runtime import create_resumable_stream_context, ResumableStreamContext
_initialized = False
db = DBConnection()
instance_id = "single"
# Create stream broadcaster for multiple consumers
class StreamBroadcaster:
def __init__(self, source: AsyncIterable[Any]):
self.source = source
self.queues: List[asyncio.Queue] = []
stream_context_global: Optional[ResumableStreamContext] = None
def add_consumer(self) -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue()
self.queues.append(q)
return q
async def start(self) -> None:
async for chunk in self.source:
for q in self.queues:
await q.put(chunk)
for q in self.queues:
await q.put(None) # Sentinel to close consumers
# Consumer wrapper as an async generator
@staticmethod
async def queue_to_stream(queue: asyncio.Queue) -> AsyncIterable[Any]:
while True:
chunk = await queue.get()
if chunk is None:
break
yield chunk
# Print consumer task
@staticmethod
async def iterate_bg(queue: asyncio.Queue) -> None:
while True:
chunk = await queue.get()
if chunk is None:
break
pass
async def get_stream_context():
global stream_context_global
if stream_context_global:
return stream_context_global
r = await redis.initialize_async()
stream_context_global = create_resumable_stream_context(r, "resumable_stream")
return stream_context_global
async def initialize():
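
Both the API module and the trigger executor now import this helper instead of keeping their own module-level context. A small usage sketch, relying only on the cached-singleton behaviour implemented above:

# Sketch: repeated awaits reuse the same process-wide context.
ctx_a = await get_stream_context()
ctx_b = await get_stream_context()
assert ctx_a is ctx_b  # the second call returns the cached ResumableStreamContext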

View File

@@ -76,4 +76,4 @@ repository = "https://github.com/kortix-ai/suna"
package = false
[tool.uv.sources]
resumable-stream = { git = "https://github.com/kortix-ai/resumable-stream-python", tag = "v0.0.2" }
resumable-stream = { git = "https://github.com/kortix-ai/resumable-stream-python", tag = "v0.1.1" }

View File

@@ -10,6 +10,7 @@ from datetime import datetime, timezone
from .core import TriggerResult, TriggerEvent
from services.supabase import DBConnection
from utils.logger import logger
from agent.run_agent import get_stream_context, run_agent_run_stream
class AgentTriggerExecutor:
"""Handles execution of agents when triggered by external events."""
@@ -242,6 +243,8 @@ Please respond appropriately to this trigger event."""
) -> str:
"""Start agent execution using the existing agent system."""
client = await self.db.client
model_name = "anthropic/claude-sonnet-4-20250514"
# Create agent run record
agent_run_data = {
@@ -251,7 +254,7 @@ Please respond appropriately to this trigger event."""
"status": "running",
"started_at": datetime.now(timezone.utc).isoformat(),
"metadata": {
"model_name": "anthropic/claude-sonnet-4-20250514",
"model_name": model_name,
"enable_thinking": False,
"reasoning_effort": "low",
"enable_context_manager": True,
@@ -268,8 +271,24 @@ Please respond appropriately to this trigger event."""
instance_key = f"active_run:{instance_id}:{agent_run_id}"
try:
from services import redis
stream_context = await get_stream_context()
await redis.set(instance_key, "running", ex=redis.REDIS_KEY_TTL)
logger.info(f"Registered trigger agent run in Redis ({instance_key})")
_ = await stream_context.resumable_stream(agent_run_id, lambda: run_agent_run_stream(
agent_run_id=agent_run_id, thread_id=thread_id, instance_id="trigger_executor",
project_id=project_id,
model_name=model_name,
enable_thinking=False,
reasoning_effort="low",
stream=False,
enable_context_manager=True,
agent_config=agent_config,
is_agent_builder=False,
target_agent_id=None,
request_id=None
))
logger.info(f"Started agent trigger execution ({instance_key})")
except Exception as e:
logger.warning(f"Failed to register trigger agent run in Redis ({instance_key}): {str(e)}")

View File

@@ -2188,7 +2188,7 @@ wheels = [
[[package]]
name = "resumable-stream"
version = "0.0.1"
source = { git = "https://github.com/kortix-ai/resumable-stream-python?tag=v0.0.2#822c2518697c67c2f21480807474c1df1e7e2322" }
source = { git = "https://github.com/kortix-ai/resumable-stream-python?tag=v0.1.1#6fc86de52ebc6a3c764798464ebd1287c53e6d62" }
[[package]]
name = "rpds-py"
@@ -2498,7 +2498,7 @@ requires-dist = [
{ name = "questionary", specifier = "==2.0.1" },
{ name = "redis", specifier = "==5.2.1" },
{ name = "requests", specifier = "==2.32.3" },
{ name = "resumable-stream", git = "https://github.com/kortix-ai/resumable-stream-python?tag=v0.0.2" },
{ name = "resumable-stream", git = "https://github.com/kortix-ai/resumable-stream-python?tag=v0.1.1" },
{ name = "sentry-sdk", extras = ["fastapi"], specifier = "==2.29.1" },
{ name = "setuptools", specifier = "==75.3.0" },
{ name = "stripe", specifier = "==12.0.1" },