"""
Conversation thread management system for AgentPress.

This module provides comprehensive conversation management, including:
- Thread creation and persistence
- Message handling with support for text and images
- Tool registration and execution
- LLM interaction with streaming support
- Error handling and cleanup
- Context summarization to manage token limits
"""
import json
from typing import List, Dict, Any, Optional, Type, Union, AsyncGenerator, Literal

from services.llm import make_llm_api_call
from agentpress.tool import Tool
from agentpress.tool_registry import ToolRegistry
from agentpress.context_manager import ContextManager
from agentpress.response_processor import (
    ResponseProcessor,
    ProcessorConfig
)
from services.supabase import DBConnection
from utils.logger import logger
from langfuse.client import StatefulGenerationClient, StatefulTraceClient
from services.langfuse import langfuse
import datetime
from litellm import token_counter

# Type alias for tool choice
ToolChoice = Literal["auto", "required", "none"]


class ThreadManager:
    """Manages conversation threads with LLM models and tool execution.

    Provides comprehensive conversation management, handling message threading,
    tool registration, and LLM interactions with support for both standard and
    XML-based tool execution patterns.
    """

    def __init__(self, trace: Optional[StatefulTraceClient] = None, is_agent_builder: bool = False, target_agent_id: Optional[str] = None):
        """Initialize ThreadManager.

        Args:
            trace: Optional trace client for logging
            is_agent_builder: Whether this is an agent builder session
            target_agent_id: ID of the agent being built (if in agent builder mode)
        """
        self.db = DBConnection()
        self.tool_registry = ToolRegistry()
        self.trace = trace
        self.is_agent_builder = is_agent_builder
        self.target_agent_id = target_agent_id
        if not self.trace:
            self.trace = langfuse.trace(name="anonymous:thread_manager")
        self.response_processor = ResponseProcessor(
            tool_registry=self.tool_registry,
            add_message_callback=self.add_message,
            trace=self.trace,
            is_agent_builder=self.is_agent_builder,
            target_agent_id=self.target_agent_id
        )
        self.context_manager = ContextManager()
    def _is_tool_result_message(self, msg: Dict[str, Any]) -> bool:
        """Check whether a message carries a tool result, in any of its recognized shapes."""
        if not ("content" in msg and msg['content']):
            return False
        content = msg['content']
        if isinstance(content, str) and "ToolResult" in content: return True
        if isinstance(content, dict) and "tool_execution" in content: return True
        if isinstance(content, dict) and "interactive_elements" in content: return True
        if isinstance(content, str):
            try:
                parsed_content = json.loads(content)
                if isinstance(parsed_content, dict) and "tool_execution" in parsed_content: return True
                # Check the parsed dict (the original checked the raw string here, which was a bug)
                if isinstance(parsed_content, dict) and "interactive_elements" in parsed_content: return True
            except (json.JSONDecodeError, TypeError):
                pass
        return False
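
    # The shapes recognized above, for reference (hypothetical payloads):
    #   {"role": "tool", "content": "ToolResult(success=True, ...)"}      # plain string repr
    #   {"role": "tool", "content": {"tool_execution": {...}}}            # structured dict
    #   {"role": "tool", "content": '{"tool_execution": {...}}'}          # JSON-encoded string
    #   {"role": "tool", "content": {"interactive_elements": [...]}}      # interactive elements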

    def _compress_message(self, msg_content: Union[str, dict], message_id: Optional[str] = None, max_length: int = 3000) -> Union[str, dict]:
        """Compress message content by truncating it and appending a pointer to the full message."""
        if isinstance(msg_content, str):
            if len(msg_content) > max_length:
                return msg_content[:max_length] + "... (truncated)" + f"\n\nmessage_id \"{message_id}\"\nUse expand-message tool to see contents"
            else:
                return msg_content
        elif isinstance(msg_content, dict):
            if len(json.dumps(msg_content)) > max_length:
                return json.dumps(msg_content)[:max_length] + "... (truncated)" + f"\n\nmessage_id \"{message_id}\"\nUse expand-message tool to see contents"
            else:
                return msg_content
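
    # Illustrative sketch of the truncation above (hypothetical values,
    # not executed as part of this module):
    #   _compress_message("x" * 5000, message_id="abc-123", max_length=3000)
    #   # -> first 3000 chars + "... (truncated)" + a pointer to message_id
    #   #    "abc-123", so the agent can recover the full text via the
    #   #    expand-message tool.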

    def _safe_truncate(self, msg_content: Union[str, dict], max_length: int = 100000) -> Union[str, dict]:
        """Truncate the message content safely by removing the middle portion."""
        max_length = min(max_length, 100000)
        if isinstance(msg_content, str):
            if len(msg_content) > max_length:
                # Calculate how much to keep from start and end
                keep_length = max_length - 150  # Reserve space for truncation message
                start_length = keep_length // 2
                end_length = keep_length - start_length
                start_part = msg_content[:start_length]
                end_part = msg_content[-end_length:] if end_length > 0 else ""
                return start_part + "\n\n... (middle truncated) ...\n\n" + end_part + "\n\nThis message is too long, repeat relevant information in your response to remember it"
            else:
                return msg_content
        elif isinstance(msg_content, dict):
            json_str = json.dumps(msg_content)
            if len(json_str) > max_length:
                # Calculate how much to keep from start and end
                keep_length = max_length - 150  # Reserve space for truncation message
                start_length = keep_length // 2
                end_length = keep_length - start_length
                start_part = json_str[:start_length]
                end_part = json_str[-end_length:] if end_length > 0 else ""
                return start_part + "\n\n... (middle truncated) ...\n\n" + end_part + "\n\nThis message is too long, repeat relevant information in your response to remember it"
            else:
                return msg_content
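
    # Worked example (hypothetical numbers): with max_length=1000, the method
    # reserves 150 chars for the notice, leaving keep_length=850 chars of
    # payload: the first 425 and the last 425, with the middle elided.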

    def _compress_tool_result_messages(self, messages: List[Dict[str, Any]], llm_model: str, max_tokens: Optional[int], token_threshold: Optional[int] = 1000) -> List[Dict[str, Any]]:
        """Compress the tool result messages except the most recent one."""
        uncompressed_total_token_count = token_counter(model=llm_model, messages=messages)
        max_tokens_value = max_tokens or (100 * 1000)  # Guard against max_tokens=None

        if uncompressed_total_token_count > max_tokens_value:
            _i = 0  # Count the number of ToolResult messages
            for msg in reversed(messages):  # Start from the end and work backwards
                if self._is_tool_result_message(msg):  # Only compress ToolResult messages
                    _i += 1
                    msg_token_count = token_counter(messages=[msg])  # Count the number of tokens in the message
                    if msg_token_count > token_threshold:  # If the message is too long
                        if _i > 1:  # If this is not the most recent ToolResult message
                            message_id = msg.get('message_id')
                            if message_id:
                                msg["content"] = self._compress_message(msg["content"], message_id, token_threshold * 3)
                            else:
                                logger.warning(f"UNEXPECTED: Message has no message_id {str(msg)[:100]}")
                        else:
                            msg["content"] = self._safe_truncate(msg["content"], int(max_tokens_value * 2))
        return messages

    def _compress_user_messages(self, messages: List[Dict[str, Any]], llm_model: str, max_tokens: Optional[int], token_threshold: Optional[int] = 1000) -> List[Dict[str, Any]]:
        """Compress the user messages except the most recent one."""
        uncompressed_total_token_count = token_counter(model=llm_model, messages=messages)
        max_tokens_value = max_tokens or (100 * 1000)  # Guard against max_tokens=None

        if uncompressed_total_token_count > max_tokens_value:
            _i = 0  # Count the number of User messages
            for msg in reversed(messages):  # Start from the end and work backwards
                if msg.get('role') == 'user':  # Only compress User messages
                    _i += 1
                    msg_token_count = token_counter(messages=[msg])  # Count the number of tokens in the message
                    if msg_token_count > token_threshold:  # If the message is too long
                        if _i > 1:  # If this is not the most recent User message
                            message_id = msg.get('message_id')
                            if message_id:
                                msg["content"] = self._compress_message(msg["content"], message_id, token_threshold * 3)
                            else:
                                logger.warning(f"UNEXPECTED: Message has no message_id {str(msg)[:100]}")
                        else:
                            msg["content"] = self._safe_truncate(msg["content"], int(max_tokens_value * 2))
        return messages

    def _compress_assistant_messages(self, messages: List[Dict[str, Any]], llm_model: str, max_tokens: Optional[int], token_threshold: Optional[int] = 1000) -> List[Dict[str, Any]]:
        """Compress the assistant messages except the most recent one."""
        uncompressed_total_token_count = token_counter(model=llm_model, messages=messages)
        max_tokens_value = max_tokens or (100 * 1000)  # Guard against max_tokens=None

        if uncompressed_total_token_count > max_tokens_value:
            _i = 0  # Count the number of Assistant messages
            for msg in reversed(messages):  # Start from the end and work backwards
                if msg.get('role') == 'assistant':  # Only compress Assistant messages
                    _i += 1
                    msg_token_count = token_counter(messages=[msg])  # Count the number of tokens in the message
                    if msg_token_count > token_threshold:  # If the message is too long
                        if _i > 1:  # If this is not the most recent Assistant message
                            message_id = msg.get('message_id')
                            if message_id:
                                msg["content"] = self._compress_message(msg["content"], message_id, token_threshold * 3)
                            else:
                                logger.warning(f"UNEXPECTED: Message has no message_id {str(msg)[:100]}")
                        else:
                            msg["content"] = self._safe_truncate(msg["content"], int(max_tokens_value * 2))
        return messages
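
    # The three compressors above share one pattern: walk messages newest to
    # oldest, leave the most recent message of the role untouched, compress
    # older oversized ones down to token_threshold * 3 characters, and
    # middle-truncate the newest one only if it alone exceeds the threshold.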

    def _remove_meta_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Strip meta content (tool_execution arguments) from messages before sending to the LLM."""
        result: List[Dict[str, Any]] = []
        for msg in messages:
            msg_content = msg.get('content')
            # Try to parse msg_content as JSON if it's a string
            if isinstance(msg_content, str):
                try: msg_content = json.loads(msg_content)
                except json.JSONDecodeError: pass
            if isinstance(msg_content, dict):
                # Create a copy to avoid modifying the original
                msg_content_copy = msg_content.copy()
                if "tool_execution" in msg_content_copy:
                    tool_execution = msg_content_copy["tool_execution"].copy()
                    if "arguments" in tool_execution:
                        del tool_execution["arguments"]
                    msg_content_copy["tool_execution"] = tool_execution
                # Create a new message dict with the modified content
                new_msg = msg.copy()
                new_msg["content"] = json.dumps(msg_content_copy)
                result.append(new_msg)
            else:
                result.append(msg)
        return result
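
    # Before/after sketch of the argument stripping above (hypothetical payload):
    #   in:  {"tool_execution": {"name": "web-search", "arguments": {...}, "result": {...}}}
    #   out: {"tool_execution": {"name": "web-search", "result": {...}}}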

    def _compress_messages(self, messages: List[Dict[str, Any]], llm_model: str, max_tokens: Optional[int] = 41000, token_threshold: Optional[int] = 4096, max_iterations: int = 5) -> List[Dict[str, Any]]:
        """Compress the messages to fit the model's context window.

        token_threshold: must be a power of 2, since it is halved on each recursive pass
        """
        # Pick a context budget for the target model, reserving headroom for
        # output tokens and other overhead.
        if 'sonnet' in llm_model.lower():
            max_tokens = 200 * 1000 - 64000 - 28000
        elif 'gpt' in llm_model.lower():
            max_tokens = 128 * 1000 - 28000
        elif 'gemini' in llm_model.lower():
            max_tokens = 1000 * 1000 - 300000
        elif 'deepseek' in llm_model.lower():
            max_tokens = 128 * 1000 - 28000
        else:
            max_tokens = 41 * 1000 - 10000

        result = messages
        result = self._remove_meta_messages(result)

        uncompressed_total_token_count = token_counter(model=llm_model, messages=result)

        result = self._compress_tool_result_messages(result, llm_model, max_tokens, token_threshold)
        result = self._compress_user_messages(result, llm_model, max_tokens, token_threshold)
        result = self._compress_assistant_messages(result, llm_model, max_tokens, token_threshold)

        compressed_token_count = token_counter(model=llm_model, messages=result)
        logger.info(f"_compress_messages: {uncompressed_total_token_count} -> {compressed_token_count}")  # Log the token compression for debugging later

        if max_iterations <= 0:
            logger.warning("_compress_messages: Max iterations reached, omitting messages")
            result = self._compress_messages_by_omitting_messages(messages, llm_model, max_tokens)
            return result

        if compressed_token_count > max_tokens:
            logger.warning(f"Further token compression is needed: {compressed_token_count} > {max_tokens}")
            result = self._compress_messages(messages, llm_model, max_tokens, int(token_threshold / 2), max_iterations - 1)

        return self._middle_out_messages(result)
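
    # Pipeline sketch (hypothetical run): strip tool arguments, then compress
    # tool/user/assistant messages at token_threshold=4096. If still over
    # budget, recurse with 2048, 1024, ... for up to max_iterations passes;
    # if that is exhausted, fall back to omitting whole messages, and finally
    # cap the list at 320 messages via _middle_out_messages.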

    def _compress_messages_by_omitting_messages(
        self,
        messages: List[Dict[str, Any]],
        llm_model: str,
        max_tokens: Optional[int] = 41000,
        removal_batch_size: int = 10,
        min_messages_to_keep: int = 10
    ) -> List[Dict[str, Any]]:
        """Compress the messages by omitting messages from the middle.

        Args:
            messages: List of messages to compress
            llm_model: Model name for token counting
            max_tokens: Maximum allowed tokens
            removal_batch_size: Number of messages to remove per iteration
            min_messages_to_keep: Minimum number of messages to preserve
        """
        if not messages:
            return messages

        result = messages
        result = self._remove_meta_messages(result)

        # Early exit if no compression needed
        initial_token_count = token_counter(model=llm_model, messages=result)
        max_allowed_tokens = max_tokens or (100 * 1000)
        if initial_token_count <= max_allowed_tokens:
            return result

        # Separate system message (assumed to be first) from conversation messages
        system_message = messages[0] if messages and messages[0].get('role') == 'system' else None
        conversation_messages = result[1:] if system_message else result

        safety_limit = 500
        current_token_count = initial_token_count

        while current_token_count > max_allowed_tokens and safety_limit > 0:
            safety_limit -= 1

            if len(conversation_messages) <= min_messages_to_keep:
                logger.warning(f"Cannot compress further: only {len(conversation_messages)} messages remain (min: {min_messages_to_keep})")
                break

            # Calculate removal strategy based on current message count
            if len(conversation_messages) > (removal_batch_size * 2):
                # Remove from middle, keeping recent and early context
                middle_start = len(conversation_messages) // 2 - (removal_batch_size // 2)
                middle_end = middle_start + removal_batch_size
                conversation_messages = conversation_messages[:middle_start] + conversation_messages[middle_end:]
            else:
                # Remove from earlier messages, preserving recent context
                messages_to_remove = min(removal_batch_size, len(conversation_messages) // 2)
                if messages_to_remove > 0:
                    conversation_messages = conversation_messages[messages_to_remove:]
                else:
                    # Can't remove any more messages
                    break

            # Recalculate token count
            messages_to_count = ([system_message] + conversation_messages) if system_message else conversation_messages
            current_token_count = token_counter(model=llm_model, messages=messages_to_count)

        # Prepare final result
        final_messages = ([system_message] + conversation_messages) if system_message else conversation_messages
        final_token_count = token_counter(model=llm_model, messages=final_messages)
        logger.info(f"_compress_messages_by_omitting_messages: {initial_token_count} -> {final_token_count} tokens ({len(messages)} -> {len(final_messages)} messages)")

        return final_messages
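
    # Removal sketch (hypothetical counts): with 60 conversation messages over
    # budget and removal_batch_size=10, one pass drops the middle slice
    # (indices 25-34) and recounts tokens; once 20 or fewer remain, removal
    # shifts to the oldest messages until min_messages_to_keep stops it.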

    def _middle_out_messages(self, messages: List[Dict[str, Any]], max_messages: int = 320) -> List[Dict[str, Any]]:
        """Remove messages from the middle of the list, keeping max_messages total."""
        if len(messages) <= max_messages:
            return messages
        # Keep half from the beginning and half from the end
        keep_start = max_messages // 2
        keep_end = max_messages - keep_start
        return messages[:keep_start] + messages[-keep_end:]
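
    # Worked example (hypothetical): 500 messages with max_messages=320 keeps
    # the first 160 and the last 160, dropping the 180 in the middle.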

    def add_tool(self, tool_class: Type[Tool], function_names: Optional[List[str]] = None, **kwargs):
        """Add a tool to the ThreadManager."""
        self.tool_registry.register_tool(tool_class, function_names, **kwargs)

    async def add_message(
        self,
        thread_id: str,
        type: str,
        content: Union[Dict[str, Any], List[Any], str],
        is_llm_message: bool = False,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Add a message to the thread in the database.

        Args:
            thread_id: The ID of the thread to add the message to.
            type: The type of the message (e.g., 'text', 'image_url', 'tool_call', 'tool', 'user', 'assistant').
            content: The content of the message. Can be a dictionary, list, or string.
                It will be stored as JSONB in the database.
            is_llm_message: Flag indicating if the message originated from the LLM.
                Defaults to False (user message).
            metadata: Optional dictionary for additional message metadata.
                Defaults to None, stored as an empty JSONB object if None.
        """
        logger.debug(f"Adding message of type '{type}' to thread {thread_id}")
        client = await self.db.client

        # Prepare data for insertion
        data_to_insert = {
            'thread_id': thread_id,
            'type': type,
            'content': content,
            'is_llm_message': is_llm_message,
            'metadata': metadata or {},
        }

        try:
            # Add returning='representation' to get the inserted row data including the id
            result = await client.table('messages').insert(data_to_insert, returning='representation').execute()
            logger.info(f"Successfully added message to thread {thread_id}")

            if result.data and len(result.data) > 0 and isinstance(result.data[0], dict) and 'message_id' in result.data[0]:
                return result.data[0]
            else:
                logger.error(f"Insert operation failed or did not return expected data structure for thread {thread_id}. Result data: {result.data}")
                return None
        except Exception as e:
            logger.error(f"Failed to add message to thread {thread_id}: {str(e)}", exc_info=True)
            raise
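
    # Hypothetical call, inside an async context (the ID is a placeholder):
    #   saved = await manager.add_message(
    #       thread_id="11111111-1111-1111-1111-111111111111",
    #       type="user",
    #       content={"role": "user", "content": "Hello"},
    #   )
    #   # 'saved' is the inserted row dict (including 'message_id'), or None.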

    async def get_llm_messages(self, thread_id: str) -> List[Dict[str, Any]]:
        """Get all LLM-visible messages for a thread.

        Messages are fetched directly from the messages table in batches,
        ordered by creation time.

        Args:
            thread_id: The ID of the thread to get messages for.

        Returns:
            List of message objects.
        """
        logger.debug(f"Getting messages for thread {thread_id}")
        client = await self.db.client

        try:
            # Fetch messages in batches of 1000 to avoid overloading the database
            all_messages = []
            batch_size = 1000
            offset = 0

            while True:
                result = await client.table('messages').select('message_id, content').eq('thread_id', thread_id).eq('is_llm_message', True).order('created_at').range(offset, offset + batch_size - 1).execute()

                if not result.data or len(result.data) == 0:
                    break

                all_messages.extend(result.data)

                # If we got fewer than batch_size records, we've reached the end
                if len(result.data) < batch_size:
                    break

                offset += batch_size

            if not all_messages:
                return []

            # Parse the returned data, which might be stringified JSON, into
            # message objects tagged with their database message_id
            messages = []
            for item in all_messages:
                if isinstance(item['content'], str):
                    try:
                        parsed_item = json.loads(item['content'])
                        parsed_item['message_id'] = item['message_id']
                        messages.append(parsed_item)
                    except json.JSONDecodeError:
                        logger.error(f"Failed to parse message: {item['content']}")
                else:
                    content = item['content']
                    content['message_id'] = item['message_id']
                    messages.append(content)

            return messages
        except Exception as e:
            logger.error(f"Failed to get messages for thread {thread_id}: {str(e)}", exc_info=True)
            return []

    async def run_thread(
        self,
        thread_id: str,
        system_prompt: Dict[str, Any],
        stream: bool = True,
        temporary_message: Optional[Dict[str, Any]] = None,
        llm_model: str = "gpt-4o",
        llm_temperature: float = 0,
        llm_max_tokens: Optional[int] = None,
        processor_config: Optional[ProcessorConfig] = None,
        tool_choice: ToolChoice = "auto",
        native_max_auto_continues: int = 25,
        max_xml_tool_calls: int = 0,
        include_xml_examples: bool = False,
        enable_thinking: Optional[bool] = False,
        reasoning_effort: Optional[str] = 'low',
        enable_context_manager: bool = True,
        generation: Optional[StatefulGenerationClient] = None,
    ) -> Union[Dict[str, Any], AsyncGenerator]:
        """Run a conversation thread with LLM integration and tool execution.

        Args:
            thread_id: The ID of the thread to run
            system_prompt: System message to set the assistant's behavior
            stream: Use streaming API for the LLM response
            temporary_message: Optional temporary user message for this run only
            llm_model: The name of the LLM model to use
            llm_temperature: Temperature parameter for response randomness (0-1)
            llm_max_tokens: Maximum tokens in the LLM response
            processor_config: Configuration for the response processor
            tool_choice: Tool choice preference ("auto", "required", "none")
            native_max_auto_continues: Maximum number of automatic continuations when
                finish_reason="tool_calls" (0 disables auto-continue)
            max_xml_tool_calls: Maximum number of XML tool calls to allow (0 = no limit)
            include_xml_examples: Whether to include XML tool examples in the system prompt
            enable_thinking: Whether to enable thinking before making a decision
            reasoning_effort: The effort level for reasoning
            enable_context_manager: Whether to enable automatic context summarization
            generation: Optional Langfuse generation client for tracing this LLM call

        Returns:
            An async generator yielding response chunks or error dict
        """

        logger.info(f"Starting thread execution for thread {thread_id}")
        logger.info(f"Parameters: model={llm_model}, temperature={llm_temperature}, max_tokens={llm_max_tokens}")
        logger.info(f"Auto-continue: max={native_max_auto_continues}, XML tool limit={max_xml_tool_calls}")

        # Ensure a processor config exists before it is used below and in _run_once
        if processor_config is None:
            processor_config = ProcessorConfig()

        # Apply max_xml_tool_calls if specified and not already set in config
        if max_xml_tool_calls > 0 and not processor_config.max_xml_tool_calls:
            processor_config.max_xml_tool_calls = max_xml_tool_calls

        # Create a working copy of the system prompt to potentially modify
        working_system_prompt = system_prompt.copy()

        # Add XML examples to system prompt if requested; do this only ONCE before the loop
        if include_xml_examples and processor_config.xml_tool_calling:
            xml_examples = self.tool_registry.get_xml_examples()
            if xml_examples:
                examples_content = """
--- XML TOOL CALLING ---

In this environment you have access to a set of tools you can use to answer the user's question. The tools are specified in XML format.
Format your tool calls using the specified XML tags. Place parameters marked as 'attribute' within the opening tag (e.g., `<tag attribute='value'>`). Place parameters marked as 'content' between the opening and closing tags. Place parameters marked as 'element' within their own child tags (e.g., `<tag><element>value</element></tag>`). Refer to the examples provided below for the exact structure of each tool.
String and scalar parameters should be specified as attributes, while content goes between tags.
Note that spaces for string values are not stripped. The output is parsed with regular expressions.

Here are the XML tools available with examples:
"""
                for tag_name, example in xml_examples.items():
                    examples_content += f"<{tag_name}> Example: {example}\n"

                system_content = working_system_prompt.get('content')

                if isinstance(system_content, str):
                    working_system_prompt['content'] += examples_content
                    logger.debug("Appended XML examples to string system prompt content.")
                elif isinstance(system_content, list):
                    appended = False
                    for item in working_system_prompt['content']:  # Modify the copy
                        if isinstance(item, dict) and item.get('type') == 'text' and 'text' in item:
                            item['text'] += examples_content
                            logger.debug("Appended XML examples to the first text block in list system prompt content.")
                            appended = True
                            break
                    if not appended:
                        logger.warning("System prompt content is a list but no text block found to append XML examples.")
                else:
                    logger.warning(f"System prompt content is of unexpected type ({type(system_content)}), cannot add XML examples.")

        # Control whether we need to auto-continue due to tool_calls finish reason
        auto_continue = True
        auto_continue_count = 0
2025-04-08 02:15:13 +08:00
# Define inner function to handle a single run
async def _run_once ( temp_msg = None ) :
2025-04-04 23:06:49 +08:00
try :
2025-04-08 02:15:13 +08:00
# Ensure processor_config is available in this scope
2025-04-26 19:53:09 +08:00
nonlocal processor_config
2025-04-18 16:09:09 +08:00
# Note: processor_config is now guaranteed to exist due to check above
2025-04-26 19:53:09 +08:00
2025-04-08 02:15:13 +08:00
# 1. Get messages from thread for LLM call
2025-04-11 07:53:01 +08:00
messages = await self . get_llm_messages ( thread_id )
2025-04-26 19:53:09 +08:00
2025-04-17 07:16:53 +08:00
# 2. Check token count before proceeding
token_count = 0
try :
2025-04-18 16:09:09 +08:00
# Use the potentially modified working_system_prompt for token counting
2025-04-26 19:53:09 +08:00
token_count = token_counter ( model = llm_model , messages = [ working_system_prompt ] + messages )
2025-04-17 07:16:53 +08:00
token_threshold = self . context_manager . token_threshold
logger . info ( f " Thread { thread_id } token count: { token_count } / { token_threshold } ( { ( token_count / token_threshold ) * 100 : .1f } %) " )
2025-04-26 19:53:09 +08:00
2025-05-05 08:53:24 +08:00
# if token_count >= token_threshold and enable_context_manager:
# logger.info(f"Thread token count ({token_count}) exceeds threshold ({token_threshold}), summarizing...")
# summarized = await self.context_manager.check_and_summarize_if_needed(
# thread_id=thread_id,
# add_message_callback=self.add_message,
# model=llm_model,
# force=True
# )
# if summarized:
# logger.info("Summarization complete, fetching updated messages with summary")
# messages = await self.get_llm_messages(thread_id)
# # Recount tokens after summarization, using the modified prompt
# new_token_count = token_counter(model=llm_model, messages=[working_system_prompt] + messages)
# logger.info(f"After summarization: token count reduced from {token_count} to {new_token_count}")
# else:
# logger.warning("Summarization failed or wasn't needed - proceeding with original messages")
# elif not enable_context_manager:
# logger.info("Automatic summarization disabled. Skipping token count check and summarization.")
2025-04-18 16:09:09 +08:00
2025-04-17 07:16:53 +08:00
except Exception as e :
logger . error ( f " Error counting tokens or summarizing: { str ( e ) } " )
2025-04-26 19:53:09 +08:00
2025-04-17 07:16:53 +08:00

                # 3. Prepare messages for LLM call + add temporary message if it exists
                # Use the working_system_prompt, which may contain the XML examples
                prepared_messages = [working_system_prompt]

                # Find the last user message index
                last_user_index = -1
                for i, msg in enumerate(messages):
                    if msg.get('role') == 'user':
                        last_user_index = i

                # Insert temporary message before the last user message if it exists
                if temp_msg and last_user_index >= 0:
                    prepared_messages.extend(messages[:last_user_index])
                    prepared_messages.append(temp_msg)
                    prepared_messages.extend(messages[last_user_index:])
                    logger.debug("Added temporary message before the last user message")
                else:
                    # If no user message or no temporary message, just add all messages
                    prepared_messages.extend(messages)
                    if temp_msg:
                        prepared_messages.append(temp_msg)
                        logger.debug("Added temporary message to the end of prepared messages")

                # 4. Prepare tools for LLM call
                openapi_tool_schemas = None
                if processor_config.native_tool_calling:
                    openapi_tool_schemas = self.tool_registry.get_openapi_schemas()
                    logger.debug(f"Retrieved {len(openapi_tool_schemas) if openapi_tool_schemas else 0} OpenAPI tool schemas")

                # Compress the prepared messages to fit the model's context window
                prepared_messages = self._compress_messages(prepared_messages, llm_model)

                # 5. Make LLM API call
                logger.debug("Making LLM API call")
                try:
                    if generation:
                        generation.update(
                            input=prepared_messages,
                            start_time=datetime.datetime.now(datetime.timezone.utc),
                            model=llm_model,
                            model_parameters={
                                "max_tokens": llm_max_tokens,
                                "temperature": llm_temperature,
                                "enable_thinking": enable_thinking,
                                "reasoning_effort": reasoning_effort,
                                "tool_choice": tool_choice,
                                "tools": openapi_tool_schemas,
                            }
                        )
                    llm_response = await make_llm_api_call(
                        prepared_messages,  # Pass the potentially modified messages
                        llm_model,
                        temperature=llm_temperature,
                        max_tokens=llm_max_tokens,
                        tools=openapi_tool_schemas,
                        tool_choice=tool_choice if processor_config.native_tool_calling else None,
                        stream=stream,
                        enable_thinking=enable_thinking,
                        reasoning_effort=reasoning_effort
                    )
                    logger.debug("Successfully received raw LLM API response stream/object")
                except Exception as e:
                    logger.error(f"Failed to make LLM API call: {str(e)}", exc_info=True)
                    raise

                # 6. Process LLM response using the ResponseProcessor
                if stream:
                    logger.debug("Processing streaming response")
                    response_generator = self.response_processor.process_streaming_response(
                        llm_response=llm_response,
                        thread_id=thread_id,
                        config=processor_config,
                        prompt_messages=prepared_messages,
                        llm_model=llm_model,
                    )
                    return response_generator
                else:
                    logger.debug("Processing non-streaming response")
                    # Pass through the response generator without try/except to let errors propagate up
                    response_generator = self.response_processor.process_non_streaming_response(
                        llm_response=llm_response,
                        thread_id=thread_id,
                        config=processor_config,
                        prompt_messages=prepared_messages,
                        llm_model=llm_model,
                    )
                    return response_generator  # Return the generator
            except Exception as e:
                logger.error(f"Error in run_thread: {str(e)}", exc_info=True)
                # Return the error as a dict to be handled by the caller
                return {
                    "type": "status",
                    "status": "error",
                    "message": str(e)
                }

        # Define a wrapper generator that handles auto-continue logic
        async def auto_continue_wrapper():
            nonlocal auto_continue, auto_continue_count

            while auto_continue and (native_max_auto_continues == 0 or auto_continue_count < native_max_auto_continues):
                # Reset auto_continue for this iteration
                auto_continue = False

                # Run the thread once, passing the potentially modified system prompt
                # Pass temp_msg only on the first iteration
                try:
                    response_gen = await _run_once(temporary_message if auto_continue_count == 0 else None)

                    # Handle error responses
                    if isinstance(response_gen, dict) and "status" in response_gen and response_gen["status"] == "error":
                        logger.error(f"Error in auto_continue_wrapper: {response_gen.get('message', 'Unknown error')}")
                        yield response_gen
                        return  # Exit the generator on error

                    # Process each chunk
                    try:
                        async for chunk in response_gen:
                            # Check if this is a finish reason chunk with tool_calls or xml_tool_limit_reached
                            if chunk.get('type') == 'finish':
                                if chunk.get('finish_reason') == 'tool_calls':
                                    # Only auto-continue if enabled (max > 0)
                                    if native_max_auto_continues > 0:
                                        logger.info(f"Detected finish_reason='tool_calls', auto-continuing ({auto_continue_count + 1}/{native_max_auto_continues})")
                                        auto_continue = True
                                        auto_continue_count += 1
                                        # Don't yield the finish chunk to avoid confusing the client
                                        continue
                                elif chunk.get('finish_reason') == 'xml_tool_limit_reached':
                                    # Don't auto-continue if XML tool limit was reached
                                    logger.info("Detected finish_reason='xml_tool_limit_reached', stopping auto-continue")
                                    auto_continue = False
                                    # Still yield the chunk to inform the client

                            # Otherwise just yield the chunk normally
                            yield chunk

                        # If not auto-continuing, we're done
                        if not auto_continue:
                            break
                    except Exception as e:
                        # If there's an exception, log it, yield an error status, and stop execution
                        logger.error(f"Error in auto_continue_wrapper generator: {str(e)}", exc_info=True)
                        yield {
                            "type": "status",
                            "status": "error",
                            "message": f"Error in thread processing: {str(e)}"
                        }
                        return  # Exit the generator on any error
                except Exception as outer_e:
                    # Catch exceptions from _run_once itself
                    logger.error(f"Error executing thread: {str(outer_e)}", exc_info=True)
                    yield {
                        "type": "status",
                        "status": "error",
                        "message": f"Error executing thread: {str(outer_e)}"
                    }
                    return  # Exit immediately on exception from _run_once

            # If we've reached the max auto-continues, log a warning
            if auto_continue and auto_continue_count >= native_max_auto_continues:
                logger.warning(f"Reached maximum auto-continue limit ({native_max_auto_continues}), stopping.")
                yield {
                    "type": "content",
                    "content": f"\n[Agent reached maximum auto-continue limit of {native_max_auto_continues}]"
                }

        # If auto-continue is disabled (max=0), just run once
        if native_max_auto_continues == 0:
            logger.info("Auto-continue is disabled (native_max_auto_continues=0)")
            # Pass the potentially modified system prompt and temp message
            return await _run_once(temporary_message)

        # Otherwise return the auto-continue wrapper generator
        return auto_continue_wrapper()
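

# Illustrative usage sketch (assumes configured Supabase and LLM credentials;
# the thread ID below is a hypothetical placeholder, not a known record):
#
#   import asyncio
#
#   async def demo():
#       manager = ThreadManager()
#       response = await manager.run_thread(
#           thread_id="00000000-0000-0000-0000-000000000000",
#           system_prompt={"role": "system", "content": "You are a helpful assistant."},
#           llm_model="gpt-4o",
#           processor_config=ProcessorConfig(),
#       )
#       if isinstance(response, dict):  # error dict from a failed run
#           print(response)
#       else:
#           async for chunk in response:
#               print(chunk)
#
#   asyncio.run(demo())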