From 693b652d38398eff9f7463088c84186710d45667 Mon Sep 17 00:00:00 2001 From: Nate Kelley Date: Thu, 24 Apr 2025 11:41:31 -0600 Subject: [PATCH 1/3] change prevent navigation to actually drop listener --- api/makefile | 5 +- web/package-lock.json | 8 +-- web/package.json | 2 +- .../ui/layouts/PreventNavigation.tsx | 61 +++++++------------ 4 files changed, 29 insertions(+), 47 deletions(-) diff --git a/api/makefile b/api/makefile index e4a8aeb1f..44a6aacb7 100644 --- a/api/makefile +++ b/api/makefile @@ -17,6 +17,7 @@ stop: pkill ollama fast: - export RUST_LOG=debug - export CARGO_INCREMENTAL=1 + cd .. && docker compose up -d redis && cd api && \ + export RUST_LOG=debug && \ + export CARGO_INCREMENTAL=1 && \ nice cargo watch -C server -x run \ No newline at end of file diff --git a/web/package-lock.json b/web/package-lock.json index 3558766af..fb6261c49 100644 --- a/web/package-lock.json +++ b/web/package-lock.json @@ -68,7 +68,7 @@ "next-themes": "^0.4.6", "papaparse": "^5.5.2", "pluralize": "^8.0.0", - "posthog-js": "^1.236.5", + "posthog-js": "^1.236.6", "prettier": "^3.5.3", "prettier-plugin-tailwindcss": "^0.6.11", "react": "^18", @@ -17986,9 +17986,9 @@ "license": "MIT" }, "node_modules/posthog-js": { - "version": "1.236.5", - "resolved": "https://registry.npmjs.org/posthog-js/-/posthog-js-1.236.5.tgz", - "integrity": "sha512-2FrWVZwcLyeZAtdDckJaCfsk9m6DMdr/nVPVSqzF7yvm9pDsdbkvB3A16iqRj5L3EcqV2xWOcv8xWmKgdnNnqA==", + "version": "1.236.6", + "resolved": "https://registry.npmjs.org/posthog-js/-/posthog-js-1.236.6.tgz", + "integrity": "sha512-IX4fkn3HCK+ObdHr/AuWd+Ks7bgMpRpOQB93b5rDJAWkG4if4xFVUn5pgEjyCNeOO2GM1ECnp08q9tYNYEfwbA==", "license": "MIT", "dependencies": { "core-js": "^3.38.1", diff --git a/web/package.json b/web/package.json index 7598e960f..30466af0c 100644 --- a/web/package.json +++ b/web/package.json @@ -77,7 +77,7 @@ "next-themes": "^0.4.6", "papaparse": "^5.5.2", "pluralize": "^8.0.0", - "posthog-js": "^1.236.5", + "posthog-js": "^1.236.6", "prettier": "^3.5.3", "prettier-plugin-tailwindcss": "^0.6.11", "react": "^18", diff --git a/web/src/components/ui/layouts/PreventNavigation.tsx b/web/src/components/ui/layouts/PreventNavigation.tsx index 44291981f..66a5c3de2 100644 --- a/web/src/components/ui/layouts/PreventNavigation.tsx +++ b/web/src/components/ui/layouts/PreventNavigation.tsx @@ -38,23 +38,24 @@ export const PreventNavigation: React.FC = React.memo( */ const handleClick = useMemoizedFn((event: MouseEvent) => { let originalTarget = event.target as HTMLElement; - let target = event.target as HTMLElement; - let href: string | null = null; let originalEvent = event; - // Traverse up the DOM tree looking for an anchor tag with href - while (target && !href) { - if (target instanceof HTMLAnchorElement && target.href) { - href = target.href; - break; - } - target = target.parentElement as HTMLElement; - } - - // Check if we're navigating to the same URL - if so, allow the navigation - if (href && new URL(href).pathname === window.location.pathname) { - return; // Allow navigation to the same URL - } + const newEvent = new MouseEvent('click', { + bubbles: true, + cancelable: true, + view: window, + detail: originalEvent.detail, + screenX: originalEvent.screenX, + screenY: originalEvent.screenY, + clientX: originalEvent.clientX, + clientY: originalEvent.clientY, + ctrlKey: originalEvent.ctrlKey, + altKey: originalEvent.altKey, + shiftKey: originalEvent.shiftKey, + metaKey: originalEvent.metaKey, + button: originalEvent.button, + buttons: originalEvent.buttons + }); if (isDirty) { event.preventDefault(); @@ -75,24 +76,6 @@ export const PreventNavigation: React.FC = React.memo( originalTarget.onclick = null; } - // Create a new click event - const newEvent = new MouseEvent('click', { - bubbles: true, - cancelable: true, - view: window, - detail: originalEvent.detail, - screenX: originalEvent.screenX, - screenY: originalEvent.screenY, - clientX: originalEvent.clientX, - clientY: originalEvent.clientY, - ctrlKey: originalEvent.ctrlKey, - altKey: originalEvent.altKey, - shiftKey: originalEvent.shiftKey, - metaKey: originalEvent.metaKey, - button: originalEvent.button, - buttons: originalEvent.buttons - }); - // Dispatch the event directly on the original target originalTarget.dispatchEvent(newEvent); @@ -100,16 +83,14 @@ export const PreventNavigation: React.FC = React.memo( if (clickHandlers) { originalTarget.onclick = clickHandlers; } - - // Re-attach our listeners after a short delay - setTimeout(() => { - document.querySelectorAll('a').forEach((link) => { - link.addEventListener('click', handleClick); - }); - }, 50); }; setLeavingPage(true); + } else { + document.querySelectorAll('a').forEach((link) => { + link.removeEventListener('click', handleClick); + }); + originalTarget.dispatchEvent(newEvent); } }); From 469fd620a331d3b66ad8400a261e6494dbf63519 Mon Sep 17 00:00:00 2001 From: dal Date: Thu, 24 Apr 2025 11:48:22 -0600 Subject: [PATCH 2/3] sync job fix --- api/libs/stored_values/src/jobs.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/api/libs/stored_values/src/jobs.rs b/api/libs/stored_values/src/jobs.rs index aba8838f4..6c4325d3d 100644 --- a/api/libs/stored_values/src/jobs.rs +++ b/api/libs/stored_values/src/jobs.rs @@ -171,11 +171,12 @@ pub async fn sync_distinct_values_chunk( loop { let distinct_sql = format!( - "SELECT DISTINCT {q}{col}{q} FROM {q}{schema}{q}.{q}{table}{q} ORDER BY 1 NULLS LAST LIMIT {limit} OFFSET {offset}", + "SELECT DISTINCT {q}{col}{q} FROM {q}{db}{q}.{q}{schema}{q}.{q}{table}{q} ORDER BY 1 NULLS LAST LIMIT {limit} OFFSET {offset}", q = quote, - col = q(&column_name), - schema = q(&schema_name), - table = q(&table_name), + col = q(&column_name.to_uppercase()), + db = q(&database_name.to_uppercase()), + schema = q(&schema_name.to_uppercase()), + table = q(&table_name.to_uppercase()), limit = SYNC_CHUNK_LIMIT, offset = offset ); From c1f35d6144fc6b15579b449dea5e59431382b455 Mon Sep 17 00:00:00 2001 From: dal Date: Thu, 24 Apr 2025 11:49:47 -0600 Subject: [PATCH 3/3] streaming queries --- .../handlers/src/chats/post_chat_handler.rs | 135 ++++++++++++++---- .../handlers/src/chats/streaming_parser.rs | 23 +-- 2 files changed, 118 insertions(+), 40 deletions(-) diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index e89fb73e7..cd4ff69d2 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -2281,29 +2281,116 @@ fn transform_assistant_tool_message( // --- Handle Search Tool (Simple Text) --- "search_data_catalog" => { - // This tool doesn't stream complex arguments, just signals searching. - // Only send the message once when arguments start appearing. - if let Some(_args) = parser.process_search_data_catalog_chunk(tool_id.clone(), &tool_call.function.arguments) { - if tracker.get_complete_text(tool_id.clone()).is_none() { // Send only once - let search_msg = BusterReasoningMessage::Text(BusterReasoningText { - id: tool_id.clone(), - reasoning_type: "text".to_string(), - title: "Searching your data catalog...".to_string(), - secondary_title: "".to_string(), - message: None, - message_chunk: None, - status: Some("loading".to_string()), - finished_reasoning: false, - }); - all_results.push(ToolTransformResult::Reasoning(search_msg)); - // Use tracker to mark that we've sent the initial message for this ID - tracker.add_chunk(tool_id.clone(), "searching_sent".to_string()); - } + // --- Handle InProgress: Send initial message once --- + if progress == MessageProgress::InProgress { + // Use tracker to ensure this initial message is sent only once per tool call ID + let tracker_key = format!("{}_initial_search", tool_id); + if tracker.get_complete_text(tracker_key.clone()).is_none() { + let initial_search_msg = BusterReasoningMessage::Text(BusterReasoningText { + id: tool_id.clone(), // Use the tool call ID for the message + reasoning_type: "text".to_string(), + title: "Searching data catalog".to_string(), + secondary_title: "".to_string(), + message: None, + message_chunk: None, + status: Some("loading".to_string()), + finished_reasoning: false, + }); + all_results.push(ToolTransformResult::Reasoning(initial_search_msg)); + // Mark that we've sent the initial message for this ID + tracker.add_chunk(tracker_key, "initial_sent".to_string()); + } } - if progress == MessageProgress::Complete { - // The result from transform_tool_message will show final status and duration. - // Clear the tracker entry used for the initial "searching..." message. - tracker.clear_chunk(tool_id.clone()); + // --- Handle Complete: Update message with queries --- + else if progress == MessageProgress::Complete { + // Define struct to parse final arguments + #[derive(Deserialize)] + struct SearchArgs { + specific_queries: Option>, + exploratory_topics: Option>, + } + + // Parse the complete arguments string + match serde_json::from_str::(&tool_call.function.arguments) { + Ok(args) => { + let queries = args.specific_queries.unwrap_or_default(); + let topics = args.exploratory_topics.unwrap_or_default(); + + println!("queries: {:?}", queries); + println!("topics: {:?}", topics); + + if !queries.is_empty() || !topics.is_empty() { + let mut message_parts = Vec::new(); + if !queries.is_empty() { + message_parts.push("Specific Queries:".to_string()); + message_parts.extend(queries.iter().map(|q| format!("- {}", q))); + } + if !topics.is_empty() { + if !message_parts.is_empty() { message_parts.push("".to_string()); } // Add separator + message_parts.push("Exploratory Topics:".to_string()); + message_parts.extend(topics.iter().map(|t| format!("- {}", t))); + } + + // Create the UPDATED reasoning message with the full query list + // Use the SAME tool_id as the initial message + let mut pill_containers = Vec::new(); + + if !queries.is_empty() { + pill_containers.push(BusterThoughtPillContainer { + title: "Specific Queries".to_string(), + pills: queries.iter().enumerate().map(|(i, q)| BusterThoughtPill { + // Use a combination of tool_id and index for a unique pill id + id: format!("{}_query_{}", tool_id, i), + text: q.clone(), + thought_file_type: "query".to_string(), + }).collect(), + }); + } + + if !topics.is_empty() { + pill_containers.push(BusterThoughtPillContainer { + title: "Exploratory Topics".to_string(), + pills: topics.iter().enumerate().map(|(i, t)| BusterThoughtPill { + // Use a combination of tool_id and index for a unique pill id + id: format!("{}_topic_{}", tool_id, i), + text: t.clone(), + thought_file_type: "topic".to_string(), + }).collect(), + }); + } + + // Create the Pill reasoning message + let updated_search_msg = BusterReasoningMessage::Pill(BusterReasoningPill { + id: tool_id.clone(), + thought_type: "pills".to_string(), // Correct type + title: "Searching data catalog".to_string(), // Keep title simple + secondary_title: "".to_string(), // Duration added by tool result later + pill_containers: Some(pill_containers), // Use the generated containers + status: "loading".to_string(), // Still loading until tool result comes back + }); + all_results.push(ToolTransformResult::Reasoning(updated_search_msg)); + } else { + // If parsing succeeds but queries/topics are empty, + // we might want to keep the initial simple "Searching..." message. + // Currently, this branch does nothing, leaving the initial message as is. + tracing::debug!("Search arguments parsed but no queries or topics found for tool_id: {}", tool_id); + } + } + Err(e) => { + tracing::warn!( + "Failed to parse final search_data_catalog arguments for {}: {}. Args: {}", + tool_id, + e, + tool_call.function.arguments + ); + // If parsing fails, the initial "Searching..." message remains. + // Optionally create an error reasoning message here. + } + } + + // Clear the tracker flag used for the initial message + let tracker_key = format!("{}_initial_search", tool_id); + tracker.clear_chunk(tracker_key); } } @@ -3085,7 +3172,7 @@ async fn apply_file_filtering_rules( // return the context dashboard (unmodified info) followed by the other processed assets. tracing::debug!("Context dashboard {} was NOT modified this turn (only metrics). Prepending context info.", ctx_id); Ok(vec![context_dashboard_info] // Use the fetched (unmodified) context info - .into_iter() + .into_iter() .chain(new_filtered_assets.into_iter()) .collect()) } @@ -3166,7 +3253,7 @@ fn process_current_turn_files( .map_or(false, |uuid| unreferenced_metric_uuids.contains(&uuid)) }) .cloned() - .collect(); + .collect(); // Return unreferenced metrics first, then dashboards let mut combined = unreferenced_metrics; diff --git a/api/libs/handlers/src/chats/streaming_parser.rs b/api/libs/handlers/src/chats/streaming_parser.rs index 7966638bf..94afcad4f 100644 --- a/api/libs/handlers/src/chats/streaming_parser.rs +++ b/api/libs/handlers/src/chats/streaming_parser.rs @@ -1,6 +1,7 @@ use agents::tools::categories::file_tools::common::generate_deterministic_uuid; use anyhow::Result; use serde_json::Value; +use serde::Deserialize; use super::post_chat_handler::{ BusterFile, BusterFileContent, BusterReasoningFile, BusterReasoningMessage, BusterReasoningText, @@ -62,22 +63,12 @@ impl StreamingParser { pub fn process_search_data_catalog_chunk( &mut self, _id: String, - chunk: &str, - ) -> Option { - self.clear_buffer(); - self.buffer.push_str(chunk); - - let processed_json = self.complete_json_structure(self.buffer.clone()); - - // Try to parse arguments, return Some(Value) if successful and looks like search args - if let Ok(value) = serde_json::from_str::(&processed_json) { - if value.get("queries").is_some() { - return Some(value); - } - } - - // If the start of queries is not detected, return None - None + _chunk: &str, + ) -> Result> { + // We no longer process streaming search args here. + // Logic moved to post_chat_handler on message completion. + self.clear_buffer(); // Clear buffer in case it was used + Ok(None) // Always return None during streaming for this tool now } // Process chunks meant for metric files