diff --git a/api/libs/database/src/schema.rs b/api/libs/database/src/schema.rs index 41854d206..784730737 100644 --- a/api/libs/database/src/schema.rs +++ b/api/libs/database/src/schema.rs @@ -521,18 +521,6 @@ diesel::table! { } } -diesel::table! { - threads (id) { - id -> Uuid, - title -> Text, - organization_id -> Uuid, - created_at -> Timestamptz, - updated_at -> Timestamptz, - deleted_at -> Nullable, - created_by -> Uuid, - } -} - diesel::table! { threads_deprecated (id) { id -> Uuid, @@ -649,8 +637,6 @@ diesel::joinable!(teams_to_users -> users (user_id)); diesel::joinable!(terms -> organizations (organization_id)); diesel::joinable!(terms_to_datasets -> datasets (dataset_id)); diesel::joinable!(terms_to_datasets -> terms (term_id)); -diesel::joinable!(threads -> organizations (organization_id)); -diesel::joinable!(threads -> users (created_by)); diesel::joinable!(threads_deprecated -> organizations (organization_id)); diesel::joinable!(threads_to_dashboards -> dashboards (dashboard_id)); diesel::joinable!(threads_to_dashboards -> threads_deprecated (thread_id)); @@ -689,7 +675,6 @@ diesel::allow_tables_to_appear_in_same_query!( teams_to_users, terms, terms_to_datasets, - threads, threads_deprecated, threads_to_dashboards, user_favorites, diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 023aba8c1..03748acad 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -200,6 +200,7 @@ pub async fn post_chat_handler( // Initialize raw_llm_messages with initial_messages let mut raw_llm_messages = initial_messages.clone(); + let mut raw_response_message = String::new(); // Initialize the agent thread let mut chat = AgentThread::new(Some(chat_id), user.id, initial_messages); @@ -235,9 +236,26 @@ pub async fn post_chat_handler( // Only store completed messages in raw_llm_messages match &msg { - AgentMessage::Assistant { progress, .. } => { + AgentMessage::Assistant { + progress, content, .. + } => { + if let Some(content) = content { + raw_response_message.push_str(&content); + } + if matches!(progress, MessageProgress::Complete) { - raw_llm_messages.push(msg.clone()); + if raw_response_message.is_empty() { + raw_llm_messages.push(msg.clone()); + } else { + raw_llm_messages.push(AgentMessage::Assistant { + id: None, + content: Some(raw_response_message.clone()), + name: None, + tool_calls: None, + progress: MessageProgress::Complete, + initial: false, + }); + } } } AgentMessage::Tool { progress, .. } => { @@ -395,7 +413,8 @@ pub async fn post_chat_handler( fn prepare_final_message_state(containers: &[BusterContainer]) -> Result<(Vec, Vec)> { let mut response_messages = Vec::new(); // Use a HashMap to track the latest reasoning message for each ID - let mut reasoning_map: std::collections::HashMap = std::collections::HashMap::new(); + let mut reasoning_map: std::collections::HashMap = + std::collections::HashMap::new(); for container in containers { match container { @@ -426,7 +445,9 @@ fn prepare_final_message_state(containers: &[BusterContainer]) -> Result<(Vec thought.status == "completed", BusterReasoningMessage::File(file) => file.status == "completed", - BusterReasoningMessage::Text(text) => text.status.as_deref() == Some("completed"), + BusterReasoningMessage::Text(text) => { + text.status.as_deref() == Some("completed") + } }; if should_include { @@ -703,7 +724,11 @@ pub async fn transform_message( vec![] } }; - containers.extend(chat_messages.into_iter().map(|container| (container, ThreadEvent::GeneratingResponseMessage))); + containers.extend( + chat_messages + .into_iter() + .map(|container| (container, ThreadEvent::GeneratingResponseMessage)), + ); // Add the "Finished reasoning" message if we're just starting if initial { @@ -724,10 +749,7 @@ pub async fn transform_message( message_id: *message_id, }); - containers.push(( - reasoning_container, - ThreadEvent::GeneratingResponseMessage, - )); + containers.push((reasoning_container, ThreadEvent::GeneratingResponseMessage)); } Ok(containers) @@ -746,7 +768,11 @@ pub async fn transform_message( for reasoning_container in messages { // Only process file response messages when they're completed match &reasoning_container { - BusterReasoningMessage::File(file) if matches!(progress, MessageProgress::Complete) && file.status == "completed" && file.message_type == "files" => { + BusterReasoningMessage::File(file) + if matches!(progress, MessageProgress::Complete) + && file.status == "completed" + && file.message_type == "files" => + { // For each completed file, create and send a file response message for (file_id, file_content) in &file.files { let response_message = BusterChatMessage::File { @@ -767,11 +793,13 @@ pub async fn transform_message( }; containers.push(( - BusterContainer::ChatMessage(BusterChatMessageContainer { - response_message, - chat_id: *chat_id, - message_id: *message_id, - }), + BusterContainer::ChatMessage( + BusterChatMessageContainer { + response_message, + chat_id: *chat_id, + message_id: *message_id, + }, + ), ThreadEvent::GeneratingResponseMessage, )); } @@ -780,11 +808,13 @@ pub async fn transform_message( } containers.push(( - BusterContainer::ReasoningMessage(BusterReasoningMessageContainer { - reasoning: reasoning_container, - chat_id: *chat_id, - message_id: *message_id, - }), + BusterContainer::ReasoningMessage( + BusterReasoningMessageContainer { + reasoning: reasoning_container, + chat_id: *chat_id, + message_id: *message_id, + }, + ), ThreadEvent::GeneratingReasoningMessage, )); } @@ -822,14 +852,18 @@ pub async fn transform_message( ) { Ok(messages) => messages .into_iter() - .map(|container| ( - BusterContainer::ReasoningMessage(BusterReasoningMessageContainer { - reasoning: container, - chat_id: *chat_id, - message_id: *message_id, - }), - ThreadEvent::GeneratingReasoningMessage, - )) + .map(|container| { + ( + BusterContainer::ReasoningMessage( + BusterReasoningMessageContainer { + reasoning: container, + chat_id: *chat_id, + message_id: *message_id, + }, + ), + ThreadEvent::GeneratingReasoningMessage, + ) + }) .collect(), Err(e) => { tracing::warn!("Error transforming tool message '{}': {:?}", name_str, e); @@ -1164,9 +1198,10 @@ fn transform_assistant_tool_message( MessageProgress::Complete => { // For completed files, only send the final state let mut updated_files = std::collections::HashMap::new(); - + for (file_id, file_content) in file.files.iter() { - let chunk_id = format!("{}_{}", file.id, file_content.file_name); + let chunk_id = + format!("{}_{}", file.id, file_content.file_name); let complete_text = tracker .get_complete_text(chunk_id.clone()) .unwrap_or_else(|| { @@ -1193,10 +1228,12 @@ fn transform_assistant_tool_message( let mut updated_files = std::collections::HashMap::new(); for (file_id, file_content) in file.files.iter() { - let chunk_id = format!("{}_{}", file.id, file_content.file_name); + let chunk_id = + format!("{}_{}", file.id, file_content.file_name); if let Some(chunk) = &file_content.file.text_chunk { - let delta = tracker.add_chunk(chunk_id.clone(), chunk.clone()); + let delta = + tracker.add_chunk(chunk_id.clone(), chunk.clone()); if !delta.is_empty() { let mut updated_content = file_content.clone();