fix(thread_manager): implement batching for message retrieval to enhance database performance and mitigate db limits

This commit is contained in:
sharath 2025-06-22 16:42:55 +00:00
parent 1fca6d8496
commit 38f4355838
No known key found for this signature in database
1 changed file with 24 additions and 3 deletions

View File

@@ -402,15 +402,36 @@ class ThreadManager:
try: try:
# result = await client.rpc('get_llm_formatted_messages', {'p_thread_id': thread_id}).execute() # result = await client.rpc('get_llm_formatted_messages', {'p_thread_id': thread_id}).execute()
result = await client.table('messages').select('message_id, content').eq('thread_id', thread_id).eq('is_llm_message', True).order('created_at').execute()
# Fetch messages in batches of 1000 to avoid overloading the database
all_messages = []
batch_size = 1000
offset = 0
while True:
result = await client.table('messages').select('message_id, content').eq('thread_id', thread_id).eq('is_llm_message', True).order('created_at').range(offset, offset + batch_size - 1).execute()
if not result.data or len(result.data) == 0:
break
all_messages.extend(result.data)
# If we got fewer than batch_size records, we've reached the end
if len(result.data) < batch_size:
break
offset += batch_size
# Use all_messages instead of result.data in the rest of the method
result_data = all_messages
# Parse the returned data which might be stringified JSON # Parse the returned data which might be stringified JSON
if not result.data: if not result_data:
return [] return []
# Return properly parsed JSON objects # Return properly parsed JSON objects
messages = [] messages = []
for item in result.data: for item in result_data:
if isinstance(item['content'], str): if isinstance(item['content'], str):
try: try:
parsed_item = json.loads(item['content']) parsed_item = json.loads(item['content'])