From 8848578e021b1b75209859803dee42e4575d7da8 Mon Sep 17 00:00:00 2001
From: dal
Date: Wed, 11 Jun 2025 23:11:37 -0600
Subject: [PATCH] feat: implement tool result truncation and enhance data catalog search

- Added a new method to truncate previous tool results for better conversation management, keeping interactions cleaner by replacing the content of older tool results.
- Updated the `search_data_catalog` tool to automatically truncate previous search results, ensuring that only relevant and recent data is presented to users.
- Enhanced the data catalog search functionality to load all available datasets with fresh value injection, improving the overall user experience and data accessibility.
---
 api/libs/agents/src/agent.rs                  |  40 +
 api/libs/agents/src/agents/modes/analysis.rs  |   3 +
 .../src/agents/modes/data_catalog_search.rs   | 105 ++-
 .../file_tools/search_data_catalog.rs         | 816 ++----------------
 4 files changed, 167 insertions(+), 797 deletions(-)

diff --git a/api/libs/agents/src/agent.rs b/api/libs/agents/src/agent.rs
index 905897509..4ca50fbe9 100644
--- a/api/libs/agents/src/agent.rs
+++ b/api/libs/agents/src/agent.rs
@@ -361,6 +361,46 @@ impl Agent {
             .map(|thread| thread.messages.clone())
     }
 
+    /// Truncate previous tool results of a specific tool to keep conversation manageable
+    pub async fn truncate_previous_tool_results(&self, tool_name: &str, replacement_content: &str) -> Result<()> {
+        let mut thread_lock = self.current_thread.write().await;
+        if let Some(thread) = thread_lock.as_mut() {
+            let mut modified_count = 0;
+
+            // Find and truncate previous tool results, but skip the most recent one if it exists
+            let mut tool_message_indices: Vec<usize> = Vec::new();
+            for (index, message) in thread.messages.iter().enumerate() {
+                if let AgentMessage::Tool { name: Some(ref msg_tool_name), .. } = message {
+                    if msg_tool_name == tool_name {
+                        tool_message_indices.push(index);
+                    }
+                }
+            }
+
+            // Skip the last occurrence (if any) and truncate all previous ones
+            if tool_message_indices.len() > 1 {
+                // Remove the last index (most recent) from the list to truncate
+                tool_message_indices.pop();
+
+                for &index in &tool_message_indices {
+                    if let Some(AgentMessage::Tool { content, .. }) = thread.messages.get_mut(index) {
+                        *content = replacement_content.to_string();
+                        modified_count += 1;
+                    }
+                }
+            }
+
+            if modified_count > 0 {
+                debug!(
+                    tool_name = tool_name,
+                    truncated_count = modified_count,
+                    "Truncated previous tool results to keep conversation manageable"
+                );
+            }
+        }
+        Ok(())
+    }
+
     /// Update the current thread with a new message
     async fn update_current_thread(&self, message: AgentMessage) -> Result<()> {
         let mut thread_lock = self.current_thread.write().await;
diff --git a/api/libs/agents/src/agents/modes/analysis.rs b/api/libs/agents/src/agents/modes/analysis.rs
index 8939f9c36..25ad2a98a 100644
--- a/api/libs/agents/src/agents/modes/analysis.rs
+++ b/api/libs/agents/src/agents/modes/analysis.rs
@@ -190,6 +190,9 @@ const SNOWFLAKE_DIALECT_GUIDANCE: &str = r##"
 "##;
 
 const BIGQUERY_DIALECT_GUIDANCE: &str = r##"
+- You must escape fully-qualified `project.dataset.table` table names with backticks or the query won't compile.
+  - Alternatively, exclude the project id from the table name.
+  - If any part of the name (such as the project id) contains a `-`, you must wrap the name in backticks.
 - **Date/Time Functions (BigQuery)**:
   - **`DATE_TRUNC`**: `DATE_TRUNC(column, DAY)`, `DATE_TRUNC(column, WEEK)`, `DATE_TRUNC(column, MONTH)`, etc. Week starts Sunday by default, use `WEEK(MONDAY)` for Monday start.
- **`EXTRACT`**: `EXTRACT(DAYOFWEEK FROM column)` (1=Sun, 7=Sat), `EXTRACT(ISOWEEK FROM column)`. diff --git a/api/libs/agents/src/agents/modes/data_catalog_search.rs b/api/libs/agents/src/agents/modes/data_catalog_search.rs index 6100cdb64..cce5c5827 100644 --- a/api/libs/agents/src/agents/modes/data_catalog_search.rs +++ b/api/libs/agents/src/agents/modes/data_catalog_search.rs @@ -84,9 +84,9 @@ pub fn get_configuration(agent_data: &ModeAgentData, _data_source_syntax: Option } } -// Keep the prompt constant, but it's no longer pub +// Keep the prompt constant, but updated to reflect the new behavior const DATA_CATALOG_SEARCH_PROMPT: &str = r##"**Role & Task** -You are a Search Strategist Agent. Your primary goal is to analyze the conversation history, the most recent user message, and available dataset descriptions to formulate the optimal parameters for the `search_data_catalog` tool or determine that no search is needed (`no_search_needed`). +You are a Data Loading Agent. Your primary goal is to analyze the conversation history and the most recent user message to determine whether to load all available datasets with fresh value injection (`search_data_catalog`) or skip loading if no new data is needed (`no_search_needed`). Your sole output MUST be a call to **ONE** of these tools: `search_data_catalog` or `no_search_needed`. @@ -94,80 +94,75 @@ Your sole output MUST be a call to **ONE** of these tools: `search_data_catalog` ``` {DATASET_DESCRIPTIONS} ``` -*(This section contains summaries or relevant snippets of YAML/metadata for datasets the agent is aware of. Use this to reason about potential joins, available attributes, and data relationships.)* +*(This section contains summaries or relevant snippets of YAML/metadata for datasets. Use this to understand what data is available.)* **Core Responsibilities:** -1. **Analyze Request & Context**: Evaluate the user's request (`"content"` field of `"role": "user"` messages), conversation history, and `{DATASET_DESCRIPTIONS}`. -2. **Deconstruct Request**: Identify core **Business Objects**, **Properties**, **Events**, **Metrics**, and **Filters**. -3. **Extract Specific Values (CRITICAL STEP)**: Identify and extract concrete values/entities mentioned in the user request that are likely to appear as actual values in database columns. This is crucial for the `value_search_terms` parameter. +1. **Analyze Request & Context**: Evaluate the user's request (`"content"` field of `"role": "user"` messages) and conversation history. +2. **Deconstruct Request**: Identify core **Business Objects**, **Properties**, **Events**, **Metrics**, and **Filters** needed for analysis. +3. **Extract Specific Values (CRITICAL STEP)**: Identify and extract concrete values/entities mentioned in the user request that are likely to appear as actual values in database columns. This is crucial for fresh value injection. * **Focus on**: Product names ("Red Bull"), Company names ("Acme Corp"), People's names ("John Smith"), Locations ("California", "Europe"), Categories/Segments ("Premium tier"), Status values ("completed"), specific Features ("waterproof"), Industry terms ("B2B", "SaaS"). 
- * **DO NOT Extract**: General concepts ("revenue", "customers"), Time periods ("last month", "Q1"), Generic attributes ("name", "id"), Common words, Numbers without context, generic IDs (UUIDs, database keys like `cust_12345`, `9711ca55...`), or composite strings containing non-semantic identifiers (e.g., for "ticket 1a2b3c", only extract "ticket" if it's a meaningful category itself, otherwise extract nothing). Focus *only* on values with inherent business meaning. - * **Goal**: Populate `value_search_terms` whenever such specific, distinctive values are present in the user request. -4. **Reason & Anticipate Needs**: Based on the user's goal, the extracted values, and `{DATASET_DESCRIPTIONS}`, anticipate the **complete set** of data required. Consider implicit needs (e.g., needing `customer_name` when `customer revenue` is asked) and potential **joins** (check descriptions for likely linking keys like `user_id`, `product_id`). -5. **Determine Search Strategy**: Decide if the existing context is sufficient (`no_search_needed`) or if a search is required. -6. **Generate Tool Call Parameters**: If searching, formulate parameters for `search_data_catalog`, deciding the appropriate combination of `specific_queries`, `exploratory_topics`, and the extracted `value_search_terms`. + * **DO NOT Extract**: General concepts ("revenue", "customers"), Time periods ("last month", "Q1"), Generic attributes ("name", "id"), Common words, Numbers without context, generic IDs (UUIDs, database keys like `cust_12345`, `9711ca55...`), or composite strings containing non-semantic identifiers. Focus *only* on values with inherent business meaning. + * **Goal**: Populate `value_search_terms` to enable fresh value injection into dataset YAMLs. +4. **Determine Loading Strategy**: Decide if data loading is needed for fresh analysis or if current context is sufficient. +5. **Generate Tool Call Parameters**: If loading data, formulate parameters for `search_data_catalog` which will load ALL datasets with fresh value injection. **Workflow & Decision Logic:** -1. **Analyze Request & Context**: Review the latest user message, history, and `{DATASET_DESCRIPTIONS}`. -2. **Extract Specific Values**: Identify concrete values from the user request for potential use in `value_search_terms`. -3. **Check for Visualization-Only Request**: If the request is *purely* about visual aspects (chart types, colors) -> Call `no_search_needed`. -4. **Assess Existing Context**: Evaluate if `{DATASET_DESCRIPTIONS}` (reflecting previous finds) is sufficient for the *current* request's analytical needs (including anticipated joins/attributes). - * **If Sufficient**: Call `no_search_needed`. - * **If Insufficient OR No Context**: Proceed to formulate search parameters. -5. **Formulate Search Parameters (Apply Reasoning & Extracted Values)**: - * **Identify Request Nature**: Is it specific, exploratory, or mixed? - * **Specific Requests** (e.g., "Top customer by revenue", "Sales for Product X last month"): Generate `specific_queries`. Queries should explicitly ask for identified Objects, Properties, Events, Metrics, Filters, AND anticipated attributes/joining keys. *Aim for 1-3 focused queries.* - * **Exploratory Requests** (e.g., "Tell me about revenue", "Factors influencing churn"): Generate `exploratory_topics`. Topics should represent broader themes for discovery. 
*Aim for 3-5 distinct topics.* - * **Mixed Requests** (e.g., "Who is my top customer [Nike] and tell me all about them?"): Generate *both* `specific_queries` (for the "top customer" part) *and* `exploratory_topics` (for the "tell me all about them" part). - * **Populate `value_search_terms` (ALWAYS if applicable)**: If Step 2 extracted specific values ("Red Bull", "California", "Premium tier", "John Smith", etc.), include them in the `value_search_terms` list. This parameter helps find datasets containing these *exact* values and should be used alongside `specific_queries` or `exploratory_topics` if relevant values are mentioned. -6. **Execute Tool Call**: Call `search_data_catalog` with the generated parameters. Ensure at least one parameter (`specific_queries`, `exploratory_topics`, `value_search_terms`) is non-null. +1. **Analyze Request & Context**: Review the latest user message and conversation history. +2. **Extract Specific Values**: Identify concrete values from the user request for value injection. +3. **Check for Visualization-Only Request**: If the request is *purely* about visual aspects (chart types, colors) and no new data analysis is needed -> Call `no_search_needed`. +4. **Assess Need for Fresh Data**: + * **If New Analysis Required**: Any request that involves data analysis, exploration, or requires fresh value injection -> Call `search_data_catalog`. + * **If No New Data Needed**: Simple clarifications or purely visual changes -> Call `no_search_needed`. +5. **Formulate Loading Parameters**: + * **Specific Requests** (e.g., "Top customer by revenue", "Sales for Product X"): Generate `specific_queries` describing what data is needed. + * **Exploratory Requests** (e.g., "Tell me about revenue", "Factors influencing churn"): Generate `exploratory_topics` for broader data loading. + * **Mixed Requests**: Generate *both* `specific_queries` and `exploratory_topics` as appropriate. + * **Value Search Terms**: Always include extracted specific values in `value_search_terms` for fresh injection. +6. **Execute Tool Call**: Call the appropriate tool with generated parameters. **Tool Parameters (`search_data_catalog`)** -- `specific_queries`: `Option>` - For focused requests. Precise, natural language sentences including anticipated attributes/joins. -- `exploratory_topics`: `Option>` - For vague/investigative requests. Concise phrases for discovery. -- `value_search_terms`: `Option>` - **CRITICAL**: For specific, meaningful values/entities mentioned in the request (Product names, locations, categories, statuses, etc., as defined in Step 3). Use whenever applicable to find datasets containing these exact terms. **Must exclude IDs, UUIDs, and non-semantic values** (see Step 3 exclusions). +- `specific_queries`: `Option>` - For focused requests. Natural language descriptions of needed data. +- `exploratory_topics`: `Option>` - For broad/investigative requests. Topics for data exploration. +- `value_search_terms`: `Option>` - **CRITICAL**: Specific values from the user request for fresh injection into datasets. + +**Important Notes:** +- **Data Loading Strategy**: `search_data_catalog` now loads ALL available datasets with fresh value injection rather than filtering specific ones. +- **Fresh Value Injection**: Always use `value_search_terms` when specific values are mentioned to get the most current data. +- **Previous Results**: Any previous search results are automatically truncated to keep conversations manageable. 
**Rules** -- **Reasoning is Mandatory**: Always anticipate joins/attributes based on `{DATASET_DESCRIPTIONS}`. -- **Value Extraction is Mandatory**: Always attempt to extract specific values from the user request for `value_search_terms`. -- **Use `value_search_terms` When Applicable**: If specific values are extracted, *always* include them in the `value_search_terms` parameter, even if also using `specific_queries` or `exploratory_topics`. +- **Value Extraction is Mandatory**: Always attempt to extract specific values from the user request. +- **Use `value_search_terms` When Applicable**: If specific values are extracted, *always* include them for fresh injection. - **Output = Tool Call**: Only output a single tool call. -- **Match Parameters to Request Type**: Use the appropriate combination of parameters based on the analysis. -- **Default to Search if No Context/Insufficient**: If context is lacking, always search. +- **Default to Loading for Analysis**: If the request involves any data analysis, load fresh data. -**Examples (Illustrating Parameter Combinations)** +**Examples** -- **Initial Request (Specific)**: User: "Who is my top customer by revenue?" - - *Reasoning*: Need Customer, Revenue Metric. Anticipate Customer Name/ID. No specific values mentioned. +- **Initial Request**: User: "Who is my top customer by revenue?" + - *Reasoning*: Need Customer and Revenue data analysis. No specific values mentioned. - Tool: `search_data_catalog` - - Params: `{"specific_queries": ["Find datasets identifying the top Customer by revenue, including Customer Name and Customer ID properties."]} ` -- **Follow-up (Exploratory + Value)**: User: "Tell me more about this customer Acme Corp [ID 123]." - - *Reasoning*: Context has basic data. User wants broader info. Specific value "Acme Corp" mentioned. + - Params: `{"specific_queries": ["Find datasets with customer revenue data to identify top customers."]}` + +- **Request with Values**: User: "What's the sales trend for Red Bull in California?" + - *Reasoning*: Need Sales data with specific product and location. Values "Red Bull" and "California" mentioned. - Tool: `search_data_catalog` - - Params: `{"exploratory_topics": ["Acme Corp interaction history", "Acme Corp product usage patterns", "Acme Corp support tickets"], "value_search_terms": ["Acme Corp"]}` -- **Specific Request with Values**: User: "What's the sales trend for Red Bull in California?" - - *Reasoning*: Need Sales Metric over time, filtered by Product="Red Bull", Region="California". Specific values mentioned. + - Params: `{"specific_queries": ["Find datasets showing sales trends for specific products in specific regions."], "value_search_terms": ["Red Bull", "California"]}` + +- **Exploratory Request**: User: "Tell me about our customer churn patterns." + - *Reasoning*: Broad analytical request requiring data exploration. - Tool: `search_data_catalog` - - Params: `{"specific_queries": ["Find datasets showing Sales trends over time for specific products in specific regions."], "value_search_terms": ["Red Bull", "California"]}` -- **Mixed Request with Multiple Values**: User: "Compare sales between Nike and Adidas in our Premium tier stores." - - *Reasoning*: Need Sales, Product Brand comparison, Store Tier="Premium" filter. Specific values "Nike", "Adidas", "Premium tier" mentioned. Mixed specific (comparison) and exploratory (brand/tier performance). 
- - Tool: `search_data_catalog` - - Params: `{"specific_queries": ["Find datasets linking Sales to Product Brand and Store Tier for comparison analysis."], "exploratory_topics": ["Brand comparison metrics", "Premium tier store performance"], "value_search_terms": ["Nike", "Adidas", "Premium tier"]}` -- **Sufficient Context**: User: "Plot the Q1 revenue for the top customer [Acme Corp] we just identified." - - *Reasoning*: Context has needed customer/revenue data. + - Params: `{"exploratory_topics": ["Customer churn patterns", "Customer retention metrics", "Customer lifecycle data"]}` + +- **Visualization Only**: User: "Make that chart blue instead of red." + - *Reasoning*: Pure visual change, no new data analysis needed. - Tool: `no_search_needed` - - Reason: "Existing dataset descriptions cover the request for Q1 revenue for the identified customer Acme Corp." - -**Validation** -- Ensure parameters reflect the request type and anticipated needs. -- Ensure `value_search_terms` is populated if specific values were mentioned. -- Ensure `no_search_needed` reason is accurate. + - Reason: "Request is for visual formatting changes only, no new data analysis required." **Available Dataset Names (for context)** {DATASETS} You are an agent - please keep going until the user's query is completely resolved, before ending your turn and yielding back to the user. Only terminate your turn when you are sure that the problem is solved. -If you are not sure about file content or codebase structure pertaining to the user's request, use your tools to read files and gather the relevant information: do NOT guess or make up an answer. +If you are not sure about file content or codebase structure pertaining to the user's request, use your tools to gather the relevant information: do NOT guess or make up an answer. You MUST plan extensively before each function call, and reflect extensively on the outcomes of the previous function calls. DO NOT do this entire process by making function calls only, as this can impair your ability to solve the problem and think insightfully. 
"##; diff --git a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs index 193256ca2..6863ccc7c 100644 --- a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs +++ b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs @@ -1,7 +1,6 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::{env, sync::Arc, time::Instant}; use database::enums::DataSourceType; -use tokio::sync::Mutex; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -20,7 +19,6 @@ use tracing::{debug, error, info, warn}; use uuid::Uuid; use dataset_security::{get_permissioned_datasets, PermissionedDataset}; use stored_values; -use rerank::Reranker; // Import SemanticLayerSpec use semantic_layer::models::Model; @@ -61,18 +59,6 @@ pub struct DatasetSearchResult { pub yml_content: Option, } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)] -struct DatasetResult { - id: Uuid, - name: Option, - yml_content: Option, -} - -#[derive(Debug, Clone)] -struct RankedDataset { - dataset: PermissionedDataset, -} - /// Represents a searchable dimension in a model #[derive(Debug, Clone)] struct SearchableDimension { @@ -104,36 +90,6 @@ async fn generate_embedding_for_text(text: &str) -> Result> { Ok(embedding_response.data[0].embedding.clone()) } -// Rename and modify the function signature -async fn search_values_for_term_by_embedding( - data_source_id: &Uuid, - embedding: Vec, // Accept pre-computed embedding - limit: i64, -) -> Result> { - // Skip searching if embedding is invalid (e.g., empty) - if embedding.is_empty() { - debug!("Skipping search for empty embedding"); - return Ok(vec![]); - } - - // Search values using the provided embedding (no table/column filters) - match stored_values::search::search_values_by_embedding( - *data_source_id, - &embedding, - limit, - ).await { - Ok(results) => { - debug!(count = results.len(), "Successfully found values matching embedding"); - Ok(results) - } - Err(e) => { - error!(data_source_id = %data_source_id, error = %e, "Failed to search values by embedding"); - // Return empty results on error to continue the process - Ok(vec![]) - } - } -} - // Helper function to identify time-based terms that might cause issues fn is_time_period_term(term: &str) -> bool { let term_lower = term.to_lowercase(); @@ -143,7 +99,6 @@ fn is_time_period_term(term: &str) -> bool { "today", "yesterday", "tomorrow", "last week", "last month", "last year", "last quarter", "this week", "this month", "this year", "this quarter", - "next week", "next month", "next year", "next quarter", "q1", "q2", "q3", "q4", "january", "february", "march", "april", "may", "june", "july", "august", "september", "october", "november", "december", @@ -164,116 +119,6 @@ fn to_found_value_info(result: stored_values::search::StoredValueResult, _score: } } -#[derive(Debug, Deserialize)] -struct LLMFilterResponse { - results: Vec, -} - -const SPECIFIC_LLM_FILTER_PROMPT: &str = r#" -You are a dataset relevance evaluator, focused on specific analytical requirements. Your task is to determine which datasets are **semantically relevant** to the user's query and the anticipated analytical needs based on their structure and metadata. Focus on the core **Business Objects, Properties, Events, Metrics, and Filters** explicitly requested or strongly implied. 
- -USER REQUEST (Context): {user_request} -SPECIFIC SEARCH QUERY: {query} (This query is framed around key semantic concepts and anticipated attributes/joins identified from the user request) - -Below is a list of datasets that were identified as potentially relevant by an initial ranking system. -For each dataset, review its description in the YAML format. Evaluate how well the dataset's described contents (columns, metrics, entities, documentation) **semantically align** with the key **Objects, Properties, Events, Metrics, and Filters** required by the SPECIFIC SEARCH QUERY and USER REQUEST context. - -IMPORTANT EVIDENCE - ACTUAL DATA VALUES FOUND IN THIS DATASET: -{found_values_json} -These values were found in the actual data that matches your search requirements. Consider these as concrete evidence that this dataset contains data relevant to your query. - -**Crucially, anticipate necessary attributes**: Pay close attention to whether the dataset contains specific attributes like **names, IDs, emails, timestamps, or other identifying/linking information** that are likely required to fulfill the analytical goal, even if not explicitly stated in the query but inferable from the user request context and common analytical patterns (e.g., needing 'customer name' when analyzing 'customer revenue'). - -Include datasets where the YAML description suggests a reasonable semantic match or overlap with the needed concepts and anticipated attributes. Prioritize datasets that appear to contain the core Objects or Events AND the necessary linking/descriptive Properties. - -DATASETS: -{datasets_json} - -Return a JSON response containing ONLY a list of the UUIDs for the semantically relevant datasets. The response should have the following structure: -```json -{ - "results": [ - "dataset-uuid-here-1", - "dataset-uuid-here-2" - // ... semantically relevant dataset UUIDs - ] -} -``` - -IMPORTANT GUIDELINES: -1. **Focus on Semantic Relevance & Anticipation**: Include datasets whose content, as described in the YAML, is semantically related to the required Objects, Properties, Events, Metrics, or Filters, AND contains the anticipated attributes needed for analysis (like names, IDs, relevant dimensions). -2. **Consider the Core Concepts & Analytical Goal**: Does the dataset seem to be about the primary Business Object(s) or Event(s)? Does it contain relevant Properties or Metrics (including anticipated ones)? -3. **Prioritize Datasets with Key Attributes**: Give higher importance to datasets containing necessary identifying or descriptive attributes (names, IDs, categories, timestamps) relevant to the query and user request context. -4. **Evaluate based on Semantic Fit**: Does the dataset's purpose and structure align well with the user's information need and the likely analytical steps? -5. **Consider Found Values as Evidence**: The actual values found in the dataset provide concrete evidence of relevance. If values matching the user's query (like specific entities, terms, or categories) appear in the dataset, this strongly suggests relevance. -6. **Contextual Information is Relevant**: Include datasets providing important contextual Properties for the core Objects or Events. -7. **When in doubt, lean towards inclusion if semantically plausible and potentially useful**: If the dataset seems semantically related, include it. -8. **CRITICAL:** Each string in the "results" array MUST contain ONLY the dataset's UUID string (e.g., "9711ca55-8329-4fd9-8b20-b6a3289f3d38"). -9. 
**Use USER REQUEST for context, SPECIFIC SEARCH QUERY for focus**: Understand the underlying need (user request) and the specific concepts/attributes being targeted (search query). -"#; - -const EXPLORATORY_LLM_FILTER_PROMPT: &str = r#" -You are a dataset relevance evaluator, focused on exploring potential connections and related concepts. Your task is to determine which datasets might be **thematically relevant** or provide useful **contextual information** related to the user's exploratory topic and broader request. - -USER REQUEST (Context): {user_request} -EXPLORATORY TOPIC: {topic} (This topic represents a general area of interest derived from the user request) - -Below is a list of datasets identified as potentially relevant by an initial ranking system. -For each dataset, review its description in the YAML format. Evaluate how well the dataset's described contents (columns, metrics, entities, documentation) **thematically relate** to the EXPLORATORY TOPIC and the overall USER REQUEST context. - -IMPORTANT EVIDENCE - ACTUAL DATA VALUES FOUND IN THIS DATASET: -{found_values_json} -These values were found in the actual data that matches your exploratory topics. Consider these as concrete evidence that this dataset contains data relevant to your exploration. - -Consider datasets that: -- Directly address the EXPLORATORY TOPIC. -- Contain concepts, objects, or events that are often related to the EXPLORATORY TOPIC (e.g., if the topic is 'customer churn', related datasets might involve 'customer support interactions', 'product usage', 'marketing engagement', 'customer demographics'). -- Provide valuable contextual dimensions (like time, geography, product categories) that could enrich the analysis of the EXPLORATORY TOPIC. -- Might reveal interesting patterns or correlations when combined with data more central to the topic. - -Focus on **potential utility for exploration and discovery**, rather than strict semantic matching to the topic words alone. - -DATASETS: -{datasets_json} - -Return a JSON response containing ONLY a list of the UUIDs for the potentially relevant datasets for exploration. The response should have the following structure: -```json -{ - "results": [ - "dataset-uuid-here-1", - "dataset-uuid-here-2" - // ... potentially relevant dataset UUIDs for exploration - ] -} -``` - -IMPORTANT GUIDELINES: -1. **Focus on Thematic Relevance & Potential Utility**: Include datasets whose content seems related to the EXPLORATORY TOPIC or could provide valuable context/insights for exploration. -2. **Consider Related Concepts**: Think broadly about what data is often analyzed alongside the given topic. -3. **Consider Found Values as Evidence**: The actual values found in the dataset provide concrete evidence of relevance. If values matching the user's exploratory topic (like specific entities, terms, or categories) appear in the dataset, this strongly suggests usefulness for exploration. -4. **Prioritize Breadth**: Lean towards including datasets that might offer different perspectives or dimensions related to the topic. -5. **Evaluate based on Potential for Discovery**: Does the dataset seem like it could contribute to understanding the topic area, even indirectly? -6. **Contextual Information is Valuable**: Include datasets providing relevant dimensions or related entities. -7. **When in doubt, lean towards inclusion if thematically plausible**: If the dataset seems potentially related to the exploration goal, include it. -8. 
**CRITICAL:** Each string in the "results" array MUST contain ONLY the dataset's UUID string (e.g., "9711ca55-8329-4fd9-8b20-b6a3289f3d38"). -9. **Use USER REQUEST for context, EXPLORATORY TOPIC for focus**: Understand the underlying need (user request) and the general area being explored (topic). -"#; - -// NEW: Helper function to extract data source ID from permissioned datasets -// This is a placeholder - you'll need to adjust based on how data_source_id is actually stored/retrieved -fn extract_data_source_id(datasets: &[PermissionedDataset]) -> Option { - // Assuming datasets have a data_source_id property or it can be derived from dataset.id - // As a fallback, we're using the ID of the first dataset - // Replace this with actual implementation based on your data model - if datasets.is_empty() { - return None; - } - - // For this implementation, we're assuming the dataset ID is the data source ID - // In a real implementation, you would likely have a different way to get the data_source_id - Some(datasets[0].data_source_id) -} - pub struct SearchDataCatalogTool { agent: Arc, } @@ -312,6 +157,12 @@ impl SearchDataCatalogTool { } } } + + // NEW: Helper function to truncate previous search_data_catalog results + async fn truncate_previous_search_results(&self) -> Result<()> { + // Truncate previous search_data_catalog results to keep conversation manageable + self.agent.truncate_previous_tool_results("search_data_catalog", "[REMOVED BC OF SIZE]").await + } } #[async_trait] @@ -322,7 +173,11 @@ impl ToolExecutor for SearchDataCatalogTool { async fn execute(&self, params: Self::Params, _tool_call_id: String) -> Result { let start_time = Instant::now(); let user_id = self.agent.get_user_id(); - let session_id = self.agent.get_session_id(); + + // Truncate previous search results to keep conversation manageable + if let Err(e) = self.truncate_previous_search_results().await { + warn!("Failed to truncate previous search results: {}", e); + } let specific_queries = params.specific_queries.clone().unwrap_or_default(); let exploratory_topics = params.exploratory_topics.clone().unwrap_or_default(); @@ -340,37 +195,15 @@ impl ToolExecutor for SearchDataCatalogTool { debug!( specific_queries_count = specific_queries.len(), exploratory_topics_count = exploratory_topics.len(), - "Starting request with specific queries and exploratory topics" + "Starting simplified search to return all datasets with value injection" ); - // Start concurrent tasks - - // 2. Begin fetching datasets concurrently - let user_id_for_datasets = user_id.clone(); - let all_datasets_future = tokio::spawn(async move { - Self::get_datasets(&user_id_for_datasets).await - }); - - // Await the datasets future first (we need this to proceed) - let all_datasets = match all_datasets_future.await? 
{ - Ok(datasets) => datasets, - Err(e) => { - error!(user_id=%user_id, "Failed to retrieve permissioned datasets for tool execution: {}", e); - return Ok(SearchDataCatalogOutput { - message: format!("Error fetching datasets: {}", e), - specific_queries: params.specific_queries, - exploratory_topics: params.exploratory_topics, - duration: start_time.elapsed().as_millis() as i64, - results: vec![], - data_source_id: None, - }); - } - }; + // Get all datasets + let all_datasets = Self::get_datasets(&user_id).await?; // Check if datasets were fetched and are not empty if all_datasets.is_empty() { info!("No datasets found for the organization or user."); - // Optionally cache that no data source was found or handle as needed self.agent.set_state_value(String::from("data_source_id"), Value::Null).await; return Ok(SearchDataCatalogOutput { message: "No datasets available to search. Have you deployed datasets? If you believe this is an error, please contact support.".to_string(), @@ -383,7 +216,6 @@ impl ToolExecutor for SearchDataCatalogTool { } // Extract and cache the data_source_id from the first dataset - // Assumes all datasets belong to the same data source for this user context let target_data_source_id = all_datasets[0].data_source_id; debug!(data_source_id = %target_data_source_id, "Extracted data source ID"); @@ -392,10 +224,9 @@ impl ToolExecutor for SearchDataCatalogTool { "data_source_id".to_string(), Value::String(target_data_source_id.to_string()) ).await; - debug!(data_source_id = %target_data_source_id, "Cached data source ID in agent state"); - // --- BEGIN: Spawn concurrent task to fetch data source syntax --- - let agent_clone = self.agent.clone(); // Clone Arc for the async block + // Spawn concurrent task to fetch data source syntax + let agent_clone = self.agent.clone(); let syntax_future = tokio::spawn(async move { let result: Result = async { let mut conn = get_pg_pool().get().await @@ -403,17 +234,15 @@ impl ToolExecutor for SearchDataCatalogTool { let source_type = data_sources::table .filter(data_sources::id.eq(target_data_source_id)) - .select(data_sources::type_) // <-- Use type_ as per user edit - .first::(&mut conn) // <-- Use corrected enum name + .select(data_sources::type_) + .first::(&mut conn) .await .context(format!("Failed to find data source type for ID: {}", target_data_source_id))?; - // Use the enum's to_string() method directly let syntax_string = source_type.to_string(); Ok(syntax_string) }.await; - // Set state inside the spawned task match result { Ok(syntax) => { debug!(data_source_id = %target_data_source_id, syntax = %syntax, "Determined data source syntax concurrently"); @@ -431,10 +260,9 @@ impl ToolExecutor for SearchDataCatalogTool { } } }); - // --- END: Spawn concurrent task to fetch data source syntax --- - - // --- BEGIN REORDERED VALUE SEARCH --- + // --- VALUE SEARCH (keep the embedding-based search and injection) --- + // Extract value search terms let value_search_terms = params.value_search_terms.clone().unwrap_or_default(); @@ -451,33 +279,28 @@ impl ToolExecutor for SearchDataCatalogTool { generate_embeddings_batch(embedding_terms).await }); - // Await the batch embedding generation match embedding_batch_future.await? 
{ Ok(results) => results.into_iter().collect(), Err(e) => { error!(error = %e, "Batch embedding generation failed"); - HashMap::new() // Return empty map on error + HashMap::new() } } } else { - HashMap::new() // No valid terms, no embeddings needed + HashMap::new() }; debug!(count = term_embeddings.len(), "Generated embeddings for value search terms via batch"); - // Begin value searches concurrently using pre-generated embeddings and schema filter + // Begin value searches concurrently using pre-generated embeddings let mut value_search_futures = Vec::new(); if !term_embeddings.is_empty() { - let schema_name = format!("ds_{}", target_data_source_id.to_string().replace('-', "_")); - debug!(schema_filter = %schema_name, "Using schema filter for value search"); - for (term, embedding) in term_embeddings.iter() { let term_clone = term.clone(); let embedding_clone = embedding.clone(); let data_source_id_clone = target_data_source_id; let future = tokio::spawn(async move { - // Use search_values_by_embedding_with_filters with only the schema filter let results = stored_values::search::search_values_by_embedding( data_source_id_clone, &embedding_clone, @@ -496,7 +319,7 @@ impl ToolExecutor for SearchDataCatalogTool { futures::future::join_all(value_search_futures) .await .into_iter() - .filter_map(|r| r.ok()) // Filter out any join errors + .filter_map(|r| r.ok()) .collect(); // Process the value search results @@ -506,236 +329,52 @@ impl ToolExecutor for SearchDataCatalogTool { Ok(values) => { let found_values: Vec = values.into_iter() .map(|val| { - to_found_value_info(val, 0.0) // We don't use score in FoundValueInfo + to_found_value_info(val, 0.0) }) .collect(); - let term_str = term.clone(); // Clone before moving into HashMap + let term_str = term.clone(); let values_count = found_values.len(); found_values_by_term.insert(term, found_values); - debug!(term = %term_str, count = values_count, schema = %format!("ds_{}", target_data_source_id.to_string().replace('-', "_")), "Found values for search term"); + debug!(term = %term_str, count = values_count, "Found values for search term"); } Err(e) => { error!(term = %term, error = %e, "Error searching for values"); - // Store empty vec even on error to avoid issues later found_values_by_term.insert(term, vec![]); } } } - // Flatten all found values into a single list (needed for LLM filter) + // Flatten all found values into a single list let all_found_values: Vec = found_values_by_term.values() .flat_map(|values| values.clone()) .collect(); - debug!(value_count = all_found_values.len(), "Total found values across all terms after initial search"); + debug!(value_count = all_found_values.len(), "Total found values across all terms after search"); - // --- END REORDERED VALUE SEARCH --- - - // Check if we have anything to search for *after* value search and before reranking - if specific_queries.is_empty() && exploratory_topics.is_empty() && all_found_values.is_empty() && valid_value_search_terms.is_empty() { - // Adjusted condition to check all_found_values as well - warn!("SearchDataCatalogTool executed with no specific queries, exploratory topics, or valid value search terms resulting in found values."); - // We might still want to return an empty list if no queries/topics provided, even if values were searched but none found. - // Let's return the empty list if no queries/topics AND no values found from terms. 
- if specific_queries.is_empty() && exploratory_topics.is_empty() && all_found_values.is_empty() { - return Ok(SearchDataCatalogOutput { - message: "No search queries, exploratory topics, or found values from provided terms.".to_string(), - specific_queries: params.specific_queries, - exploratory_topics: params.exploratory_topics, - duration: start_time.elapsed().as_millis() as i64, - results: vec![], - data_source_id: Some(target_data_source_id), - }); - } - } - - // Prepare documents from datasets (needed for reranking) - let documents: Vec = all_datasets - .iter() - .filter_map(|dataset| dataset.yml_content.clone()) - .collect(); - - if documents.is_empty() { - warn!("No datasets with YML content found after filtering."); - return Ok(SearchDataCatalogOutput { - message: "No searchable dataset content found.".to_string(), - specific_queries: params.specific_queries, - exploratory_topics: params.exploratory_topics, - duration: start_time.elapsed().as_millis() as i64, - results: vec![], - data_source_id: Some(target_data_source_id), - }); - } - - // --- BEGIN MOVED RERANKING --- - // We'll use the user prompt for the LLM filtering - let user_prompt_for_task = user_prompt_str.clone(); + // --- RETURN ALL DATASETS (no filtering/ranking) --- - // Keep track of reranking errors using Arc - let rerank_errors = Arc::new(Mutex::new(Vec::new())); - - // Start specific query reranking - let specific_rerank_futures = stream::iter(specific_queries.clone()) - .map(|query| { - let current_query = query.clone(); - let datasets_clone = all_datasets.clone(); - let documents_clone = documents.clone(); - let rerank_errors_clone = Arc::clone(&rerank_errors); // Clone Arc - - async move { - let ranked = match rerank_datasets(¤t_query, &datasets_clone, &documents_clone).await { - Ok(r) => r, - Err(e) => { - error!(error = %e, query = current_query, "Reranking failed for specific query"); - // Lock and push error - let mut errors = rerank_errors_clone.lock().await; - errors.push(format!("Failed to rerank for specific query '{}': {}", current_query, e)); - Vec::new() // Return empty vec on error to avoid breaking flow - } - }; - - (current_query, ranked) - } - }) - .buffer_unordered(10); - - // Start exploratory topic reranking - let exploratory_rerank_futures = stream::iter(exploratory_topics.clone()) - .map(|topic| { - let current_topic = topic.clone(); - let datasets_clone = all_datasets.clone(); - let documents_clone = documents.clone(); - let rerank_errors_clone = Arc::clone(&rerank_errors); // Clone Arc - - async move { - let ranked = match rerank_datasets(¤t_topic, &datasets_clone, &documents_clone).await { - Ok(r) => r, - Err(e) => { - error!(error = %e, topic = current_topic, "Reranking failed for exploratory topic"); - // Lock and push error - let mut errors = rerank_errors_clone.lock().await; - errors.push(format!("Failed to rerank for exploratory topic '{}': {}", current_topic, e)); - Vec::new() // Return empty vec on error to avoid breaking flow - } - }; - - (current_topic, ranked) - } - }) - .buffer_unordered(10); - - // Collect rerank results in parallel - let specific_reranked_vec = specific_rerank_futures.collect::)>>().await; - let exploratory_reranked_vec = exploratory_rerank_futures.collect::)>>().await; - // --- END MOVED RERANKING --- - - // 6. 
Now run LLM filtering with the found values and ranked datasets - let specific_filter_futures = stream::iter(specific_reranked_vec) - .map(|(query, ranked)| { - let user_id_clone = user_id.clone(); - let session_id_clone = session_id.clone(); - let prompt_clone = user_prompt_for_task.clone(); - let values_clone = all_found_values.clone(); - - async move { - if ranked.is_empty() { - return Ok(vec![]); - } - - match filter_specific_datasets_with_llm(&query, &prompt_clone, ranked, &user_id_clone, &session_id_clone, &values_clone).await { - Ok(filtered) => Ok(filtered), - Err(e) => { - error!(error = %e, query = query, "LLM filtering failed for specific query"); - Ok(vec![]) - } - } - } - }) - .buffer_unordered(10); - - let exploratory_filter_futures = stream::iter(exploratory_reranked_vec) - .map(|(topic, ranked)| { - let user_id_clone = user_id.clone(); - let session_id_clone = session_id.clone(); - let prompt_clone = user_prompt_for_task.clone(); - let values_clone = all_found_values.clone(); - - async move { - if ranked.is_empty() { - return Ok(vec![]); - } - - match filter_exploratory_datasets_with_llm(&topic, &prompt_clone, ranked, &user_id_clone, &session_id_clone, &values_clone).await { - Ok(filtered) => Ok(filtered), - Err(e) => { - error!(error = %e, topic = topic, "LLM filtering failed for exploratory topic"); - Ok(vec![]) - } - } - } - }) - .buffer_unordered(10); - - // Collect filter results - let specific_results_vec: Vec>> = specific_filter_futures.collect().await; - let exploratory_results_vec: Vec>> = exploratory_filter_futures.collect().await; - - // Process and combine results - let mut combined_results = Vec::new(); - let mut unique_ids = HashSet::new(); - - for result in specific_results_vec { - match result { - Ok(datasets) => { - for dataset in datasets { - if unique_ids.insert(dataset.id) { - combined_results.push(dataset); - } - } - } - Err(e) => { - warn!("Error processing a specific query stream: {}", e); - } - } - } - - for result in exploratory_results_vec { - match result { - Ok(datasets) => { - for dataset in datasets { - if unique_ids.insert(dataset.id) { - combined_results.push(dataset); - } - } - } - Err(e) => { - warn!("Error processing an exploratory topic stream: {}", e); - } - } - } - - let final_search_results: Vec = combined_results + // Convert all datasets to search results + let all_search_results: Vec = all_datasets .into_iter() - .map(|result| DatasetSearchResult { - id: result.id, - name: result.name, - yml_content: result.yml_content, + .map(|dataset| DatasetSearchResult { + id: dataset.id, + name: Some(dataset.name), + yml_content: dataset.yml_content, }) .collect(); - // After filtering and before returning results, update YML content with search results - // For each dataset in the final results, search for searchable dimensions and update YML + // Update YML content with search results (keep the value injection) let mut updated_results = Vec::new(); - for result in &final_search_results { + for result in &all_search_results { let mut updated_result = result.clone(); if let Some(yml_content) = &result.yml_content { // Inject pre-found values into YML match inject_prefound_values_into_yml( yml_content, - &all_found_values, // Pass the results from the initial value search + &all_found_values, ).await { Ok(updated_yml) => { debug!( @@ -757,38 +396,12 @@ impl ToolExecutor for SearchDataCatalogTool { updated_results.push(updated_result); } - // --- BEGIN: Wait for syntax future --- - // Ensure the syntax task completes before finishing. 
+ // Wait for syntax future to complete if let Err(e) = syntax_future.await { - // Handle potential join errors (e.g., if the spawned task panicked) warn!(error = %e, "Syntax fetching task failed to join"); - // Depending on requirements, you might want to return an error here - // or ensure the state is explicitly null if it didn't get set. - // For now, we'll just log the warning, as the task itself handles - // setting state to null on internal errors. } - // --- END: Wait for syntax future --- - - // Return the updated results - let mut message = if updated_results.is_empty() { - "No relevant datasets found after filtering.".to_string() - } else { - format!("Found {} relevant datasets with injected values for searchable dimensions.", updated_results.len()) - }; - - // Append reranking error information if any occurred - // Lock the mutex to access the errors safely - let final_errors = rerank_errors.lock().await; - if !final_errors.is_empty() { - message.push_str(" - Warning: Some parts of the search failed due to reranking errors:"); - for error_msg in final_errors.iter() { // Iterate over locked data - message.push_str(&format!(" - - {}", error_msg)); - } - } - // Mutex guard `final_errors` is dropped here + // Set state flags self.agent .set_state_value( String::from("data_context"), @@ -802,12 +415,18 @@ impl ToolExecutor for SearchDataCatalogTool { let duration = start_time.elapsed().as_millis(); + let message = if updated_results.is_empty() { + "No datasets found.".to_string() + } else { + format!("Loaded {} datasets with injected values for searchable dimensions.", updated_results.len()) + }; + Ok(SearchDataCatalogOutput { message, specific_queries: params.specific_queries, exploratory_topics: params.exploratory_topics, duration: duration as i64, - results: updated_results, // Use updated results instead of final_search_results + results: updated_results, data_source_id: Some(target_data_source_id), }) } @@ -856,7 +475,7 @@ impl ToolExecutor for SearchDataCatalogTool { async fn get_search_data_catalog_description() -> String { if env::var("USE_BRAINTRUST_PROMPTS").is_err() { - return "Searches the data catalog for relevant data assets (e.g., datasets, models, metrics, filters, properties, documentation) based on high-intent queries derived solely from the user's request and conversation history, with no assumptions about data availability. Queries are concise, full-sentence, natural language expressions of search intent. Specific requests generate a single, focused query, while broad requests produce multiple queries to cover all context-implied assets (datasets, models, metrics, properties, documentation), starting with topics mentioned in the context (e.g., sales, customers, products) and refining with filters, metrics, or relationships. Supports multiple concurrent queries for comprehensive coverage.".to_string(); + return "Loads all available datasets with fresh value injection for searchable dimensions based on the provided search terms. 
Returns comprehensive dataset information with injected relevant values to assist with analysis and planning.".to_string(); } let client = BraintrustClient::new(None, "96af8b2b-cf3c-494f-9092-44eb3d5b96ff").unwrap(); @@ -867,263 +486,11 @@ async fn get_search_data_catalog_description() -> String { "Failed to get prompt system message for tool description: {}", e ); - "Searches the data catalog for relevant data assets (e.g., datasets, models, metrics, filters, properties, documentation) based on high-intent queries derived solely from the user's request and conversation history, with no assumptions about data availability. Queries are concise, full-sentence, natural language expressions of search intent. Specific requests generate a single, focused query, while broad requests produce multiple queries to cover all context-implied assets (datasets, models, metrics, properties, documentation), starting with topics mentioned in the context (e.g., sales, customers, products) and refining with filters, metrics, or relationships. Supports multiple concurrent queries for comprehensive coverage.".to_string() + "Loads all available datasets with fresh value injection for searchable dimensions based on the provided search terms. Returns comprehensive dataset information with injected relevant values to assist with analysis and planning. Previous search results are automatically truncated to keep conversations manageable.".to_string() } } } -async fn rerank_datasets( - query: &str, - all_datasets: &[PermissionedDataset], - documents: &[String], -) -> Result, anyhow::Error> { - if documents.is_empty() || all_datasets.is_empty() { - return Ok(vec![]); - } - - // Initialize your custom reranker - let reranker = Reranker::new() - .map_err(|e| anyhow::anyhow!("Failed to initialize custom Reranker: {}", e))?; - - // Convert documents from Vec to Vec<&str> for the rerank library - let doc_slices: Vec<&str> = documents.iter().map(AsRef::as_ref).collect(); - - // Define top_n, e.g., 35 as used with Cohere - let top_n = 35; - - // Call your custom reranker's rerank method - let rerank_results = match reranker.rerank(query, &doc_slices, top_n).await { - Ok(results) => results, - Err(e) => { - error!(error = %e, query = query, "Custom reranker API call failed"); - return Err(anyhow::anyhow!("Custom reranker failed: {}", e)); - } - }; - - let mut ranked_datasets = Vec::new(); - // The structure of RerankResult from your library (index, relevance_score) - // is compatible with the existing loop logic. - for result in rerank_results { - if let Some(dataset) = all_datasets.get(result.index as usize) { - ranked_datasets.push(RankedDataset { - dataset: dataset.clone(), - }); - } else { - error!( - "Invalid dataset index {} from custom reranker for query '{}'. Max index: {}", - result.index, - query, - all_datasets.len().saturating_sub(1) // Avoid panic on empty all_datasets (though guarded above) - ); - } - } - - // The original code collected into Vec<_> then returned. This is fine. 
- // let relevant_datasets = ranked_datasets.into_iter().collect::>(); - // Ok(relevant_datasets) - // Simpler: - Ok(ranked_datasets) -} - -async fn llm_filter_helper( - prompt_template: &str, - query_or_topic: &str, - user_prompt: &str, - ranked_datasets: Vec, - user_id: &Uuid, - session_id: &Uuid, - generation_name_suffix: &str, - all_found_values: &[FoundValueInfo], -) -> Result, anyhow::Error> { - if ranked_datasets.is_empty() { - return Ok(vec![]); - } - - let datasets_json = ranked_datasets - .iter() - .map(|ranked| { - serde_json::json!({ - "id": ranked.dataset.id.to_string(), - "name": ranked.dataset.name, - "yml_content": ranked.dataset.yml_content.clone().unwrap_or_default(), - }) - }) - .collect::>(); - - // NEW: Format found values as JSON for the prompt - let found_values_json = if all_found_values.is_empty() { - "No specific values were found in the dataset that match the search terms.".to_string() - } else { - // Convert found values to a formatted string that can be inserted in the prompt - let values_json = all_found_values - .iter() - .map(|val| { - format!( - "- '{}' (found in {}.{}.{})", - val.value, val.database_name, val.table_name, val.column_name - ) - }) - .collect::>() - .join("\n"); - values_json - }; - - let prompt = prompt_template - .replace("{user_request}", user_prompt) - .replace("{query}", query_or_topic) - .replace("{topic}", query_or_topic) - .replace( - "{datasets_json}", - &serde_json::to_string_pretty(&datasets_json)?, - ) - .replace("{found_values_json}", &found_values_json); - - let llm_client = LiteLLMClient::new(None, None); - - let model = if env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()) == "local" { - "gpt-4.1-nano".to_string() - } else { - "gemini-2.0-flash-001".to_string() - }; - - let request = ChatCompletionRequest { - model, - messages: vec![AgentMessage::User { - id: None, - content: prompt, - name: None, - }], - stream: Some(false), - response_format: Some(ResponseFormat { - type_: "json_object".to_string(), - json_schema: None, - }), - store: Some(true), - metadata: Some(Metadata { - generation_name: format!("filter_data_catalog_{}_agent", generation_name_suffix), - user_id: user_id.to_string(), - session_id: session_id.to_string(), - trace_id: Uuid::new_v4().to_string(), - }), - max_completion_tokens: Some(8096), - temperature: Some(0.0), - ..Default::default() - }; - - let response = llm_client.chat_completion(request).await?; - - let content = match response.choices.get(0).map(|c| &c.message) { - Some(AgentMessage::Assistant { content: Some(content), .. }) => content, - _ => { - error!("LLM filter response missing or invalid content for query/topic: {}", query_or_topic); - return Err(anyhow::anyhow!("LLM filter response missing or invalid content")); - } - }; - - let filter_response: LLMFilterResponse = match serde_json::from_str(content) { - Ok(response) => response, - Err(e) => { - error!( - "Failed to parse LLM filter response for query/topic '{}': {}. 
Content: {}", - query_or_topic, e, content - ); - return Err(anyhow::anyhow!( - "Failed to parse LLM filter response: {}", - e - )); - } - }; - - let dataset_map: HashMap = ranked_datasets - .iter() - .map(|ranked| (ranked.dataset.id, &ranked.dataset)) - .collect(); - - let filtered_datasets: Vec = filter_response - .results - .into_iter() - .filter_map(|dataset_id_str| { - match Uuid::parse_str(&dataset_id_str) { - Ok(parsed_id) => { - if let Some(dataset) = dataset_map.get(&parsed_id) { - debug!(dataset_id = %dataset.id, dataset_name = %dataset.name, "Found matching dataset via LLM filter for query/topic: {}", query_or_topic); - Some(DatasetResult { - id: dataset.id, - name: Some(dataset.name.clone()), - yml_content: dataset.yml_content.clone(), - }) - } else { - warn!(parsed_id = %parsed_id, query_or_topic = query_or_topic, "LLM filter returned UUID not found in ranked list"); - None - } - } - Err(e) => { - error!(llm_result_id_str = %dataset_id_str, error = %e, query_or_topic = query_or_topic, "Failed to parse UUID from LLM filter result string"); - None - } - } - }) - .collect(); - - debug!( - "LLM filtering ({}) complete for query/topic '{}', keeping {} relevant datasets", - generation_name_suffix, - query_or_topic, - filtered_datasets.len() - ); - Ok(filtered_datasets) -} - -async fn filter_specific_datasets_with_llm( - query: &str, - user_prompt: &str, - ranked_datasets: Vec, - user_id: &Uuid, - session_id: &Uuid, - all_found_values: &[FoundValueInfo], -) -> Result, anyhow::Error> { - debug!( - "Filtering {} datasets with SPECIFIC LLM for query: {}", - ranked_datasets.len(), - query - ); - llm_filter_helper( - SPECIFIC_LLM_FILTER_PROMPT, - query, - user_prompt, - ranked_datasets, - user_id, - session_id, - "specific", - all_found_values - ).await -} - -async fn filter_exploratory_datasets_with_llm( - topic: &str, - user_prompt: &str, - ranked_datasets: Vec, - user_id: &Uuid, - session_id: &Uuid, - all_found_values: &[FoundValueInfo], -) -> Result, anyhow::Error> { - debug!( - "Filtering {} datasets with EXPLORATORY LLM for topic: {}", - ranked_datasets.len(), - topic - ); - llm_filter_helper( - EXPLORATORY_LLM_FILTER_PROMPT, - topic, - user_prompt, - ranked_datasets, - user_id, - session_id, - "exploratory", - all_found_values - ).await -} - // NEW: Helper function to generate embeddings for multiple texts in a batch async fn generate_embeddings_batch(texts: Vec) -> Result)>> { if texts.is_empty() { @@ -1134,7 +501,7 @@ async fn generate_embeddings_batch(texts: Vec) -> Result) -> Result) -> Result Vec { let mut dimensions = Vec::new(); if let Some(dims_val) = model_val.get("dimensions").and_then(|d| d.as_sequence()) { @@ -1185,7 +551,6 @@ fn process_model_value_for_searchable_dimensions( dimensions.push(SearchableDimension { model_name: model_name_from_val.to_string(), dimension_name: dimension_name.clone(), - // Path might need more context if used, but kept simple for now. dimension_path: vec!["models".to_string(), model_name_from_val.to_string(), "dimensions".to_string(), dimension_name], }); } @@ -1213,8 +578,6 @@ fn extract_searchable_dimensions(yml_content: &str) -> Result { - // Failed to parse as a single new-spec Model at the root. - // Try parsing as generic serde_yaml::Value, which might contain a list or be an old flat model. debug!( "Failed to parse yml_content directly as Model (error: {}), trying generic serde_yaml::Value for extract_searchable_dimensions. 
YML might be a list or old format.", e_spec_root @@ -1223,10 +586,8 @@ fn extract_searchable_dimensions(yml_content: &str) -> Result(model_item_val.clone()) { for dimension in model_in_list.dimensions { if dimension.searchable { @@ -1238,7 +599,6 @@ fn extract_searchable_dimensions(yml_content: &str) -> Result Result Result>>> ) { let table_name = model_name_from_val.to_string(); @@ -1276,13 +634,13 @@ fn process_model_value_for_database_info( let database_name = model_val .get("database") .and_then(|v| v.as_str()) - .unwrap_or("unknown_db") // Default if 'database' key is missing or not a string + .unwrap_or("unknown_db") .to_string(); let schema_name = model_val .get("schema") .and_then(|v| v.as_str()) - .unwrap_or("unknown_schema") // Default if 'schema' key is missing or not a string + .unwrap_or("unknown_schema") .to_string(); let mut columns = Vec::new(); @@ -1290,9 +648,8 @@ fn process_model_value_for_database_info( for dim in dimensions { if let Some(dim_name) = dim.get("name").and_then(|n| n.as_str()) { columns.push(dim_name.to_string()); - // In old style, 'expr' might also represent a column or be the column itself if let Some(expr) = dim.get("expr").and_then(|e| e.as_str()) { - if expr != dim_name { // Avoid duplicates if name and expr are same + if expr != dim_name { columns.push(expr.to_string()); } } @@ -1313,8 +670,6 @@ fn process_model_value_for_database_info( } if let Some(metrics) = model_val.get("metrics").and_then(|m| m.as_sequence()) { for metric in metrics { - // Metrics in old style usually just have a name that might correspond to a concept, - // but their 'expr' is complex. We'll take 'name' as a potential reference. if let Some(metric_name) = metric.get("name").and_then(|n| n.as_str()) { columns.push(metric_name.to_string()); } @@ -1341,7 +696,6 @@ fn extract_database_info_from_yaml(yml_content: &str) -> Result Result { - // Failed to parse as a single new-spec Model at the root. debug!( "Failed to parse yml_content directly as Model (error: {}), trying generic serde_yaml::Value for extract_database_info_from_yaml. 
YML might be a list or old format.", e_spec_root @@ -1363,10 +716,8 @@ fn extract_database_info_from_yaml(yml_content: &str) -> Result(model_item_val.clone()) { let db_name = model_in_list.database.as_deref().unwrap_or("unknown_db").to_string(); let sch_name = model_in_list.schema.as_deref().unwrap_or("unknown_schema").to_string(); @@ -1377,7 +728,6 @@ fn extract_database_info_from_yaml(yml_content: &str) -> Result Result Result>>>, - // Comprehensive searchable_dimensions for all models in the original YML searchable_dimensions: &[SearchableDimension], all_found_values: &[FoundValueInfo], ) { // Find the database and schema for this specific current_model_name using the comprehensive database_info - let mut model_db_details: Option<(&str, &str)> = None; // (database_name, schema_name) + let mut model_db_details: Option<(&str, &str)> = None; for (db_name_key, schemas) in database_info { for (schema_name_key, tables) in schemas { @@ -1439,13 +785,12 @@ fn inject_values_into_single_model_yaml( let dim_name_opt = dim_yaml_value.get("name").and_then(|n| n.as_str()); if let Some(dim_name_str) = dim_name_opt { - // Check if this dimension (dim_name_str) within this model (current_model_name) is searchable let is_searchable = searchable_dimensions.iter().any(|sd| { sd.model_name == current_model_name && sd.dimension_name == dim_name_str }); if !is_searchable { - continue; // Only inject into searchable dimensions + continue; } let relevant_values_for_dim: Vec = all_found_values @@ -1453,13 +798,13 @@ fn inject_values_into_single_model_yaml( .filter(|found_val| { found_val.database_name.eq_ignore_ascii_case(model_database_name) && found_val.schema_name.eq_ignore_ascii_case(model_schema_name) - && found_val.table_name.eq_ignore_ascii_case(current_model_name) // model name is table name + && found_val.table_name.eq_ignore_ascii_case(current_model_name) && found_val.column_name.eq_ignore_ascii_case(dim_name_str) }) .map(|found_val| found_val.value.clone()) - .collect::>() // Deduplicate + .collect::>() .into_iter() - .take(20) // Limit to max 20 unique values + .take(20) .collect(); if !relevant_values_for_dim.is_empty() { @@ -1469,7 +814,6 @@ fn inject_values_into_single_model_yaml( values_count = relevant_values_for_dim.len(), "Injecting relevant_values into YAML dimension" ); - // Add/update relevant_values field in the YAML dimension map if let Some(dim_map) = dim_yaml_value.as_mapping_mut() { dim_map.insert( serde_yaml::Value::String("relevant_values".to_string()), @@ -1487,36 +831,31 @@ fn inject_values_into_single_model_yaml( } /// Injects relevant values from a pre-compiled list into the YML of a dataset. -/// Matches values based on the database/schema/table/column defined in the YML. async fn inject_prefound_values_into_yml( yml_content: &str, - all_found_values: &[FoundValueInfo], // Use the pre-found values + all_found_values: &[FoundValueInfo], ) -> Result { if yml_content.trim().is_empty() { debug!("inject_prefound_values_into_yml: YML content is empty, returning as is."); return Ok(String::new()); } - // Parse YAML for dimension definitions and modification + let mut root_yaml_val: serde_yaml::Value = serde_yaml::from_str(yml_content) .context("Failed to parse dataset YAML for injecting values")?; - // Extract database structure from YAML (which defines the source for dimensions) - // These functions are now enhanced to understand different YML structures. 
     let database_info = match extract_database_info_from_yaml(yml_content) {
         Ok(info) => info,
         Err(e) => {
             warn!(error = %e, "inject_prefound_values_into_yml: Failed to extract comprehensive database info from YAML, attempting to proceed without it for value injection structure but matches might fail.");
-            // If we can't get DB info, value matching will be impaired, but we can still try to modify structure if searchable dims are found
             HashMap::new()
         }
     };
 
-    // Get searchable dimensions from the YML
     let searchable_dimensions = match extract_searchable_dimensions(yml_content) {
         Ok(dims) => dims,
         Err(e) => {
             warn!(error = %e, "inject_prefound_values_into_yml: Failed to extract comprehensive searchable dimensions from YAML, skipping value injection.");
-            return Ok(yml_content.to_string()); // Return original YML if parsing fails
+            return Ok(yml_content.to_string());
         }
     };
 
@@ -1529,18 +868,15 @@ async fn inject_prefound_values_into_yml(
         return Ok(yml_content.to_string());
     }
 
-
-    // Check if root_yaml_val contains a "models" list and process it
     if let Some(models_list_yaml_mut) = root_yaml_val.get_mut("models").and_then(|m| m.as_sequence_mut()) {
         debug!("inject_prefound_values_into_yml: Processing 'models' list structure.");
         for model_yaml_item_mut in models_list_yaml_mut {
-            // Extract model_name first to drop the immutable borrow on model_yaml_item_mut
             let model_name_owned: Option<String> = model_yaml_item_mut.get("name").and_then(|n| n.as_str()).map(String::from);
 
             if let Some(name_str) = model_name_owned {
                 inject_values_into_single_model_yaml(
-                    model_yaml_item_mut, // Now can be borrowed mutably
-                    &name_str, // Use the owned string slice
+                    model_yaml_item_mut,
+                    &name_str,
                     &database_info,
                     &searchable_dimensions,
                     all_found_values,
@@ -1550,15 +886,13 @@ async fn inject_prefound_values_into_yml(
                 }
             }
         }
     } else {
-        // Assume root_yaml_val itself is a single model definition (new spec parsed initially, or old flat style)
         debug!("inject_prefound_values_into_yml: Processing YAML as a single root model structure.");
-        // Extract model_name first to drop the immutable borrow on root_yaml_val
         let model_name_owned_root: Option<String> = root_yaml_val.get("name").and_then(|n| n.as_str()).map(String::from);
         if let Some(name_str_root) = model_name_owned_root {
             inject_values_into_single_model_yaml(
-                &mut root_yaml_val, // Now can be borrowed mutably
-                &name_str_root, // Use the owned string slice
+                &mut root_yaml_val,
+                &name_str_root,
                 &database_info,
                 &searchable_dimensions,
                 all_found_values,
@@ -1566,14 +900,12 @@ async fn inject_prefound_values_into_yml(
         } else {
             warn!(
                 "inject_prefound_values_into_yml: Root YAML object is not a 'models' list and lacks a 'name' field. Cannot process as single model. YML: {:?}",
-                root_yaml_val.as_mapping().map(|m| m.keys().collect::<Vec<_>>()) // Log keys if it's a map
+                root_yaml_val.as_mapping().map(|m| m.keys().collect::<Vec<_>>())
             );
-            // If it's not a models list and not a named model at the root, it's unclear how to proceed.
-            // Return original YML.
             return Ok(yml_content.to_string());
         }
     }
-    // Convert back to YAML string
+
     serde_yaml::to_string(&root_yaml_val)
         .context("Failed to convert updated YAML with injected values back to string")
 }
\ No newline at end of file
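
For reference, the injection path kept above boils down to: parse the dataset YML, locate dimensions flagged `searchable: true`, and attach a `relevant_values` list built from the values already discovered in the warehouse. The sketch below illustrates that flow for the simplest case of a single flat model. It is a minimal sketch only: `FoundValue`, `inject_relevant_values`, and the sample YML are illustrative stand-ins rather than the crate's actual types, and the real code additionally matches on database/schema, deduplicates, and caps the injected list at 20 values.

```rust
use serde_yaml::Value;

/// Illustrative stand-in for a value found in the warehouse (hypothetical type).
struct FoundValue {
    table_name: String,
    column_name: String,
    value: String,
}

/// Attach a `relevant_values` list to every searchable dimension of a single flat model.
fn inject_relevant_values(yml: &str, model_name: &str, found: &[FoundValue]) -> anyhow::Result<String> {
    let mut root: Value = serde_yaml::from_str(yml)?;

    // Walk the model's dimensions and attach matching values.
    if let Some(dims) = root.get_mut("dimensions").and_then(|d| d.as_sequence_mut()) {
        for dim in dims {
            let dim_name = match dim.get("name").and_then(|n| n.as_str()) {
                Some(n) => n.to_string(),
                None => continue,
            };
            // Only dimensions flagged `searchable: true` receive injected values.
            let searchable = dim.get("searchable").and_then(|s| s.as_bool()).unwrap_or(false);
            if !searchable {
                continue;
            }
            // Collect values whose table/column match this model/dimension.
            let values: Vec<Value> = found
                .iter()
                .filter(|f| {
                    f.table_name.eq_ignore_ascii_case(model_name)
                        && f.column_name.eq_ignore_ascii_case(&dim_name)
                })
                .map(|f| Value::String(f.value.clone()))
                .collect();
            if values.is_empty() {
                continue;
            }
            // Write the list back into the dimension mapping.
            if let Some(map) = dim.as_mapping_mut() {
                map.insert(
                    Value::String("relevant_values".to_string()),
                    Value::Sequence(values),
                );
            }
        }
    }

    Ok(serde_yaml::to_string(&root)?)
}

fn main() -> anyhow::Result<()> {
    let yml = r#"
name: products
dimensions:
  - name: category
    searchable: true
  - name: sku
    searchable: false
"#;
    let found = vec![
        FoundValue { table_name: "products".into(), column_name: "category".into(), value: "Outdoor".into() },
        FoundValue { table_name: "products".into(), column_name: "category".into(), value: "Electronics".into() },
    ];
    println!("{}", inject_relevant_values(yml, "products", &found)?);
    Ok(())
}
```

Run against the sample model, this appends `relevant_values: [Outdoor, Electronics]` under the `category` dimension while leaving the non-searchable `sku` dimension untouched, which keeps the enriched context in the same YML format the agent already consumes.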