Enhance JSON parsing and output formatting in async stream printer

- Updated the `print_stream` function to utilize a new utility function `try_parse_json` for safer JSON parsing, improving error handling for invalid input.
- Enhanced output formatting for various event types, including status updates, assistant messages, and tool execution results, ensuring clearer and more informative logs.
- Added checks for empty content and improved handling of parsing failures to maintain robustness in stream processing.
This commit is contained in:
mykonos-ibiza 2025-07-31 23:09:12 +05:30
parent 076064a976
commit 891e232ce2
1 changed files with 83 additions and 67 deletions

View File

@ -1,89 +1,105 @@
import json
from typing import AsyncGenerator
from typing import AsyncGenerator, Optional, Any
def try_parse_json(json_str: str) -> Optional[Any]:
"""Utility function to safely parse JSON strings."""
try:
return json.loads(json_str)
except (json.JSONDecodeError, TypeError):
return None
async def print_stream(stream: AsyncGenerator[str, None]):
"""
Simple stream printer that processes async string generator.
Prints different types of stream events with basic formatting.
Follows the same output format as stream_test.py.
"""
stream_started = False
async for line in stream:
line = line.strip()
# Skip empty lines
if not line:
continue
# Parse stream data lines
if line.startswith("data: "):
json_str = line[6:] # Remove "data: " prefix
try:
data = json.loads(json_str)
event_type = data.get("type", "unknown")
if event_type == "status":
status = data.get("status", "unknown")
message = data.get("message", "")
finish_reason = data.get("finish_reason", "")
if status == "starting":
print("🔄 Stream starting...")
elif status == "completed":
print(
f"✅ Stream completed ({finish_reason})"
if finish_reason
else "✅ Stream completed"
data = try_parse_json(json_str)
if not data:
continue
event_type = data.get("type", "unknown")
# Print stream start on first event
if not stream_started:
print("[STREAM START]")
stream_started = True
if event_type == "status":
finish_reason = data.get("finish_reason", "received")
status_type = data.get("status_type", data.get("status", "unknown"))
print(f"[STATUS] {status_type} {finish_reason}")
elif event_type == "assistant":
# Handle assistant messages - print at end of message
content = data.get("content", "")
if content:
parsed_content = try_parse_json(content)
if parsed_content:
role = parsed_content.get("role", "unknown")
message_content = parsed_content.get("content", "")
preview = (
message_content[:100] + "..."
if len(message_content) > 100
else message_content
)
elif status == "error":
print(f"❌ Stream error: {message}")
print() # New line
print(f"[MESSAGE] {role}: {preview}")
else:
print(f"📋 Status: {status}")
print() # New line
print(f"[MESSAGE] Failed to parse message content")
elif event_type == "tool":
# Handle tool results
message_id = data.get("message_id")
content = data.get("content", "")
if not content:
print(f"[TOOL RESULT] No content in message")
continue
elif event_type == "assistant":
content = data.get("content", "")
if content:
try:
# Parse the nested JSON content
content_data = json.loads(content)
assistant_content = content_data.get("content", "")
if assistant_content:
print(f"🤖 Assistant: {assistant_content}")
except json.JSONDecodeError:
print(f"🤖 Assistant: {content}")
parsed_content = try_parse_json(content)
if not parsed_content:
print(f"[TOOL RESULT] Failed to parse message content")
continue
elif event_type == "tool":
content = data.get("content", "")
if content:
try:
content_data = json.loads(content)
tool_content = content_data.get("content", "")
if tool_content:
tool_data = json.loads(tool_content)
tool_execution = tool_data.get("tool_execution", {})
function_name = tool_execution.get(
"function_name", "unknown"
)
result = tool_execution.get("result", {})
success = result.get("success", False)
content_str = parsed_content.get("content", "")
if not content_str:
print(f"[TOOL RESULT] No content string in parsed content")
continue
if success:
output = result.get("output", {})
print(
f"🔧 Tool {function_name}: Success - {output}"
)
else:
error = result.get("error", {})
print(f"🔧 Tool {function_name}: Error - {error}")
except json.JSONDecodeError:
print(f"🔧 Tool: {content}")
execution_result = try_parse_json(content_str)
if not execution_result:
print(f"[TOOL RESULT] Failed to parse execution result")
continue
tool_execution = execution_result.get("tool_execution", {})
tool_name = tool_execution.get("function_name")
result = tool_execution.get("result", {})
was_success = result.get("success", False)
output = json.dumps(result.get("output", {}))
error = json.dumps(result.get("error", {}))
msg = f'[TOOL RESULT] Message ID: {message_id} | Tool: "{tool_name}" | '
if was_success:
output_preview = output[:80] + "..." if len(output) > 80 else output
if output_preview == "{}":
output_preview = "No answer found."
msg += f"Success! Output: {output_preview}"
else:
print(f"📄 {event_type}: {data}")
except json.JSONDecodeError:
print(f"❓ Invalid JSON: {line}")
else:
# Non-data lines (headers, etc.)
if line and not line.startswith(":"): # Skip SSE comments
print(f"📝 {line}")
msg += f"Failure! Error: {error}"
print(msg)