From 44042cd676bd56e772b707b97aaa84dfd7fcb55c Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 8 Apr 2025 15:46:37 -0600 Subject: [PATCH] load in assets and mark context --- api/libs/handlers/src/chats/asset_messages.rs | 122 ++++++++++++++++-- .../src/chats/context_loaders/chat_context.rs | 81 ++++++++++++ .../handlers/src/chats/post_chat_handler.rs | 14 +- 3 files changed, 204 insertions(+), 13 deletions(-) diff --git a/api/libs/handlers/src/chats/asset_messages.rs b/api/libs/handlers/src/chats/asset_messages.rs index 1ae9bb980..f55cbfc28 100644 --- a/api/libs/handlers/src/chats/asset_messages.rs +++ b/api/libs/handlers/src/chats/asset_messages.rs @@ -2,25 +2,30 @@ use anyhow::{anyhow, Result}; use chrono::Utc; use database::{ enums::AssetType, - models::{Message, MessageToFile}, + models::{DashboardFile, Message, MessageToFile, MetricFile}, pool::get_pg_pool, - schema::{chats, messages, messages_to_files}, + schema::{chats, dashboard_files, messages, messages_to_files, metric_files}, }; use diesel::{insert_into, ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; +// Using json for our serialization use middleware::AuthenticatedUser; +use serde_json::json; use uuid::Uuid; use super::context_loaders::fetch_asset_details; /// Generate default messages for prompt-less asset-based chats /// -/// This function creates a pair of messages to be shown in a chat when a user -/// opens an asset without providing a prompt: -/// 1. A file message that represents the asset itself -/// 2. A text message with placeholder content +/// This function creates a message to be shown in a chat when a user +/// opens an asset without providing a prompt. It includes: +/// 1. A file response that represents the asset itself +/// 2. A text response with helpful context /// -/// The function also checks that the user has permission to view the asset +/// The function also adds the asset information to raw_llm_messages so +/// the agent can understand the context of the asset being viewed. +/// +/// The function checks that the user has permission to view the asset /// and fetches the asset details for display. pub async fn generate_asset_messages( asset_id: Uuid, @@ -38,6 +43,105 @@ pub async fn generate_asset_messages( let message_id = Uuid::new_v4(); let timestamp = Utc::now().timestamp(); + // Fetch detailed asset information + let mut conn = get_pg_pool().get().await?; + + // Create the import_assets tool call sequence + let tool_call_id = format!("call_{}", Uuid::new_v4().simple().to_string()); + + // Prepare asset data based on asset type + let (asset_data, asset_type_str) = match asset_type { + AssetType::MetricFile => { + let metric = metric_files::table + .filter(metric_files::id.eq(asset_id)) + .first::(&mut conn) + .await?; + + // Get YAML content + let yml_content = serde_yaml::to_string(&metric.content)?; + + // For simplicity, we'll just use an empty array for results + // since MetricYml may not have results field + let results = serde_json::json!([]); + + // Create asset data object + let asset_data = json!({ + "id": asset_id.to_string(), + "name": metric.name, + "file_type": "metric", + "asset_type": "metric", + "yml_content": yml_content, + "result_message": "0 records were returned", + "results": results, + "created_at": metric.created_at, + "version_number": metric.version_history.get_version_number(), + "updated_at": metric.updated_at + }); + + (asset_data, "metric") + }, + AssetType::DashboardFile => { + let dashboard = dashboard_files::table + .filter(dashboard_files::id.eq(asset_id)) + .first::(&mut conn) + .await?; + + // Get YAML content + let yml_content = serde_yaml::to_string(&dashboard.content)?; + + // Create asset data object + let asset_data = json!({ + "id": asset_id.to_string(), + "name": dashboard.name, + "file_type": "dashboard", + "asset_type": "dashboard", + "yml_content": yml_content, + "created_at": dashboard.created_at, + "version_number": dashboard.version_history.get_version_number(), + "updated_at": dashboard.updated_at + }); + + (asset_data, "dashboard") + }, + _ => { + return Err(anyhow!("Unsupported asset type for generating asset messages: {:?}", asset_type)); + } + }; + + // Create the tool response content + let tool_response_content = json!({ + "message": format!("Successfully imported 1 {} files.", asset_type_str), + "duration": 928, // Example duration + "files": [asset_data] + }).to_string(); + + // Create the Assistant message with tool call + let assistant_message = serde_json::json!({ + "name": "buster_super_agent", + "role": "assistant", + "tool_calls": [ + { + "id": tool_call_id, + "type": "function", + "function": { + "name": "import_assets", + "arguments": "{}" + } + } + ] + }); + + // Create the Tool response message + let tool_message = serde_json::json!({ + "name": "import_assets", + "role": "tool", + "content": tool_response_content, + "tool_call_id": tool_call_id + }); + + // Combine into raw_llm_messages + let raw_llm_messages = serde_json::json!([assistant_message, tool_message]); + let message = Message { id: message_id, request_message: None, // Empty request for auto-generated messages @@ -51,7 +155,7 @@ pub async fn generate_asset_messages( "type": "text", "id": Uuid::new_v4().to_string(), "message": format!("{} has been pulled into a new chat.\n\nContinue chatting to modify or make changes to it.", asset_details.name), - "isFinalMessage": true + "is_final_message": true }, { "type": "file", @@ -72,7 +176,7 @@ pub async fn generate_asset_messages( reasoning: serde_json::Value::Array(vec![]), final_reasoning_message: Some("".to_string()), title: asset_details.name.clone(), - raw_llm_messages: serde_json::json!([]), + raw_llm_messages, // Add the agent context messages feedback: None, }; diff --git a/api/libs/handlers/src/chats/context_loaders/chat_context.rs b/api/libs/handlers/src/chats/context_loaders/chat_context.rs index 2e8e0d51d..374851b33 100644 --- a/api/libs/handlers/src/chats/context_loaders/chat_context.rs +++ b/api/libs/handlers/src/chats/context_loaders/chat_context.rs @@ -27,6 +27,7 @@ impl ChatContextLoader { // Helper function to check for tool usage and set appropriate context async fn update_context_from_tool_calls(agent: &Arc, message: &AgentMessage) { + // Handle tool calls from assistant messages if let AgentMessage::Assistant { tool_calls: Some(tool_calls), .. } = message { for tool_call in tool_calls { match tool_call.function.name.as_str() { @@ -42,6 +43,10 @@ impl ChatContextLoader { agent.set_state_value(String::from("dashboards_available"), Value::Bool(true)) .await; }, + "import_assets" => { + // When we see import_assets, we need to check the content in the corresponding tool response + // This will be handled separately when processing tool messages + }, name if name.contains("file") || name.contains("read") || name.contains("write") || name.contains("edit") => { agent.set_state_value(String::from("files_available"), Value::Bool(true)) .await; @@ -50,6 +55,82 @@ impl ChatContextLoader { } } } + + // Handle tool responses - important for import_assets + if let AgentMessage::Tool { name: Some(tool_name), content, .. } = message { + if tool_name == "import_assets" { + // Parse the tool response to see what was imported + if let Ok(import_result) = serde_json::from_str::(content) { + // Check for files array + if let Some(files) = import_result.get("files").and_then(|f| f.as_array()) { + if !files.is_empty() { + // Set files_available for any imported files + agent.set_state_value(String::from("files_available"), Value::Bool(true)) + .await; + + // Check each file to determine its type + let mut has_metrics = false; + let mut has_dashboards = false; + let mut has_datasets = false; + + for file in files { + // Check file_type/asset_type to determine what kind of asset this is + let file_type = file.get("file_type").and_then(|ft| ft.as_str()) + .or_else(|| file.get("asset_type").and_then(|at| at.as_str())); + + tracing::debug!("Processing imported file with type: {:?}", file_type); + + match file_type { + Some("metric") => { + has_metrics = true; + + // Check if the metric has dataset references + if let Some(yml_content) = file.get("yml_content").and_then(|y| y.as_str()) { + if yml_content.contains("dataset") || yml_content.contains("datasetIds") { + has_datasets = true; + } + } + }, + Some("dashboard") => { + has_dashboards = true; + + // Dashboards often reference metrics too + has_metrics = true; + + // Check if the dashboard has dataset references via metrics + if let Some(yml_content) = file.get("yml_content").and_then(|y| y.as_str()) { + if yml_content.contains("dataset") || yml_content.contains("datasetIds") { + has_datasets = true; + } + } + }, + _ => { + tracing::debug!("Unknown file type in import_assets: {:?}", file_type); + } + } + } + + // Set appropriate state values based on what we found + if has_metrics { + tracing::debug!("Setting metrics_available state to true"); + agent.set_state_value(String::from("metrics_available"), Value::Bool(true)) + .await; + } + if has_dashboards { + tracing::debug!("Setting dashboards_available state to true"); + agent.set_state_value(String::from("dashboards_available"), Value::Bool(true)) + .await; + } + if has_datasets { + tracing::debug!("Setting data_context state to true"); + agent.set_state_value(String::from("data_context"), Value::Bool(true)) + .await; + } + } + } + } + } + } } } diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 4a7eab782..8872bbe9e 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -276,6 +276,10 @@ pub async fn post_chat_handler( ); chat_with_messages.add_message(chat_message); + + // We don't need to process the raw_llm_messages here + // The ChatContextLoader.update_context_from_tool_calls function will handle the asset state + // when the agent is initialized and loads the context } // Return early with auto-generated messages - no need for agent processing @@ -498,7 +502,7 @@ pub async fn post_chat_handler( }), response_messages.clone(), reasoning_messages.clone(), - Some("".to_string()), // Empty string as requested + Some(formatted_reasoning_duration.clone()), // Use formatted reasoning duration for regular messages Utc::now(), ); @@ -515,7 +519,7 @@ pub async fn post_chat_handler( deleted_at: None, response_messages: serde_json::to_value(&response_messages)?, reasoning: serde_json::to_value(&reasoning_messages)?, - final_reasoning_message: Some("".to_string()), // Empty string as requested + final_reasoning_message: Some(formatted_reasoning_duration), // Use formatted reasoning duration for regular messages title: title.title.clone().unwrap_or_default(), raw_llm_messages: serde_json::to_value(&raw_llm_messages)?, feedback: None, @@ -1023,7 +1027,7 @@ pub async fn transform_message( filter_version_id: None, metadata: Some(vec![BusterChatResponseFileMetadata { status: "completed".to_string(), - message: "Pulled into new chat".to_string(), + message: "Created by Buster".to_string(), timestamp: Some(Utc::now().timestamp()), }]), }; @@ -1106,7 +1110,7 @@ pub async fn transform_message( filter_version_id: None, metadata: Some(vec![BusterChatResponseFileMetadata { status: "completed".to_string(), - message: "Pulled into new chat".to_string(), + message: "Created by Buster".to_string(), timestamp: Some(Utc::now().timestamp()), }]), }; @@ -1961,6 +1965,8 @@ Return only the title text with no additional formatting, explanation, quotes, n /// The function streams the title back as it's being generated. type BusterContainerResult = Result<(BusterContainer, ThreadEvent)>; +// The implementation has been moved to ChatContextLoader.update_context_from_tool_calls + pub async fn generate_conversation_title( messages: &[AgentMessage], message_id: &Uuid,