diff --git a/backend/utils/scripts/copy_project.py b/backend/utils/scripts/copy_project.py index 0f32a88d..e35cdf0d 100644 --- a/backend/utils/scripts/copy_project.py +++ b/backend/utils/scripts/copy_project.py @@ -143,13 +143,29 @@ async def copy_agent_runs(thread_id: str, new_thread_id: str): async def copy_messages(thread_id: str, new_thread_id: str): db = await get_db() - messages = ( - await db.schema("public") - .from_("messages") - .select("*") - .eq("thread_id", thread_id) - .execute() - ) + messages_data = [] + offset = 0 + batch_size = 1000 + + while True: + batch = ( + await db.schema("public") + .from_("messages") + .select("*") + .eq("thread_id", thread_id) + .range(offset, offset + batch_size - 1) + .execute() + ) + + if not batch.data: + break + + messages_data.extend(batch.data) + + if len(batch.data) < batch_size: + break + + offset += batch_size async def copy_single_message(message, new_thread_id, db): new_message = ( @@ -170,10 +186,21 @@ async def copy_messages(thread_id: str, new_thread_id: str): ) return new_message.data[0] - tasks = [ - copy_single_message(message, new_thread_id, db) for message in messages.data - ] - new_messages = await asyncio.gather(*tasks) + tasks = [] + for message in messages_data: + tasks.append(copy_single_message(message, new_thread_id, db)) + + # Process tasks in batches to avoid overwhelming the database + batch_size = 100 + new_messages = [] + for i in range(0, len(tasks), batch_size): + batch_tasks = tasks[i : i + batch_size] + batch_results = await asyncio.gather(*batch_tasks) + new_messages.extend(batch_results) + # Add delay between batches + if i + batch_size < len(tasks): + await asyncio.sleep(1) + return new_messages @@ -303,16 +330,29 @@ async def main(): f"Error cleaning up sandbox {new_sandbox.id}: {cleanup_error}" ) - if new_project: - try: - logger.info(f"Cleaning up project: {new_project['project_id']}") - await db.table("projects").delete().eq( - "project_id", new_project["project_id"] - ).execute() - except Exception as cleanup_error: - logger.error( - f"Error cleaning up project {new_project['project_id']}: {cleanup_error}" - ) + if new_messages: + for message in new_messages: + try: + logger.info(f"Cleaning up message: {message['message_id']}") + await db.table("messages").delete().eq( + "message_id", message["message_id"] + ).execute() + except Exception as cleanup_error: + logger.error( + f"Error cleaning up message {message['message_id']}: {cleanup_error}" + ) + + if new_agent_runs: + for agent_run in new_agent_runs: + try: + logger.info(f"Cleaning up agent run: {agent_run['id']}") + await db.table("agent_runs").delete().eq( + "id", agent_run["id"] + ).execute() + except Exception as cleanup_error: + logger.error( + f"Error cleaning up agent run {agent_run['id']}: {cleanup_error}" + ) if new_threads: for thread in new_threads: @@ -326,29 +366,17 @@ async def main(): f"Error cleaning up thread {thread['thread_id']}: {cleanup_error}" ) - if new_agent_runs: - for agent_run in new_agent_runs: - try: - logger.info(f"Cleaning up agent run: {agent_run['run_id']}") - await db.table("agent_runs").delete().eq( - "run_id", agent_run["run_id"] - ).execute() - except Exception as cleanup_error: - logger.error( - f"Error cleaning up agent run {agent_run['run_id']}: {cleanup_error}" - ) + if new_project: + try: + logger.info(f"Cleaning up project: {new_project['project_id']}") + await db.table("projects").delete().eq( + "project_id", new_project["project_id"] + ).execute() + except Exception as cleanup_error: + logger.error( + f"Error cleaning up project {new_project['project_id']}: {cleanup_error}" + ) - if new_messages: - for message in new_messages: - try: - logger.info(f"Cleaning up message: {message['message_id']}") - await db.table("messages").delete().eq( - "message_id", message["message_id"] - ).execute() - except Exception as cleanup_error: - logger.error( - f"Error cleaning up message {message['message_id']}: {cleanup_error}" - ) await DBConnection.disconnect() raise e