diff --git a/api/libs/handlers/src/chats/asset_messages.rs b/api/libs/handlers/src/chats/asset_messages.rs index 49613eac2..c5ba3a049 100644 --- a/api/libs/handlers/src/chats/asset_messages.rs +++ b/api/libs/handlers/src/chats/asset_messages.rs @@ -2,25 +2,30 @@ use anyhow::{anyhow, Result}; use chrono::Utc; use database::{ enums::AssetType, - models::{Message, MessageToFile}, + models::{DashboardFile, Message, MessageToFile, MetricFile}, pool::get_pg_pool, - schema::{chats, messages, messages_to_files}, + schema::{chats, dashboard_files, messages, messages_to_files, metric_files}, }; use diesel::{insert_into, ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; +// Using json for our serialization use middleware::AuthenticatedUser; +use serde_json::json; use uuid::Uuid; use super::context_loaders::fetch_asset_details; /// Generate default messages for prompt-less asset-based chats /// -/// This function creates a pair of messages to be shown in a chat when a user -/// opens an asset without providing a prompt: -/// 1. A file message that represents the asset itself -/// 2. A text message with placeholder content +/// This function creates a message to be shown in a chat when a user +/// opens an asset without providing a prompt. It includes: +/// 1. A file response that represents the asset itself +/// 2. A text response with helpful context /// -/// The function also checks that the user has permission to view the asset +/// The function also adds the asset information to raw_llm_messages so +/// the agent can understand the context of the asset being viewed. +/// +/// The function checks that the user has permission to view the asset /// and fetches the asset details for display. pub async fn generate_asset_messages( asset_id: Uuid, @@ -34,62 +39,207 @@ pub async fn generate_asset_messages( // Fetch asset details based on type let asset_details = fetch_asset_details(asset_id, asset_type).await?; - // Create file message - let file_message_id = Uuid::new_v4(); - let file_message = Message { - id: file_message_id, - request_message: None, // Empty request for auto-generated messages - chat_id: Uuid::nil(), // Will be set by caller - created_by: user.id, - created_at: Utc::now(), - updated_at: Utc::now(), - deleted_at: None, - response_messages: serde_json::json!([{ - "type": "file", - "id": file_message_id.to_string(), - "fileType": asset_details.file_type, - "fileName": asset_details.name, - "versionNumber": asset_details.version_number, - "filterVersionId": null, - "metadata": [ - { - "status": "completed", - "message": format!("File {} completed", asset_details.name), - "timestamp": Utc::now().timestamp() + // Create a single message with both text and file response + let message_id = Uuid::new_v4(); + let timestamp = Utc::now().timestamp(); + + // Fetch detailed asset information + let mut conn = get_pg_pool().get().await?; + + // Create the import_assets tool call sequence + let tool_call_id = format!("call_{}", Uuid::new_v4().simple().to_string()); + + // Prepare asset data based on asset type + let (asset_data, asset_type_str, additional_files) = match asset_type { + AssetType::MetricFile => { + let metric = metric_files::table + .filter(metric_files::id.eq(asset_id)) + .first::(&mut conn) + .await?; + + // Get YAML content + let yml_content = serde_yaml::to_string(&metric.content)?; + + // For simplicity, we'll just use an empty array for results + // since MetricYml may not have results field + let results = serde_json::json!([]); + + // Create asset data object + let asset_data = json!({ + "id": asset_id.to_string(), + "name": metric.name, + "file_type": "metric", + "asset_type": "metric", + "yml_content": yml_content, + "result_message": "0 records were returned", + "results": results, + "created_at": metric.created_at, + "version_number": metric.version_history.get_version_number(), + "updated_at": metric.updated_at + }); + + (asset_data, "metric", Vec::new()) + }, + AssetType::DashboardFile => { + let dashboard = dashboard_files::table + .filter(dashboard_files::id.eq(asset_id)) + .first::(&mut conn) + .await?; + + // Get YAML content + let yml_content = serde_yaml::to_string(&dashboard.content)?; + + // Create asset data object + let asset_data = json!({ + "id": asset_id.to_string(), + "name": dashboard.name, + "file_type": "dashboard", + "asset_type": "dashboard", + "yml_content": yml_content, + "created_at": dashboard.created_at, + "version_number": dashboard.version_history.get_version_number(), + "updated_at": dashboard.updated_at + }); + + // Extract metric IDs from dashboard + let mut metric_ids = std::collections::HashSet::new(); + for row in &dashboard.content.rows { + for item in &row.items { + metric_ids.insert(item.id); } - ] - }]), - reasoning: serde_json::Value::Array(vec![]), - final_reasoning_message: None, - title: format!("Chat with {}", asset_details.name), - raw_llm_messages: serde_json::json!([]), - feedback: None, + } + + // Load all associated metrics for context (they won't be shown in UI) + let mut metric_files_data = Vec::new(); + for metric_id in metric_ids { + match metric_files::table + .filter(metric_files::id.eq(metric_id)) + .first::(&mut conn) + .await + { + Ok(metric) => { + // Get YAML content for this metric + if let Ok(yml_content) = serde_yaml::to_string(&metric.content) { + // Add metric as additional file data for agent context + let metric_data = json!({ + "id": metric.id.to_string(), + "name": metric.name, + "file_type": "metric", + "asset_type": "metric", + "yml_content": yml_content, + "created_at": metric.created_at, + "version_number": metric.version_history.get_version_number(), + "updated_at": metric.updated_at + }); + + metric_files_data.push(metric_data); + } + }, + Err(e) => { + // Log error but continue with other metrics + tracing::warn!("Failed to load metric {} for dashboard context: {}", metric_id, e); + } + } + } + + tracing::info!( + "Loaded {} metrics as context for dashboard import", + metric_files_data.len() + ); + + (asset_data, "dashboard", metric_files_data) + }, + _ => { + return Err(anyhow!("Unsupported asset type for generating asset messages: {:?}", asset_type)); + } }; - // Create text message with placeholder content - let text_message_id = Uuid::new_v4(); - let text_message = Message { - id: text_message_id, + // Determine appropriate message based on file count + let additional_files_count = additional_files.len(); + let message_text = if additional_files_count == 0 { + format!("Successfully imported 1 {} file.", asset_type_str) + } else { + format!("Successfully imported 1 {} file with {} additional context files.", + asset_type_str, additional_files_count) + }; + + // Create combined file list with the main asset first, followed by context files + let mut all_files = vec![asset_data]; + all_files.extend(additional_files); + + // Create the tool response content + let tool_response_content = json!({ + "message": message_text, + "duration": 928, // Example duration + "files": all_files + }).to_string(); + + // Create the Assistant message with tool call + let assistant_message = serde_json::json!({ + "name": "buster_super_agent", + "role": "assistant", + "tool_calls": [ + { + "id": tool_call_id, + "type": "function", + "function": { + "name": "import_assets", + "arguments": "{}" + } + } + ] + }); + + // Create the Tool response message + let tool_message = serde_json::json!({ + "name": "import_assets", + "role": "tool", + "content": tool_response_content, + "tool_call_id": tool_call_id + }); + + // Combine into raw_llm_messages + let raw_llm_messages = serde_json::json!([assistant_message, tool_message]); + + let message = Message { + id: message_id, request_message: None, // Empty request for auto-generated messages chat_id: Uuid::nil(), // Will be set by caller created_by: user.id, created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, - response_messages: serde_json::json!([{ - "type": "text", - "id": text_message_id.to_string(), - "message": "DALLIN NEEDS TO PUT VALUE HERE", - "isFinalMessage": true - }]), + response_messages: serde_json::json!([ + { + "type": "text", + "id": Uuid::new_v4().to_string(), + "message": format!("{} has been pulled into a new chat.\n\nContinue chatting to modify or make changes to it.", asset_details.name), + "is_final_message": true + }, + { + "type": "file", + "id": asset_id.to_string(), + "file_type": asset_details.file_type, + "file_name": asset_details.name, + "version_number": asset_details.version_number, + "filter_version_id": null, + "metadata": [ + { + "status": "completed", + "message": "Pulled into new chat", + "timestamp": timestamp + } + ] + } + ]), reasoning: serde_json::Value::Array(vec![]), - final_reasoning_message: None, - title: format!("Chat with {}", asset_details.name), - raw_llm_messages: serde_json::json!([]), + final_reasoning_message: Some("".to_string()), + title: asset_details.name.clone(), + raw_llm_messages, // Add the agent context messages feedback: None, }; - Ok(vec![file_message, text_message]) + Ok(vec![message]) } /// Create association between message and file in the database diff --git a/api/libs/handlers/src/chats/context_loaders/chat_context.rs b/api/libs/handlers/src/chats/context_loaders/chat_context.rs index 2e8e0d51d..374851b33 100644 --- a/api/libs/handlers/src/chats/context_loaders/chat_context.rs +++ b/api/libs/handlers/src/chats/context_loaders/chat_context.rs @@ -27,6 +27,7 @@ impl ChatContextLoader { // Helper function to check for tool usage and set appropriate context async fn update_context_from_tool_calls(agent: &Arc, message: &AgentMessage) { + // Handle tool calls from assistant messages if let AgentMessage::Assistant { tool_calls: Some(tool_calls), .. } = message { for tool_call in tool_calls { match tool_call.function.name.as_str() { @@ -42,6 +43,10 @@ impl ChatContextLoader { agent.set_state_value(String::from("dashboards_available"), Value::Bool(true)) .await; }, + "import_assets" => { + // When we see import_assets, we need to check the content in the corresponding tool response + // This will be handled separately when processing tool messages + }, name if name.contains("file") || name.contains("read") || name.contains("write") || name.contains("edit") => { agent.set_state_value(String::from("files_available"), Value::Bool(true)) .await; @@ -50,6 +55,82 @@ impl ChatContextLoader { } } } + + // Handle tool responses - important for import_assets + if let AgentMessage::Tool { name: Some(tool_name), content, .. } = message { + if tool_name == "import_assets" { + // Parse the tool response to see what was imported + if let Ok(import_result) = serde_json::from_str::(content) { + // Check for files array + if let Some(files) = import_result.get("files").and_then(|f| f.as_array()) { + if !files.is_empty() { + // Set files_available for any imported files + agent.set_state_value(String::from("files_available"), Value::Bool(true)) + .await; + + // Check each file to determine its type + let mut has_metrics = false; + let mut has_dashboards = false; + let mut has_datasets = false; + + for file in files { + // Check file_type/asset_type to determine what kind of asset this is + let file_type = file.get("file_type").and_then(|ft| ft.as_str()) + .or_else(|| file.get("asset_type").and_then(|at| at.as_str())); + + tracing::debug!("Processing imported file with type: {:?}", file_type); + + match file_type { + Some("metric") => { + has_metrics = true; + + // Check if the metric has dataset references + if let Some(yml_content) = file.get("yml_content").and_then(|y| y.as_str()) { + if yml_content.contains("dataset") || yml_content.contains("datasetIds") { + has_datasets = true; + } + } + }, + Some("dashboard") => { + has_dashboards = true; + + // Dashboards often reference metrics too + has_metrics = true; + + // Check if the dashboard has dataset references via metrics + if let Some(yml_content) = file.get("yml_content").and_then(|y| y.as_str()) { + if yml_content.contains("dataset") || yml_content.contains("datasetIds") { + has_datasets = true; + } + } + }, + _ => { + tracing::debug!("Unknown file type in import_assets: {:?}", file_type); + } + } + } + + // Set appropriate state values based on what we found + if has_metrics { + tracing::debug!("Setting metrics_available state to true"); + agent.set_state_value(String::from("metrics_available"), Value::Bool(true)) + .await; + } + if has_dashboards { + tracing::debug!("Setting dashboards_available state to true"); + agent.set_state_value(String::from("dashboards_available"), Value::Bool(true)) + .await; + } + if has_datasets { + tracing::debug!("Setting data_context state to true"); + agent.set_state_value(String::from("data_context"), Value::Bool(true)) + .await; + } + } + } + } + } + } } } diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 32802050e..64d0e090d 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -210,37 +210,51 @@ pub async fn post_chat_handler( for mut message in messages { message.chat_id = chat_id; - // If this is a file message, create file association - if message.response_messages.is_array() { - let response_arr = message.response_messages.as_array().unwrap(); - if !response_arr.is_empty() { - if let Some(response) = response_arr.get(0) { - if response.get("type").map_or(false, |t| t == "file") { - // Extract version_number from response, default to 1 if not found - let asset_version_number = response.get("versionNumber") - .and_then(|v| v.as_i64()) - .map(|v| v as i32) - .unwrap_or(1); - - // Create association in database - let _ = create_message_file_association( - message.id, - asset_id_value, - asset_version_number, - asset_type_value, - ) - .await; - } - } - } - } - - // Insert message into database + // Insert message into database first let mut conn = get_pg_pool().get().await?; insert_into(database::schema::messages::table) .values(&message) .execute(&mut conn) .await?; + + // After message is inserted, create file association if needed + if message.response_messages.is_array() { + let response_arr = message.response_messages.as_array().unwrap(); + + // Find a file response in the array + for response in response_arr { + if response.get("type").map_or(false, |t| t == "file") { + // Extract version_number from response, default to 1 if not found + let asset_version_number = response.get("version_number") + .and_then(|v| v.as_i64()) + .map(|v| v as i32) + .unwrap_or(1); + + // Ensure the response id matches the asset_id + let response_id = response.get("id") + .and_then(|id| id.as_str()) + .and_then(|id_str| Uuid::parse_str(id_str).ok()) + .unwrap_or(asset_id_value); + + // Verify the response ID matches the asset ID + if response_id == asset_id_value { + // Create association in database - now the message exists in DB + if let Err(e) = create_message_file_association( + message.id, + asset_id_value, + asset_version_number, + asset_type_value, + ) + .await { + tracing::warn!("Failed to create message file association: {}", e); + } + } + + // We only need to process one file association + break; + } + } + } // Add to updated messages for the response updated_messages.push(message); @@ -264,8 +278,38 @@ pub async fn post_chat_handler( ); chat_with_messages.add_message(chat_message); + + // We don't need to process the raw_llm_messages here + // The ChatContextLoader.update_context_from_tool_calls function will handle the asset state + // when the agent is initialized and loads the context } + // Explicitly update the chat in the database with most_recent_file information + // to ensure it behaves like files generated in a chat + let asset_type_string = match asset_type_value { + AssetType::MetricFile => Some("metric".to_string()), + AssetType::DashboardFile => Some("dashboard".to_string()), + _ => None, + }; + + if let Some(file_type) = asset_type_string { + // Update the chat directly to ensure it has the most_recent_file information + let mut conn = get_pg_pool().get().await?; + diesel::update(chats::table.find(chat_id)) + .set(( + chats::most_recent_file_id.eq(Some(asset_id_value)), + chats::most_recent_file_type.eq(Some(file_type.clone())), + chats::updated_at.eq(Utc::now()), + )) + .execute(&mut conn) + .await?; + + tracing::info!( + "Updated chat {} with most_recent_file_id: {}, most_recent_file_type: {}", + chat_id, asset_id_value, file_type + ); + } + // Return early with auto-generated messages - no need for agent processing return Ok(chat_with_messages); } @@ -486,7 +530,7 @@ pub async fn post_chat_handler( }), response_messages.clone(), reasoning_messages.clone(), - Some(formatted_reasoning_duration.clone()), // Use the formatted duration string + Some(formatted_reasoning_duration.clone()), // Use formatted reasoning duration for regular messages Utc::now(), ); @@ -503,7 +547,7 @@ pub async fn post_chat_handler( deleted_at: None, response_messages: serde_json::to_value(&response_messages)?, reasoning: serde_json::to_value(&reasoning_messages)?, - final_reasoning_message: Some(formatted_reasoning_duration), // Use the formatted duration string + final_reasoning_message: Some(formatted_reasoning_duration), // Use formatted reasoning duration for regular messages title: title.title.clone().unwrap_or_default(), raw_llm_messages: serde_json::to_value(&raw_llm_messages)?, feedback: None, @@ -797,6 +841,7 @@ pub struct BusterReasoningMessageContainer { } #[derive(Debug, Serialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct BusterChatResponseFileMetadata { pub status: String, pub message: String, @@ -805,7 +850,6 @@ pub struct BusterChatResponseFileMetadata { #[derive(Debug, Serialize, Clone)] #[serde(tag = "type")] -#[serde(rename_all = "camelCase")] pub enum BusterChatMessage { Text { id: String, @@ -1009,10 +1053,7 @@ pub async fn transform_message( filter_version_id: None, metadata: Some(vec![BusterChatResponseFileMetadata { status: "completed".to_string(), - message: format!( - "Created new {}", - file_content.file_type - ), + message: "Created by Buster".to_string(), timestamp: Some(Utc::now().timestamp()), }]), }; @@ -1095,10 +1136,7 @@ pub async fn transform_message( filter_version_id: None, metadata: Some(vec![BusterChatResponseFileMetadata { status: "completed".to_string(), - message: format!( - "Created new {}", - file_content.file_type - ), + message: "Created by Buster".to_string(), timestamp: Some(Utc::now().timestamp()), }]), }; @@ -1953,6 +1991,8 @@ Return only the title text with no additional formatting, explanation, quotes, n /// The function streams the title back as it's being generated. type BusterContainerResult = Result<(BusterContainer, ThreadEvent)>; +// The implementation has been moved to ChatContextLoader.update_context_from_tool_calls + pub async fn generate_conversation_title( messages: &[AgentMessage], message_id: &Uuid, @@ -2050,7 +2090,7 @@ async fn initialize_chat( let (asset_id, asset_type) = normalize_asset_fields(request); if let (Some(asset_id), Some(asset_type)) = (asset_id, asset_type) { match fetch_asset_details(asset_id, asset_type).await { - Ok(details) => format!("View {}", details.name), + Ok(details) => details.name.clone(), Err(_) => "New Chat".to_string(), } } else {