Refactor tool management and remove unused files

- Renamed `KortixMCPTools` to `MCPTools` for consistency across the SDK.
- Updated import statements and `__all__` definitions to reflect the new class name.
- Removed unused files including `.kvstore.json`, `example_stream.txt`, `play.py`, `stream.py`, and `stream_test.py` to streamline the project structure and eliminate redundancy.
This commit is contained in:
mykonos-ibiza 2025-08-01 01:33:20 +05:30
parent 4da30a7b9a
commit 4ca74e9907
9 changed files with 7 additions and 875 deletions

View File

@ -7,6 +7,6 @@ A Python SDK for creating and managing AI agents with thread execution capabilit
__version__ = "0.1.0"
from .kortix.kortix import Kortix
from .kortix.tools import AgentPressTools, KortixMCPTools
from .kortix.tools import AgentPressTools, MCPTools
__all__ = ["Kortix", "AgentPressTools", "KortixMCPTools"]
__all__ = ["Kortix", "AgentPressTools", "MCPTools"]

View File

@ -1,4 +0,0 @@
{
"agent_id": "05911d66-5cc2-458c-b8d6-29920ef50af8",
"thread_id": "e026d13a-a697-44fe-8d46-a0da5a980c04"
}

View File

@ -1,6 +1,6 @@
from .api.threads import AgentStartRequest
from .thread import Thread, AgentRun
from .tools import AgentPressTools, KortixMCPTools, KortixTools
from .tools import AgentPressTools, MCPTools, KortixTools
from .api.agents import (
AgentCreateRequest,
AgentPress_ToolConfig,
@ -46,7 +46,7 @@ class KortixAgent:
agentpress_tools[tool] = AgentPress_ToolConfig(
enabled=True, description=tool.get_description()
)
elif isinstance(tool, KortixMCPTools):
elif isinstance(tool, MCPTools):
mcp = tool
custom_mcps.append(
CustomMCP(

View File

@ -1,7 +1,7 @@
from .api import agents, threads
from .agent import KortixAgent
from .thread import KortixThread
from .tools import AgentPressTools, KortixMCPTools
from .tools import AgentPressTools, MCPTools
class Kortix:

File diff suppressed because one or more lines are too long

View File

@ -1,134 +0,0 @@
import asyncio
import json
import os
from typing import Any, Optional
from dotenv import load_dotenv
from fastmcp import FastMCP
from kortix.kortix import Kortix
from kortix.tools import AgentPressTools, KortixMCP
from .stream import RealtimeStreamProcessor, RealtimeCallbacks
load_dotenv("../../.env")
# Local key-value store for storing agent and thread IDs
class LocalKVStore:
    """A tiny key/value store persisted as a JSON file on disk.

    Used to remember agent and thread IDs between script runs so the demo
    can reuse them instead of creating new ones.
    """

    def __init__(self, filename: str = ".kvstore.json"):
        self.filename = filename
        self._data = {}
        self._load()

    def _load(self):
        """Read the backing file into memory; treat any failure as empty."""
        self._data = {}
        if not os.path.exists(self.filename):
            return
        try:
            with open(self.filename, "r", encoding="utf-8") as handle:
                self._data = json.load(handle)
        except Exception:
            # Corrupt or unreadable store: start over rather than crash.
            self._data = {}

    def _save(self):
        """Write the in-memory mapping back to disk."""
        with open(self.filename, "w", encoding="utf-8") as handle:
            json.dump(self._data, handle, indent=2)

    def get(self, key: str, default: Optional[Any] = None) -> Any:
        """Return the stored value for *key*, or *default* when absent."""
        return self._data.get(key, default)

    def set(self, key: str, value: Any):
        """Store *value* under *key* and persist immediately."""
        self._data[key] = value
        self._save()

    def delete(self, key: str):
        """Remove *key* when present, then persist."""
        if key in self._data:
            del self._data[key]
        self._save()

    def clear(self):
        """Drop every entry and persist the now-empty store."""
        self._data = {}
        self._save()
# Shared module-level wiring: the ID store and the local MCP server that
# `main()` registers with the agent.
kv = LocalKVStore()
mcp = FastMCP(name="Kortix")


@mcp.tool
async def get_weather(city: str) -> str:
    """Toy MCP tool: report a hard-coded (fake) weather string for *city*."""
    return f"The weather in {city} is windy."
async def main():
    """Demo entry point: start a local MCP server, create (or reuse) a
    Kortix agent and thread, run one prompt, and pretty-print the stream.

    Please ignore the asyncio.exceptions.CancelledError that is thrown when
    the MCP server is stopped. I couldn't fix it.
    """
    # Wrap the local FastMCP server so it can be passed to the agent as a tool.
    kortixMCP = KortixMCP(mcp, "http://localhost:4000/mcp/")
    await kortixMCP.initialize()

    # Start the MCP server in the background
    asyncio.create_task(
        mcp.run_http_async(
            show_banner=False, log_level="error", host="0.0.0.0", port=4000
        )
    )

    kortix = Kortix(
        os.getenv("KORTIX_API_KEY", "pk_xxx:sk_xxx"),
        "http://localhost:8000/api",
    )

    # Setup the agent — reuse the persisted agent_id when one exists.
    agent_id = kv.get("agent_id")
    if not agent_id:
        agent = await kortix.Agent.create(
            name="Generic Agent",
            system_prompt="You are a generic agent. You can use the tools provided to you to answer questions.",
            model="anthropic/claude-sonnet-4-20250514",
            tools=[AgentPressTools.WEB_SEARCH_TOOL, kortixMCP],
        )
        kv.set("agent_id", agent._agent_id)
    else:
        agent = await kortix.Agent.get(agent_id)

    # Setup the thread — reuse the persisted thread_id when one exists.
    thread_id = kv.get("thread_id")
    if not thread_id:
        thread = await kortix.Thread.create()
        kv.set("thread_id", thread._thread_id)
    else:
        thread = await kortix.Thread.get(thread_id)

    # Run the agent and consume its SSE stream through the example processor.
    agent_run = await agent.run("What is the weather in Bangalore?", thread)
    stream = await agent_run.get_stream()
    processor = RealtimeStreamProcessor(
        callbacks=RealtimeCallbacks(
            # on_text_update=lambda full_text: print(f"[TEXT] {full_text}"),  # Uncomment to see each chunk coming in
            on_status_update=lambda status: print(
                f"[STATUS] {status.get('status_type', status.get('status', 'unknown'))}"
            ),
            on_function_call_start=lambda: print("\n[TOOL USE DETECTED]"),
            on_function_call_update=lambda details: print(
                f'[TOOL UPDATE] Calling function: "{details.name}"'
            ),
            on_tool_result=lambda message: print(f"[TOOL RESULT] {message.content}"),
            on_message_end=lambda message: print(f"[MESSAGE] {message.content}"),
        )
    )
    async for line in stream:
        processor.process_line(line)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    # Only swallow the expected interrupts: Ctrl-C and the CancelledError
    # raised when the background MCP server task is torn down (see main's
    # docstring). A bare `except:` here hid every real failure and turned
    # crashes into a successful exit code 0.
    except (KeyboardInterrupt, asyncio.CancelledError):
        exit(0)

View File

@ -1,369 +0,0 @@
"""
@fileoverview
This file is mostly important for dealing with the content of the streamed data,
perhaps the frontend if the stream doesn't need to be decoded in the backend.
"""
from dataclasses import dataclass, field
from typing import Optional, Dict, List, Callable, Any, Literal, AsyncGenerator
import json
import re
import httpx
def try_parse_json(json_str: str) -> Optional[Any]:
    """Decode a JSON string, returning ``None`` instead of raising on failure.

    Also tolerates non-string input (``TypeError`` from ``json.loads``).
    """
    try:
        parsed = json.loads(json_str)
    except (TypeError, json.JSONDecodeError):
        return None
    return parsed
@dataclass
class BaseStreamEvent:
    """The base structure for any event coming from the stream."""

    thread_id: str
    # Discriminator used by the processor to route the event.
    type: Literal["status", "assistant", "tool"]
    is_llm_message: bool
    metadata: str  # Often a JSON string
    created_at: str
    updated_at: str
    # None while the message is still streaming as chunks.
    message_id: Optional[str] = None
    # Payload; for assistant/tool events this is itself a JSON string.
    content: Optional[str] = None
    # Status events only: coarse status plus optional human-readable message.
    status: Optional[str] = None
    message: Optional[str] = None
@dataclass
class AssistantMessageChunk(BaseStreamEvent):
    """A partial piece of a streaming assistant reply.

    Chunks never carry a ``message_id``; ``sequence`` orders them instead.
    """

    sequence: Optional[int] = None

    def __post_init__(self):
        # Normalize: chunks are always assistant-typed, id-less, and carry
        # a non-None content string.
        self.message_id = None
        self.type = "assistant"
        self.content = self.content or ""
@dataclass
class CompleteMessage(BaseStreamEvent):
    """A final, fully-assembled message with a non-null ``message_id``."""

    def __post_init__(self):
        # Normalize optional fields so downstream code can rely on strings.
        self.message_id = self.message_id or ""
        self.content = self.content or ""
@dataclass
class AssistantContentChunk:
    """The structure of the content within an AssistantMessageChunk."""

    role: Literal["assistant"]
    # The text fragment carried by this chunk.
    content: str
@dataclass
class ProcessedStreamResult:
    """The structured result after processing the stream."""

    final_messages: List[CompleteMessage]
    thread_id: Optional[str] = None
    # IDs of the agent runs observed in the stream.
    run_ids: List[str] = field(default_factory=list)
@dataclass
class FunctionCallDetails:
    """Details about a function call in the stream."""

    # None until the `<invoke name="...">` tag has been seen in the text.
    name: Optional[str] = None
    # We could add `parameters: Optional[str]` here later
@dataclass
class ToolResultContent:
    """The content of a tool result message. The inner `content` is a JSON string."""

    role: Literal["user"]  # Or 'tool', depending on the API spec
    content: str  # A JSON string containing ToolExecutionResult
@dataclass
class ToolExecutionResult:
    """The parsed result of a tool execution."""

    tool_execution: Dict[str, Any]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ToolExecutionResult":
        """Build an instance from a raw dict, tolerating a missing key."""
        payload = data.get("tool_execution", {})
        return cls(tool_execution=payload)
@dataclass
class ToolResultMessage(CompleteMessage):
    """A complete message holding the serialized result of a tool call."""

    def __post_init__(self):
        super().__post_init__()
        self.type = "tool"  # force the event type regardless of input
# Type guard functions
def is_assistant_message_chunk(event: Any) -> bool:
    """Type guard: is *event* a streaming assistant chunk?

    A chunk is assistant-typed, has a ``message_id`` attribute that is None,
    and carries an integer ``sequence``.
    """
    if getattr(event, "type", None) != "assistant":
        return False
    if not hasattr(event, "message_id") or event.message_id is not None:
        return False
    return isinstance(getattr(event, "sequence", None), int)
def is_complete_message(event: Any) -> bool:
    """Type guard: does *event* carry a non-null ``message_id``?"""
    message_id = getattr(event, "message_id", None)
    return message_id is not None
def is_tool_result_message(event: Any) -> bool:
    """Type guard: is *event* a finalized tool-result message?"""
    is_tool_typed = getattr(event, "type", None) == "tool"
    has_id = getattr(event, "message_id", None) is not None
    return is_tool_typed and has_id
# Callback type definitions
OnStreamStartCallback = Callable[[], None]
OnTextUpdateCallback = Callable[[str], None]  # receives the full accumulated text
OnMessageEndCallback = Callable[[CompleteMessage], None]
OnStatusUpdateCallback = Callable[[Any], None]  # receives a merged status dict
OnFunctionCallStartCallback = Callable[[], None]
OnFunctionCallUpdateCallback = Callable[[FunctionCallDetails], None]
OnFunctionCallEndCallback = Callable[[], None]
OnToolResultCallback = Callable[[ToolResultMessage], None]


@dataclass
class RealtimeCallbacks:
    """Callbacks for handling real-time stream events.

    Every hook is optional; the processor skips hooks that are left unset.
    """

    on_stream_start: Optional[OnStreamStartCallback] = None
    on_text_update: Optional[OnTextUpdateCallback] = None
    on_message_end: Optional[OnMessageEndCallback] = None
    on_status_update: Optional[OnStatusUpdateCallback] = None
    on_function_call_start: Optional[OnFunctionCallStartCallback] = None
    on_function_call_update: Optional[OnFunctionCallUpdateCallback] = None
    on_function_call_end: Optional[OnFunctionCallEndCallback] = None
    on_tool_result: Optional[OnToolResultCallback] = None
# The three phases the parser moves through while scanning the accumulated
# assistant text for <function_calls> markup.
ParsingState = Literal["text", "in_function_call", "function_call_ended"]


@dataclass
class StreamState:
    """Internal state for stream processing."""

    # Raw chunks received so far (re-sorted by sequence on each arrival).
    chunks: List[AssistantMessageChunk] = field(default_factory=list)
    # Concatenation of all chunk contents, in sequence order.
    full_text: str = ""
    parsing_state: ParsingState = "text"
    # Set while a <function_calls> block is being streamed; None otherwise.
    current_function_call: Optional[FunctionCallDetails] = None
class RealtimeStreamProcessor:
    """
    Example streaming parser for reference.
    Processes real-time stream data with callback support.

    Feed raw SSE lines to `process_line`; the processor classifies each
    "data:" event, accumulates assistant chunks, tracks <function_calls>
    markup, and fires the configured `RealtimeCallbacks` hooks.
    """

    def __init__(self, callbacks: Optional[RealtimeCallbacks] = None):
        # Finalized messages keyed by message_id.
        self.messages: Dict[str, CompleteMessage] = {}
        # Parsing state for the assistant message currently being streamed.
        self.state = StreamState()
        self.callbacks = callbacks or RealtimeCallbacks()
        self.stream_active = False
        # Matches `<invoke name="...">` to extract the called tool's name.
        self.invoke_name_regex = re.compile(r'<invoke\s+name="([^"]+)"')

    def _create_default_state(self) -> StreamState:
        """Create a new default state."""
        return StreamState()

    def _start_stream_if_inactive(self) -> None:
        """Start the stream if it's not already active."""
        if not self.stream_active:
            self.stream_active = True
            if self.callbacks.on_stream_start:
                self.callbacks.on_stream_start()

    def _handle_chunk(self, chunk: AssistantMessageChunk) -> None:
        """Handle an incoming assistant message chunk."""
        self._start_stream_if_inactive()
        if not chunk.content:
            return
        # chunk.content is itself JSON: {"role": ..., "content": "..."}.
        chunk_content_data = try_parse_json(chunk.content)
        if not chunk_content_data or "content" not in chunk_content_data:
            return
        chunk_content = chunk_content_data["content"]  # NOTE(review): unused
        # Use the instance state
        self.state.chunks.append(chunk)
        # Sort chunks by sequence to handle out-of-order delivery
        self.state.chunks.sort(key=lambda c: c.sequence or 0)
        # Rebuild full text from sorted chunks
        full_text_parts = []
        for c in self.state.chunks:
            if c.content is not None:
                content_data = try_parse_json(c.content)
                if content_data and "content" in content_data:
                    full_text_parts.append(content_data["content"])
        self.state.full_text = "".join(full_text_parts)
        self._update_parsing_state(self.state)
        if self.callbacks.on_text_update:
            self.callbacks.on_text_update(self.state.full_text)

    def _update_parsing_state(self, state: StreamState) -> None:
        """Update the parsing state based on current content."""
        if state.parsing_state == "text":
            if "<function_calls>" in state.full_text:
                state.parsing_state = "in_function_call"
                state.current_function_call = FunctionCallDetails(name=None)
                if self.callbacks.on_function_call_start:
                    self.callbacks.on_function_call_start()
                # Re-enter immediately: the same text may already contain
                # the invoke name and/or the closing tag.
                self._update_parsing_state(state)
        elif state.parsing_state == "in_function_call":
            # Emit the tool name once, as soon as the invoke tag is visible.
            if state.current_function_call and state.current_function_call.name is None:
                match = self.invoke_name_regex.search(state.full_text)
                if match:
                    state.current_function_call.name = match.group(1)
                    if self.callbacks.on_function_call_update:
                        self.callbacks.on_function_call_update(
                            FunctionCallDetails(name=state.current_function_call.name)
                        )
            if "</function_calls>" in state.full_text:
                state.parsing_state = "function_call_ended"
                if self.callbacks.on_function_call_end:
                    self.callbacks.on_function_call_end()
                state.current_function_call = None
        elif state.parsing_state == "function_call_ended":
            # Nothing more to parse until the message completes.
            pass

    def process_line(self, line: str) -> None:
        """Process a single line from the stream."""
        if not line.startswith("data:"):
            return
        try:
            event_data = try_parse_json(line[5:])  # Remove "data:" prefix
            if not event_data:
                return
            # Convert dict to appropriate object based on type
            if (
                event_data.get("type") == "assistant"
                and event_data.get("message_id") is None
                and "sequence" in event_data
            ):
                # Streaming assistant chunk (no id yet, ordered by sequence).
                chunk = AssistantMessageChunk(
                    message_id=None,
                    thread_id=event_data.get("thread_id", ""),
                    type="assistant",
                    is_llm_message=event_data.get("is_llm_message", False),
                    metadata=event_data.get("metadata", ""),
                    created_at=event_data.get("created_at", ""),
                    updated_at=event_data.get("updated_at", ""),
                    content=event_data.get("content", ""),
                    sequence=event_data.get("sequence"),
                )
                self._handle_chunk(chunk)
            elif (
                event_data.get("type") == "tool"
                and event_data.get("message_id") is not None
            ):
                # Finalized tool result.
                tool_message = ToolResultMessage(
                    message_id=event_data["message_id"],
                    thread_id=event_data.get("thread_id", ""),
                    type="tool",
                    is_llm_message=event_data.get("is_llm_message", False),
                    metadata=event_data.get("metadata", ""),
                    created_at=event_data.get("created_at", ""),
                    updated_at=event_data.get("updated_at", ""),
                    content=event_data.get("content", ""),
                )
                self._handle_tool_result_message(tool_message)
            elif (
                event_data.get("type") == "assistant"
                and event_data.get("message_id") is not None
            ):
                # Finalized assistant message.
                complete_message = CompleteMessage(
                    message_id=event_data["message_id"],
                    thread_id=event_data.get("thread_id", ""),
                    type="assistant",
                    is_llm_message=event_data.get("is_llm_message", False),
                    metadata=event_data.get("metadata", ""),
                    created_at=event_data.get("created_at", ""),
                    updated_at=event_data.get("updated_at", ""),
                    content=event_data.get("content", ""),
                )
                self._handle_complete_assistant_message(complete_message)
            elif event_data.get("type") == "status" and self.callbacks.on_status_update:
                # Status events: flatten the JSON-encoded content plus the
                # top-level status/message fields into one dict for the hook.
                status_details = try_parse_json(event_data.get("content", "{}")) or {}
                if event_data.get("status"):
                    status_details["status_type"] = event_data["status"]
                if event_data.get("message"):
                    status_details["message"] = event_data["message"]
                # Merge event data with status details
                full_status = {**event_data, **status_details}
                self.callbacks.on_status_update(full_status)
        except Exception as error:
            print(f"Failed to process stream line: {line}, error: {error}")

    def _handle_tool_result_message(self, message: ToolResultMessage) -> None:
        """Handle a tool result message."""
        if message.message_id:
            self.messages[message.message_id] = message
        if self.callbacks.on_tool_result:
            self.callbacks.on_tool_result(message)

    def _handle_complete_assistant_message(self, message: CompleteMessage) -> None:
        """Handle a complete assistant message."""
        if message.message_id:
            self.messages[message.message_id] = message
        if self.callbacks.on_message_end:
            self.callbacks.on_message_end(message)
        # A completed assistant message makes the chunk buffer stale.
        self.state = self._create_default_state()

View File

@ -1,136 +0,0 @@
import asyncio
import json
from .stream import (
RealtimeStreamProcessor,
RealtimeCallbacks,
CompleteMessage,
ToolResultMessage,
)
from .stream import try_parse_json
# Load the example stream data from the file
def load_example_stream_text():
    """Return the recorded example stream, or a tiny inline fallback.

    When ``example_stream.txt`` is missing, a warning is printed and a
    three-event sample stream is returned so the test can still run.
    """
    source_path = "./kortix/example_stream.txt"
    try:
        with open(source_path, "r", encoding="utf-8") as stream_file:
            return stream_file.read()
    except FileNotFoundError:
        pass
    print("Warning: example_stream.txt not found, using fallback data")
    return """data: {"type": "status", "status": "starting", "message": "Agent is starting"}
data: {"type": "assistant", "message_id": null, "thread_id": "thread_123", "is_llm_message": true, "metadata": "", "created_at": "2024-01-01T10:00:00Z", "updated_at": "2024-01-01T10:00:00Z", "content": "{\\"role\\": \\"assistant\\", \\"content\\": \\"Hello! I can help you with\\"}", "sequence": 1}
data: {"type": "status", "status": "completed", "finish_reason": "completed"}"""


# Loaded once at import time, mirroring the original module behavior.
EXAMPLE_STREAM_TEXT = load_example_stream_text()
async def test_realtime_stream_processor():
    """Test the RealtimeStreamProcessor with sample stream data.

    Wires print-based callbacks into the processor and feeds it the example
    stream line by line, with a tiny delay to simulate real streaming.
    """
    print("Starting test...")

    def on_stream_start():
        print("[STREAM START]")

    def on_text_update(full_text: str):
        # Uncomment to see text updates
        # print(f"\r[TEXT] {full_text}")
        pass

    def on_status_update(status: dict):
        finish_reason = status.get("finish_reason", "received")
        status_type = status.get("status_type", status.get("status", "unknown"))
        print(f"[STATUS] {status_type} {finish_reason}")

    def on_function_call_start():
        print("\n[TOOL USE DETECTED]")

    def on_function_call_update(details):
        if details.name:
            print(f'[TOOL UPDATE] Calling function: "{details.name}"')

    def on_tool_result(message: ToolResultMessage):
        # Tool results are double-encoded: message.content is JSON whose
        # "content" field is itself a JSON string with the execution result.
        if not message.content:
            print(f"[TOOL RESULT] No content in message")
            return
        content = try_parse_json(message.content)
        if not content:
            print(f"[TOOL RESULT] Failed to parse message content")
            return
        content_str = content.get("content", "")
        if not content_str:
            print(f"[TOOL RESULT] No content string in parsed content")
            return
        execution_result = try_parse_json(content_str)
        if not execution_result:
            print(f"[TOOL RESULT] Failed to parse execution result")
            return
        tool_execution = execution_result.get("tool_execution", {})
        tool_name = tool_execution.get("function_name")
        result = tool_execution.get("result", {})
        was_success = result.get("success", False)
        output = json.dumps(result.get("output", {}))
        error = json.dumps(result.get("error", {}))
        msg = f'[TOOL RESULT] Message ID: {message.message_id} | Tool: "{tool_name}" | '
        if was_success:
            # Keep the console line readable for large outputs.
            output_preview = output[:80] + "..." if len(output) > 80 else output
            if output_preview == "{}":
                output_preview = "No answer found."
            msg += f"Success! Output: {output_preview}"
        else:
            msg += f"Failure! Error: {error}"
        print(msg)

    def on_function_call_end():
        print("[TOOL USE WAITING]")

    def on_message_end(full_message: CompleteMessage):
        print()  # New line
        if not full_message.content:
            print(f"[MESSAGE] No content in message")
            return
        content = try_parse_json(full_message.content)
        if content:
            role = content.get("role", "unknown")
            message_content = content.get("content", "")
            preview = (
                message_content[:100] + "..."
                if len(message_content) > 100
                else message_content
            )
            print(f"[MESSAGE] {role}: {preview}")
        else:
            print(f"[MESSAGE] Failed to parse message content")

    # Create the processor with callbacks
    callbacks = RealtimeCallbacks(
        on_stream_start=on_stream_start,
        on_text_update=on_text_update,
        on_status_update=on_status_update,
        on_function_call_start=on_function_call_start,
        on_function_call_update=on_function_call_update,
        on_tool_result=on_tool_result,
        on_function_call_end=on_function_call_end,
        on_message_end=on_message_end,
    )
    processor = RealtimeStreamProcessor(callbacks=callbacks)

    # Process the stream line by line
    lines = EXAMPLE_STREAM_TEXT.strip().split("\n")
    for line in lines:
        await asyncio.sleep(0.001)  # Small delay to simulate streaming
        processor.process_line(line)
    print("\nTest completed successfully!")


# Run the test
if __name__ == "__main__":
    asyncio.run(test_realtime_stream_processor())

View File

@ -3,7 +3,7 @@ from enum import Enum
from fastmcp import Client as FastMCPClient
class KortixMCPTools:
class MCPTools:
def __init__(
self, endpoint: str, name: str, allowed_tools: list[str] | None = None
):
@ -60,4 +60,4 @@ class AgentPressTools(str, Enum):
return desc
KortixTools = Union[AgentPressTools, KortixMCPTools]
KortixTools = Union[AgentPressTools, MCPTools]