From aa75f47ac31605c60173ee34d6135303c3f7285e Mon Sep 17 00:00:00 2001 From: mykonos-ibiza <222371740+mykonos-ibiza@users.noreply.github.com> Date: Tue, 29 Jul 2025 21:23:37 +0530 Subject: [PATCH] feat(kortix): enhance streaming capabilities and add utility functions - Introduced a new `stream.py` module for processing streamed data, including classes for handling different message types. - Added a local key-value store for managing agent and thread IDs. - Implemented a real-time stream processor with callbacks for handling various events. - Created example scripts for testing the streaming functionality and integrated weather tool. - Updated import statements for better organization and clarity. --- sdk/kortix/api/utils.py | 22 ++++++++++++++++++++++ sdk/kortix/kortix.py | 1 + sdk/kortix/{ => play}/example_stream.txt | 0 sdk/kortix/{ => play}/play.py | 6 +++--- sdk/kortix/{ => play}/stream.py | 20 -------------------- sdk/kortix/{ => play}/stream_test.py | 0 sdk/kortix/thread.py | 2 +- 7 files changed, 27 insertions(+), 24 deletions(-) create mode 100644 sdk/kortix/api/utils.py rename sdk/kortix/{ => play}/example_stream.txt (100%) rename sdk/kortix/{ => play}/play.py (97%) rename sdk/kortix/{ => play}/stream.py (95%) rename sdk/kortix/{ => play}/stream_test.py (100%) diff --git a/sdk/kortix/api/utils.py b/sdk/kortix/api/utils.py new file mode 100644 index 00000000..f837c667 --- /dev/null +++ b/sdk/kortix/api/utils.py @@ -0,0 +1,22 @@ +from typing import AsyncGenerator +import httpx + + +async def stream_from_url(url: str, **kwargs) -> AsyncGenerator[str, None]: + """ + Helper function that takes a URL and returns an async generator yielding lines. + + Args: + url: The URL to stream from + **kwargs: Additional arguments to pass to httpx.AsyncClient.stream() + + Yields: + str: Each line from the streaming response + """ + async with httpx.AsyncClient() as client: + async with client.stream("GET", url, **kwargs) as response: + response.raise_for_status() + + async for line in response.aiter_lines(): + if line.strip(): # Only yield non-empty lines + yield line.strip() diff --git a/sdk/kortix/kortix.py b/sdk/kortix/kortix.py index 8e9c1921..49d243b6 100644 --- a/sdk/kortix/kortix.py +++ b/sdk/kortix/kortix.py @@ -1,6 +1,7 @@ from .api import agents, threads from .agent import KortixAgent from .thread import KortixThread +from .tools import AgentPressTools, KortixMCP class Kortix: diff --git a/sdk/kortix/example_stream.txt b/sdk/kortix/play/example_stream.txt similarity index 100% rename from sdk/kortix/example_stream.txt rename to sdk/kortix/play/example_stream.txt diff --git a/sdk/kortix/play.py b/sdk/kortix/play/play.py similarity index 97% rename from sdk/kortix/play.py rename to sdk/kortix/play/play.py index 2059eeb5..e29268e9 100644 --- a/sdk/kortix/play.py +++ b/sdk/kortix/play/play.py @@ -6,11 +6,11 @@ from dotenv import load_dotenv from fastmcp import FastMCP -from .kortix import Kortix -from .tools import AgentPressTools, KortixMCP +from kortix.kortix import Kortix +from kortix.tools import AgentPressTools, KortixMCP from .stream import RealtimeStreamProcessor, RealtimeCallbacks -load_dotenv("../.env") +load_dotenv("../../.env") # Local key-value store for storing agent and thread IDs diff --git a/sdk/kortix/stream.py b/sdk/kortix/play/stream.py similarity index 95% rename from sdk/kortix/stream.py rename to sdk/kortix/play/stream.py index 8c57c1ca..3bde0d1c 100644 --- a/sdk/kortix/stream.py +++ b/sdk/kortix/play/stream.py @@ -20,26 +20,6 @@ def try_parse_json(json_str: str) -> Optional[Any]: return None -async def stream_from_url(url: str, **kwargs) -> AsyncGenerator[str, None]: - """ - Helper function that takes a URL and returns an async generator yielding lines. - - Args: - url: The URL to stream from - **kwargs: Additional arguments to pass to httpx.AsyncClient.stream() - - Yields: - str: Each line from the streaming response - """ - async with httpx.AsyncClient() as client: - async with client.stream("GET", url, **kwargs) as response: - response.raise_for_status() - - async for line in response.aiter_lines(): - if line.strip(): # Only yield non-empty lines - yield line.strip() - - @dataclass class BaseStreamEvent: """The base structure for any event coming from the stream.""" diff --git a/sdk/kortix/stream_test.py b/sdk/kortix/play/stream_test.py similarity index 100% rename from sdk/kortix/stream_test.py rename to sdk/kortix/play/stream_test.py diff --git a/sdk/kortix/thread.py b/sdk/kortix/thread.py index 396ebb5c..fe2a40a1 100644 --- a/sdk/kortix/thread.py +++ b/sdk/kortix/thread.py @@ -1,7 +1,7 @@ from typing import AsyncGenerator from .api.threads import ThreadsClient -from .stream import stream_from_url +from .api.utils import stream_from_url class Thread: