From 02e9bd0fb5fc0bb579bf1e3c7f793f9b2fb5e921 Mon Sep 17 00:00:00 2001 From: dal Date: Sat, 19 Apr 2025 21:22:00 -0600 Subject: [PATCH 1/2] no double complete --- api/server/src/routes/ws/chats/post_chat.rs | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/api/server/src/routes/ws/chats/post_chat.rs b/api/server/src/routes/ws/chats/post_chat.rs index a6941f064..f64924e22 100644 --- a/api/server/src/routes/ws/chats/post_chat.rs +++ b/api/server/src/routes/ws/chats/post_chat.rs @@ -1,4 +1,3 @@ - use anyhow::Result; use handlers::chats::post_chat_handler::ChatCreateNewChat; use handlers::chats::post_chat_handler::{self, ThreadEvent}; @@ -104,17 +103,10 @@ pub async fn post_thread( // Call shared handler with channel for streaming messages match post_chat_handler::post_chat_handler(request, user.clone(), Some(tx)).await { Ok(chat_with_messages) => { - // For prompt-less flows, the handler might already be done, so explicitly send the completed event - // This ensures the client knows the process is complete - let response = WsResponseMessage::new_no_user( - WsRoutes::Chats(ChatsRoute::Post), - WsEvent::Threads(WSThreadEvent::Complete), - &post_chat_handler::BusterContainer::Chat(chat_with_messages), - None, - WsSendMethod::All, - ); - - send_ws_message(&user.id.to_string(), &response).await?; + // The spawned task above already handles forwarding the 'Complete' event + // received from the handler. Sending it again here is redundant. + // The final chat state is implicitly sent when the handler sends + // its BusterContainer::Chat with the Completed event. Ok(()) } Err(e) => { From 7f481dcda4c91dd8e83d4b2de8f57203c1095818 Mon Sep 17 00:00:00 2001 From: dal Date: Sat, 19 Apr 2025 21:56:03 -0600 Subject: [PATCH 2/2] fix on streaming done messages --- .../handlers/src/chats/post_chat_handler.rs | 66 ++++++++++++++++--- 1 file changed, 56 insertions(+), 10 deletions(-) diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index ec2c5b844..ab9b22856 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -1954,12 +1954,6 @@ fn tool_modify_dashboards(id: String, content: String, delta_duration: Duration) files_map.insert(file_id_str, buster_file); } - // Create info for failed files - let failed_files_info: Vec = modify_dashboards_result.failed_files - .into_iter() - .map(|f| FailedFileInfo { name: f.file_name, error: f.error }) // Use fields from FailedFileModification - .collect(); - // Create the BusterReasoningFile using delta_duration and the new title/status let buster_file_message = BusterReasoningMessage::File(BusterReasoningFile { id, @@ -2117,7 +2111,24 @@ fn transform_assistant_tool_message( } } if progress == MessageProgress::Complete { - if let Some(final_text) = tracker.get_complete_text(tool_id.clone()) { + // --- MODIFICATION START --- + // Attempt to parse final arguments directly from the complete chunk + #[derive(Deserialize)] + struct DoneArgs { + final_response: String, + } + let final_text_result = serde_json::from_str::(&tool_call.function.arguments) + .map(|args| args.final_response) + .ok(); // Convert Result to Option + + // -- New: Track if direct parse failed -- + let direct_parse_failed = final_text_result.is_none(); + + // Use directly parsed text if available, otherwise fallback to tracker + let final_text_to_use = final_text_result.or_else(|| tracker.get_complete_text(tool_id.clone())); + + if let Some(final_text) = final_text_to_use { + // --- MODIFICATION END --- all_results.push(ToolTransformResult::Response(BusterChatMessage::Text { id: tool_id.clone(), message: Some(final_text), @@ -2125,14 +2136,24 @@ fn transform_assistant_tool_message( is_final_message: Some(true), originating_tool_name: Some(tool_name.clone()), })); - tracker.clear_chunk(tool_id.clone()); + // --- MODIFICATION: Log warning if direct parse failed (using flag) --- + if direct_parse_failed { + tracing::warn!("Failed to parse final 'done' arguments directly, used tracker fallback for tool_id: {}", tool_id); + } + // --- END MODIFICATION --- + } else { + // Log if neither direct parse nor tracker worked + tracing::error!("Failed to get complete text for 'done' tool (ID: {}) from both direct parse and tracker.", tool_id); } + + tracker.clear_chunk(tool_id.clone()); // Clear the marker for the reasoning message as well let reasoning_message_id = format!("{}_reasoning_finished", tool_id); tracker.clear_chunk(reasoning_message_id); } } "message_user_clarifying_question" => { + // --- NOTE: Keep InProgress logic as is --- if let Some(full_text_value) = parser.process_response_tool_chunk(&tool_call.function.arguments, "text") { let delta = tracker.add_chunk(tool_id.clone(), full_text_value.clone()); if !delta.is_empty() { @@ -2146,7 +2167,24 @@ fn transform_assistant_tool_message( } } if progress == MessageProgress::Complete { - if let Some(final_text) = tracker.get_complete_text(tool_id.clone()) { + // --- MODIFICATION START --- + // Attempt to parse final arguments directly from the complete chunk + #[derive(Deserialize)] + struct ClarifyingQuestionArgs { + text: String, // Assuming the argument name is 'text' + } + let final_text_result = serde_json::from_str::(&tool_call.function.arguments) + .map(|args| args.text) + .ok(); // Convert Result to Option + + // -- New: Track if direct parse failed -- + let direct_parse_failed = final_text_result.is_none(); + + // Use directly parsed text if available, otherwise fallback to tracker + let final_text_to_use = final_text_result.or_else(|| tracker.get_complete_text(tool_id.clone())); + + if let Some(final_text) = final_text_to_use { + // --- MODIFICATION END --- all_results.push(ToolTransformResult::Response(BusterChatMessage::Text { id: tool_id.clone(), message: Some(final_text), @@ -2154,8 +2192,16 @@ fn transform_assistant_tool_message( is_final_message: Some(true), originating_tool_name: Some(tool_name.clone()), })); - tracker.clear_chunk(tool_id.clone()); + // --- MODIFICATION: Log warning if direct parse failed (using flag) --- + if direct_parse_failed { + tracing::warn!("Failed to parse final 'message_user_clarifying_question' arguments directly, used tracker fallback for tool_id: {}", tool_id); + } + // --- END MODIFICATION --- + } else { + // Log if neither direct parse nor tracker worked + tracing::error!("Failed to get complete text for 'message_user_clarifying_question' tool (ID: {}) from both direct parse and tracker.", tool_id); } + tracker.clear_chunk(tool_id.clone()); } }