refactor(copy_project): optimize message copying process with batching and improved cleanup logging

sharath 2025-06-22 12:25:32 +00:00
parent 1b9604c02c
commit 1fca6d8496
1 changed file with 71 additions and 43 deletions


@@ -143,13 +143,29 @@ async def copy_agent_runs(thread_id: str, new_thread_id: str):
 async def copy_messages(thread_id: str, new_thread_id: str):
     db = await get_db()
-    messages = (
-        await db.schema("public")
-        .from_("messages")
-        .select("*")
-        .eq("thread_id", thread_id)
-        .execute()
-    )
+    messages_data = []
+    offset = 0
+    batch_size = 1000
+    while True:
+        batch = (
+            await db.schema("public")
+            .from_("messages")
+            .select("*")
+            .eq("thread_id", thread_id)
+            .range(offset, offset + batch_size - 1)
+            .execute()
+        )
+        if not batch.data:
+            break
+        messages_data.extend(batch.data)
+        if len(batch.data) < batch_size:
+            break
+        offset += batch_size
 
     async def copy_single_message(message, new_thread_id, db):
         new_message = (
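
A standalone sketch of the pagination pattern introduced above, with an explicit sort key added, since offset paging over an unordered result set can skip or duplicate rows between pages. It assumes a supabase-py style async client whose .range(start, end) is inclusive on both ends; fetch_all_messages and the created_at column are assumptions, not part of this commit:

async def fetch_all_messages(db, thread_id: str, batch_size: int = 1000):
    """Page through the messages table in fixed-size chunks."""
    rows = []
    offset = 0
    while True:
        batch = (
            await db.schema("public")
            .from_("messages")
            .select("*")
            .eq("thread_id", thread_id)
            .order("created_at")  # assumed column; a stable order keeps pages consistent
            .range(offset, offset + batch_size - 1)  # inclusive on both ends
            .execute()
        )
        if not batch.data:
            break
        rows.extend(batch.data)
        if len(batch.data) < batch_size:  # short page means no rows remain
            break
        offset += batch_size
    return rows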
@@ -170,10 +186,21 @@ async def copy_messages(thread_id: str, new_thread_id: str):
         )
         return new_message.data[0]
 
-    tasks = [
-        copy_single_message(message, new_thread_id, db) for message in messages.data
-    ]
-    new_messages = await asyncio.gather(*tasks)
+    tasks = []
+    for message in messages_data:
+        tasks.append(copy_single_message(message, new_thread_id, db))
+
+    # Process tasks in batches to avoid overwhelming the database
+    batch_size = 100
+    new_messages = []
+    for i in range(0, len(tasks), batch_size):
+        batch_tasks = tasks[i : i + batch_size]
+        batch_results = await asyncio.gather(*batch_tasks)
+        new_messages.extend(batch_results)
+
+        # Add delay between batches
+        if i + batch_size < len(tasks):
+            await asyncio.sleep(1)
 
     return new_messages
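
The batched gather above, generalized into a reusable helper; gather_in_batches is a hypothetical sketch, not part of this commit. Coroutine objects are not scheduled until awaited, so building the full task list up front costs only memory:

import asyncio

async def gather_in_batches(coros, batch_size: int = 100, delay: float = 1.0):
    # Await at most batch_size coroutines at a time to bound concurrent DB writes.
    results = []
    for i in range(0, len(coros), batch_size):
        results.extend(await asyncio.gather(*coros[i : i + batch_size]))
        if i + batch_size < len(coros):
            await asyncio.sleep(delay)  # brief pause between batches
    return results

An asyncio.Semaphore held inside copy_single_message would bound concurrency more smoothly, but the fixed-batch form matches what the commit does.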
@@ -303,16 +330,29 @@ async def main():
                     f"Error cleaning up sandbox {new_sandbox.id}: {cleanup_error}"
                 )
 
+        if new_project:
+            try:
+                logger.info(f"Cleaning up project: {new_project['project_id']}")
+                await db.table("projects").delete().eq(
+                    "project_id", new_project["project_id"]
+                ).execute()
+            except Exception as cleanup_error:
+                logger.error(
+                    f"Error cleaning up project {new_project['project_id']}: {cleanup_error}"
+                )
+
+        if new_messages:
+            for message in new_messages:
+                try:
+                    logger.info(f"Cleaning up message: {message['message_id']}")
+                    await db.table("messages").delete().eq(
+                        "message_id", message["message_id"]
+                    ).execute()
+                except Exception as cleanup_error:
+                    logger.error(
+                        f"Error cleaning up message {message['message_id']}: {cleanup_error}"
+                    )
+
+        if new_agent_runs:
+            for agent_run in new_agent_runs:
+                try:
+                    logger.info(f"Cleaning up agent run: {agent_run['id']}")
+                    await db.table("agent_runs").delete().eq(
+                        "id", agent_run["id"]
+                    ).execute()
+                except Exception as cleanup_error:
+                    logger.error(
+                        f"Error cleaning up agent run {agent_run['id']}: {cleanup_error}"
+                    )
+
         if new_threads:
             for thread in new_threads:
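
Each cleanup block added above repeats the same best-effort delete-and-log shape; one possible consolidation, where cleanup_rows is a hypothetical helper rather than part of this commit:

async def cleanup_rows(db, logger, table: str, key: str, rows):
    # Best-effort deletes: log failures and keep going, so one bad row
    # does not abort the rest of the cleanup.
    for row in rows or []:
        try:
            logger.info(f"Cleaning up {table}: {row[key]}")
            await db.table(table).delete().eq(key, row[key]).execute()
        except Exception as cleanup_error:
            logger.error(f"Error cleaning up {table} {row[key]}: {cleanup_error}")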
@@ -326,29 +366,17 @@ async def main():
                         f"Error cleaning up thread {thread['thread_id']}: {cleanup_error}"
                     )
 
-        if new_agent_runs:
-            for agent_run in new_agent_runs:
-                try:
-                    logger.info(f"Cleaning up agent run: {agent_run['run_id']}")
-                    await db.table("agent_runs").delete().eq(
-                        "run_id", agent_run["run_id"]
-                    ).execute()
-                except Exception as cleanup_error:
-                    logger.error(
-                        f"Error cleaning up agent run {agent_run['run_id']}: {cleanup_error}"
-                    )
-
-        if new_project:
-            try:
-                logger.info(f"Cleaning up project: {new_project['project_id']}")
-                await db.table("projects").delete().eq(
-                    "project_id", new_project["project_id"]
-                ).execute()
-            except Exception as cleanup_error:
-                logger.error(
-                    f"Error cleaning up project {new_project['project_id']}: {cleanup_error}"
-                )
-
-        if new_messages:
-            for message in new_messages:
-                try:
-                    logger.info(f"Cleaning up message: {message['message_id']}")
-                    await db.table("messages").delete().eq(
-                        "message_id", message["message_id"]
-                    ).execute()
-                except Exception as cleanup_error:
-                    logger.error(
-                        f"Error cleaning up message {message['message_id']}: {cleanup_error}"
-                    )
-
         await DBConnection.disconnect()
         raise e
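
For reference, the diff above also switches the agent-run lookup key from "run_id" to "id" and moves project, message, and agent-run cleanup ahead of thread cleanup. With the hypothetical cleanup_rows helper sketched earlier, the new order reads as:

# Hypothetical usage of cleanup_rows, mirroring the new cleanup order.
if new_project:
    await cleanup_rows(db, logger, "projects", "project_id", [new_project])
await cleanup_rows(db, logger, "messages", "message_id", new_messages)
await cleanup_rows(db, logger, "agent_runs", "id", new_agent_runs)  # key is now "id"
await cleanup_rows(db, logger, "threads", "thread_id", new_threads)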