mirror of https://github.com/buster-so/buster.git
merging bus-1065
This commit is contained in:
commit
ebc4815aad
|
@ -2,25 +2,30 @@ use anyhow::{anyhow, Result};
|
|||
use chrono::Utc;
|
||||
use database::{
|
||||
enums::AssetType,
|
||||
models::{Message, MessageToFile},
|
||||
models::{DashboardFile, Message, MessageToFile, MetricFile},
|
||||
pool::get_pg_pool,
|
||||
schema::{chats, messages, messages_to_files},
|
||||
schema::{chats, dashboard_files, messages, messages_to_files, metric_files},
|
||||
};
|
||||
use diesel::{insert_into, ExpressionMethods, QueryDsl};
|
||||
use diesel_async::RunQueryDsl;
|
||||
// Using json for our serialization
|
||||
use middleware::AuthenticatedUser;
|
||||
use serde_json::json;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::context_loaders::fetch_asset_details;
|
||||
|
||||
/// Generate default messages for prompt-less asset-based chats
|
||||
///
|
||||
/// This function creates a pair of messages to be shown in a chat when a user
|
||||
/// opens an asset without providing a prompt:
|
||||
/// 1. A file message that represents the asset itself
|
||||
/// 2. A text message with placeholder content
|
||||
/// This function creates a message to be shown in a chat when a user
|
||||
/// opens an asset without providing a prompt. It includes:
|
||||
/// 1. A file response that represents the asset itself
|
||||
/// 2. A text response with helpful context
|
||||
///
|
||||
/// The function also checks that the user has permission to view the asset
|
||||
/// The function also adds the asset information to raw_llm_messages so
|
||||
/// the agent can understand the context of the asset being viewed.
|
||||
///
|
||||
/// The function checks that the user has permission to view the asset
|
||||
/// and fetches the asset details for display.
|
||||
pub async fn generate_asset_messages(
|
||||
asset_id: Uuid,
|
||||
|
@ -34,62 +39,207 @@ pub async fn generate_asset_messages(
|
|||
// Fetch asset details based on type
|
||||
let asset_details = fetch_asset_details(asset_id, asset_type).await?;
|
||||
|
||||
// Create file message
|
||||
let file_message_id = Uuid::new_v4();
|
||||
let file_message = Message {
|
||||
id: file_message_id,
|
||||
request_message: None, // Empty request for auto-generated messages
|
||||
chat_id: Uuid::nil(), // Will be set by caller
|
||||
created_by: user.id,
|
||||
created_at: Utc::now(),
|
||||
updated_at: Utc::now(),
|
||||
deleted_at: None,
|
||||
response_messages: serde_json::json!([{
|
||||
"type": "file",
|
||||
"id": file_message_id.to_string(),
|
||||
"fileType": asset_details.file_type,
|
||||
"fileName": asset_details.name,
|
||||
"versionNumber": asset_details.version_number,
|
||||
"filterVersionId": null,
|
||||
"metadata": [
|
||||
{
|
||||
"status": "completed",
|
||||
"message": format!("File {} completed", asset_details.name),
|
||||
"timestamp": Utc::now().timestamp()
|
||||
// Create a single message with both text and file response
|
||||
let message_id = Uuid::new_v4();
|
||||
let timestamp = Utc::now().timestamp();
|
||||
|
||||
// Fetch detailed asset information
|
||||
let mut conn = get_pg_pool().get().await?;
|
||||
|
||||
// Create the import_assets tool call sequence
|
||||
let tool_call_id = format!("call_{}", Uuid::new_v4().simple().to_string());
|
||||
|
||||
// Prepare asset data based on asset type
|
||||
let (asset_data, asset_type_str, additional_files) = match asset_type {
|
||||
AssetType::MetricFile => {
|
||||
let metric = metric_files::table
|
||||
.filter(metric_files::id.eq(asset_id))
|
||||
.first::<MetricFile>(&mut conn)
|
||||
.await?;
|
||||
|
||||
// Get YAML content
|
||||
let yml_content = serde_yaml::to_string(&metric.content)?;
|
||||
|
||||
// For simplicity, we'll just use an empty array for results
|
||||
// since MetricYml may not have results field
|
||||
let results = serde_json::json!([]);
|
||||
|
||||
// Create asset data object
|
||||
let asset_data = json!({
|
||||
"id": asset_id.to_string(),
|
||||
"name": metric.name,
|
||||
"file_type": "metric",
|
||||
"asset_type": "metric",
|
||||
"yml_content": yml_content,
|
||||
"result_message": "0 records were returned",
|
||||
"results": results,
|
||||
"created_at": metric.created_at,
|
||||
"version_number": metric.version_history.get_version_number(),
|
||||
"updated_at": metric.updated_at
|
||||
});
|
||||
|
||||
(asset_data, "metric", Vec::new())
|
||||
},
|
||||
AssetType::DashboardFile => {
|
||||
let dashboard = dashboard_files::table
|
||||
.filter(dashboard_files::id.eq(asset_id))
|
||||
.first::<DashboardFile>(&mut conn)
|
||||
.await?;
|
||||
|
||||
// Get YAML content
|
||||
let yml_content = serde_yaml::to_string(&dashboard.content)?;
|
||||
|
||||
// Create asset data object
|
||||
let asset_data = json!({
|
||||
"id": asset_id.to_string(),
|
||||
"name": dashboard.name,
|
||||
"file_type": "dashboard",
|
||||
"asset_type": "dashboard",
|
||||
"yml_content": yml_content,
|
||||
"created_at": dashboard.created_at,
|
||||
"version_number": dashboard.version_history.get_version_number(),
|
||||
"updated_at": dashboard.updated_at
|
||||
});
|
||||
|
||||
// Extract metric IDs from dashboard
|
||||
let mut metric_ids = std::collections::HashSet::new();
|
||||
for row in &dashboard.content.rows {
|
||||
for item in &row.items {
|
||||
metric_ids.insert(item.id);
|
||||
}
|
||||
]
|
||||
}]),
|
||||
reasoning: serde_json::Value::Array(vec![]),
|
||||
final_reasoning_message: None,
|
||||
title: format!("Chat with {}", asset_details.name),
|
||||
raw_llm_messages: serde_json::json!([]),
|
||||
feedback: None,
|
||||
}
|
||||
|
||||
// Load all associated metrics for context (they won't be shown in UI)
|
||||
let mut metric_files_data = Vec::new();
|
||||
for metric_id in metric_ids {
|
||||
match metric_files::table
|
||||
.filter(metric_files::id.eq(metric_id))
|
||||
.first::<MetricFile>(&mut conn)
|
||||
.await
|
||||
{
|
||||
Ok(metric) => {
|
||||
// Get YAML content for this metric
|
||||
if let Ok(yml_content) = serde_yaml::to_string(&metric.content) {
|
||||
// Add metric as additional file data for agent context
|
||||
let metric_data = json!({
|
||||
"id": metric.id.to_string(),
|
||||
"name": metric.name,
|
||||
"file_type": "metric",
|
||||
"asset_type": "metric",
|
||||
"yml_content": yml_content,
|
||||
"created_at": metric.created_at,
|
||||
"version_number": metric.version_history.get_version_number(),
|
||||
"updated_at": metric.updated_at
|
||||
});
|
||||
|
||||
metric_files_data.push(metric_data);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
// Log error but continue with other metrics
|
||||
tracing::warn!("Failed to load metric {} for dashboard context: {}", metric_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Loaded {} metrics as context for dashboard import",
|
||||
metric_files_data.len()
|
||||
);
|
||||
|
||||
(asset_data, "dashboard", metric_files_data)
|
||||
},
|
||||
_ => {
|
||||
return Err(anyhow!("Unsupported asset type for generating asset messages: {:?}", asset_type));
|
||||
}
|
||||
};
|
||||
|
||||
// Create text message with placeholder content
|
||||
let text_message_id = Uuid::new_v4();
|
||||
let text_message = Message {
|
||||
id: text_message_id,
|
||||
// Determine appropriate message based on file count
|
||||
let additional_files_count = additional_files.len();
|
||||
let message_text = if additional_files_count == 0 {
|
||||
format!("Successfully imported 1 {} file.", asset_type_str)
|
||||
} else {
|
||||
format!("Successfully imported 1 {} file with {} additional context files.",
|
||||
asset_type_str, additional_files_count)
|
||||
};
|
||||
|
||||
// Create combined file list with the main asset first, followed by context files
|
||||
let mut all_files = vec![asset_data];
|
||||
all_files.extend(additional_files);
|
||||
|
||||
// Create the tool response content
|
||||
let tool_response_content = json!({
|
||||
"message": message_text,
|
||||
"duration": 928, // Example duration
|
||||
"files": all_files
|
||||
}).to_string();
|
||||
|
||||
// Create the Assistant message with tool call
|
||||
let assistant_message = serde_json::json!({
|
||||
"name": "buster_super_agent",
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": tool_call_id,
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "import_assets",
|
||||
"arguments": "{}"
|
||||
}
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
// Create the Tool response message
|
||||
let tool_message = serde_json::json!({
|
||||
"name": "import_assets",
|
||||
"role": "tool",
|
||||
"content": tool_response_content,
|
||||
"tool_call_id": tool_call_id
|
||||
});
|
||||
|
||||
// Combine into raw_llm_messages
|
||||
let raw_llm_messages = serde_json::json!([assistant_message, tool_message]);
|
||||
|
||||
let message = Message {
|
||||
id: message_id,
|
||||
request_message: None, // Empty request for auto-generated messages
|
||||
chat_id: Uuid::nil(), // Will be set by caller
|
||||
created_by: user.id,
|
||||
created_at: Utc::now(),
|
||||
updated_at: Utc::now(),
|
||||
deleted_at: None,
|
||||
response_messages: serde_json::json!([{
|
||||
"type": "text",
|
||||
"id": text_message_id.to_string(),
|
||||
"message": "DALLIN NEEDS TO PUT VALUE HERE",
|
||||
"isFinalMessage": true
|
||||
}]),
|
||||
response_messages: serde_json::json!([
|
||||
{
|
||||
"type": "text",
|
||||
"id": Uuid::new_v4().to_string(),
|
||||
"message": format!("{} has been pulled into a new chat.\n\nContinue chatting to modify or make changes to it.", asset_details.name),
|
||||
"is_final_message": true
|
||||
},
|
||||
{
|
||||
"type": "file",
|
||||
"id": asset_id.to_string(),
|
||||
"file_type": asset_details.file_type,
|
||||
"file_name": asset_details.name,
|
||||
"version_number": asset_details.version_number,
|
||||
"filter_version_id": null,
|
||||
"metadata": [
|
||||
{
|
||||
"status": "completed",
|
||||
"message": "Pulled into new chat",
|
||||
"timestamp": timestamp
|
||||
}
|
||||
]
|
||||
}
|
||||
]),
|
||||
reasoning: serde_json::Value::Array(vec![]),
|
||||
final_reasoning_message: None,
|
||||
title: format!("Chat with {}", asset_details.name),
|
||||
raw_llm_messages: serde_json::json!([]),
|
||||
final_reasoning_message: Some("".to_string()),
|
||||
title: asset_details.name.clone(),
|
||||
raw_llm_messages, // Add the agent context messages
|
||||
feedback: None,
|
||||
};
|
||||
|
||||
Ok(vec![file_message, text_message])
|
||||
Ok(vec![message])
|
||||
}
|
||||
|
||||
/// Create association between message and file in the database
|
||||
|
|
|
@ -27,6 +27,7 @@ impl ChatContextLoader {
|
|||
|
||||
// Helper function to check for tool usage and set appropriate context
|
||||
async fn update_context_from_tool_calls(agent: &Arc<Agent>, message: &AgentMessage) {
|
||||
// Handle tool calls from assistant messages
|
||||
if let AgentMessage::Assistant { tool_calls: Some(tool_calls), .. } = message {
|
||||
for tool_call in tool_calls {
|
||||
match tool_call.function.name.as_str() {
|
||||
|
@ -42,6 +43,10 @@ impl ChatContextLoader {
|
|||
agent.set_state_value(String::from("dashboards_available"), Value::Bool(true))
|
||||
.await;
|
||||
},
|
||||
"import_assets" => {
|
||||
// When we see import_assets, we need to check the content in the corresponding tool response
|
||||
// This will be handled separately when processing tool messages
|
||||
},
|
||||
name if name.contains("file") || name.contains("read") || name.contains("write") || name.contains("edit") => {
|
||||
agent.set_state_value(String::from("files_available"), Value::Bool(true))
|
||||
.await;
|
||||
|
@ -50,6 +55,82 @@ impl ChatContextLoader {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle tool responses - important for import_assets
|
||||
if let AgentMessage::Tool { name: Some(tool_name), content, .. } = message {
|
||||
if tool_name == "import_assets" {
|
||||
// Parse the tool response to see what was imported
|
||||
if let Ok(import_result) = serde_json::from_str::<serde_json::Value>(content) {
|
||||
// Check for files array
|
||||
if let Some(files) = import_result.get("files").and_then(|f| f.as_array()) {
|
||||
if !files.is_empty() {
|
||||
// Set files_available for any imported files
|
||||
agent.set_state_value(String::from("files_available"), Value::Bool(true))
|
||||
.await;
|
||||
|
||||
// Check each file to determine its type
|
||||
let mut has_metrics = false;
|
||||
let mut has_dashboards = false;
|
||||
let mut has_datasets = false;
|
||||
|
||||
for file in files {
|
||||
// Check file_type/asset_type to determine what kind of asset this is
|
||||
let file_type = file.get("file_type").and_then(|ft| ft.as_str())
|
||||
.or_else(|| file.get("asset_type").and_then(|at| at.as_str()));
|
||||
|
||||
tracing::debug!("Processing imported file with type: {:?}", file_type);
|
||||
|
||||
match file_type {
|
||||
Some("metric") => {
|
||||
has_metrics = true;
|
||||
|
||||
// Check if the metric has dataset references
|
||||
if let Some(yml_content) = file.get("yml_content").and_then(|y| y.as_str()) {
|
||||
if yml_content.contains("dataset") || yml_content.contains("datasetIds") {
|
||||
has_datasets = true;
|
||||
}
|
||||
}
|
||||
},
|
||||
Some("dashboard") => {
|
||||
has_dashboards = true;
|
||||
|
||||
// Dashboards often reference metrics too
|
||||
has_metrics = true;
|
||||
|
||||
// Check if the dashboard has dataset references via metrics
|
||||
if let Some(yml_content) = file.get("yml_content").and_then(|y| y.as_str()) {
|
||||
if yml_content.contains("dataset") || yml_content.contains("datasetIds") {
|
||||
has_datasets = true;
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tracing::debug!("Unknown file type in import_assets: {:?}", file_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set appropriate state values based on what we found
|
||||
if has_metrics {
|
||||
tracing::debug!("Setting metrics_available state to true");
|
||||
agent.set_state_value(String::from("metrics_available"), Value::Bool(true))
|
||||
.await;
|
||||
}
|
||||
if has_dashboards {
|
||||
tracing::debug!("Setting dashboards_available state to true");
|
||||
agent.set_state_value(String::from("dashboards_available"), Value::Bool(true))
|
||||
.await;
|
||||
}
|
||||
if has_datasets {
|
||||
tracing::debug!("Setting data_context state to true");
|
||||
agent.set_state_value(String::from("data_context"), Value::Bool(true))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -210,37 +210,51 @@ pub async fn post_chat_handler(
|
|||
for mut message in messages {
|
||||
message.chat_id = chat_id;
|
||||
|
||||
// If this is a file message, create file association
|
||||
if message.response_messages.is_array() {
|
||||
let response_arr = message.response_messages.as_array().unwrap();
|
||||
if !response_arr.is_empty() {
|
||||
if let Some(response) = response_arr.get(0) {
|
||||
if response.get("type").map_or(false, |t| t == "file") {
|
||||
// Extract version_number from response, default to 1 if not found
|
||||
let asset_version_number = response.get("versionNumber")
|
||||
.and_then(|v| v.as_i64())
|
||||
.map(|v| v as i32)
|
||||
.unwrap_or(1);
|
||||
|
||||
// Create association in database
|
||||
let _ = create_message_file_association(
|
||||
message.id,
|
||||
asset_id_value,
|
||||
asset_version_number,
|
||||
asset_type_value,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Insert message into database
|
||||
// Insert message into database first
|
||||
let mut conn = get_pg_pool().get().await?;
|
||||
insert_into(database::schema::messages::table)
|
||||
.values(&message)
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
// After message is inserted, create file association if needed
|
||||
if message.response_messages.is_array() {
|
||||
let response_arr = message.response_messages.as_array().unwrap();
|
||||
|
||||
// Find a file response in the array
|
||||
for response in response_arr {
|
||||
if response.get("type").map_or(false, |t| t == "file") {
|
||||
// Extract version_number from response, default to 1 if not found
|
||||
let asset_version_number = response.get("version_number")
|
||||
.and_then(|v| v.as_i64())
|
||||
.map(|v| v as i32)
|
||||
.unwrap_or(1);
|
||||
|
||||
// Ensure the response id matches the asset_id
|
||||
let response_id = response.get("id")
|
||||
.and_then(|id| id.as_str())
|
||||
.and_then(|id_str| Uuid::parse_str(id_str).ok())
|
||||
.unwrap_or(asset_id_value);
|
||||
|
||||
// Verify the response ID matches the asset ID
|
||||
if response_id == asset_id_value {
|
||||
// Create association in database - now the message exists in DB
|
||||
if let Err(e) = create_message_file_association(
|
||||
message.id,
|
||||
asset_id_value,
|
||||
asset_version_number,
|
||||
asset_type_value,
|
||||
)
|
||||
.await {
|
||||
tracing::warn!("Failed to create message file association: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// We only need to process one file association
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add to updated messages for the response
|
||||
updated_messages.push(message);
|
||||
|
@ -264,8 +278,38 @@ pub async fn post_chat_handler(
|
|||
);
|
||||
|
||||
chat_with_messages.add_message(chat_message);
|
||||
|
||||
// We don't need to process the raw_llm_messages here
|
||||
// The ChatContextLoader.update_context_from_tool_calls function will handle the asset state
|
||||
// when the agent is initialized and loads the context
|
||||
}
|
||||
|
||||
// Explicitly update the chat in the database with most_recent_file information
|
||||
// to ensure it behaves like files generated in a chat
|
||||
let asset_type_string = match asset_type_value {
|
||||
AssetType::MetricFile => Some("metric".to_string()),
|
||||
AssetType::DashboardFile => Some("dashboard".to_string()),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if let Some(file_type) = asset_type_string {
|
||||
// Update the chat directly to ensure it has the most_recent_file information
|
||||
let mut conn = get_pg_pool().get().await?;
|
||||
diesel::update(chats::table.find(chat_id))
|
||||
.set((
|
||||
chats::most_recent_file_id.eq(Some(asset_id_value)),
|
||||
chats::most_recent_file_type.eq(Some(file_type.clone())),
|
||||
chats::updated_at.eq(Utc::now()),
|
||||
))
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
tracing::info!(
|
||||
"Updated chat {} with most_recent_file_id: {}, most_recent_file_type: {}",
|
||||
chat_id, asset_id_value, file_type
|
||||
);
|
||||
}
|
||||
|
||||
// Return early with auto-generated messages - no need for agent processing
|
||||
return Ok(chat_with_messages);
|
||||
}
|
||||
|
@ -486,7 +530,7 @@ pub async fn post_chat_handler(
|
|||
}),
|
||||
response_messages.clone(),
|
||||
reasoning_messages.clone(),
|
||||
Some(formatted_reasoning_duration.clone()), // Use the formatted duration string
|
||||
Some(formatted_reasoning_duration.clone()), // Use formatted reasoning duration for regular messages
|
||||
Utc::now(),
|
||||
);
|
||||
|
||||
|
@ -503,7 +547,7 @@ pub async fn post_chat_handler(
|
|||
deleted_at: None,
|
||||
response_messages: serde_json::to_value(&response_messages)?,
|
||||
reasoning: serde_json::to_value(&reasoning_messages)?,
|
||||
final_reasoning_message: Some(formatted_reasoning_duration), // Use the formatted duration string
|
||||
final_reasoning_message: Some(formatted_reasoning_duration), // Use formatted reasoning duration for regular messages
|
||||
title: title.title.clone().unwrap_or_default(),
|
||||
raw_llm_messages: serde_json::to_value(&raw_llm_messages)?,
|
||||
feedback: None,
|
||||
|
@ -797,6 +841,7 @@ pub struct BusterReasoningMessageContainer {
|
|||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct BusterChatResponseFileMetadata {
|
||||
pub status: String,
|
||||
pub message: String,
|
||||
|
@ -805,7 +850,6 @@ pub struct BusterChatResponseFileMetadata {
|
|||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum BusterChatMessage {
|
||||
Text {
|
||||
id: String,
|
||||
|
@ -1009,10 +1053,7 @@ pub async fn transform_message(
|
|||
filter_version_id: None,
|
||||
metadata: Some(vec![BusterChatResponseFileMetadata {
|
||||
status: "completed".to_string(),
|
||||
message: format!(
|
||||
"Created new {}",
|
||||
file_content.file_type
|
||||
),
|
||||
message: "Created by Buster".to_string(),
|
||||
timestamp: Some(Utc::now().timestamp()),
|
||||
}]),
|
||||
};
|
||||
|
@ -1095,10 +1136,7 @@ pub async fn transform_message(
|
|||
filter_version_id: None,
|
||||
metadata: Some(vec![BusterChatResponseFileMetadata {
|
||||
status: "completed".to_string(),
|
||||
message: format!(
|
||||
"Created new {}",
|
||||
file_content.file_type
|
||||
),
|
||||
message: "Created by Buster".to_string(),
|
||||
timestamp: Some(Utc::now().timestamp()),
|
||||
}]),
|
||||
};
|
||||
|
@ -1953,6 +1991,8 @@ Return only the title text with no additional formatting, explanation, quotes, n
|
|||
/// The function streams the title back as it's being generated.
|
||||
type BusterContainerResult = Result<(BusterContainer, ThreadEvent)>;
|
||||
|
||||
// The implementation has been moved to ChatContextLoader.update_context_from_tool_calls
|
||||
|
||||
pub async fn generate_conversation_title(
|
||||
messages: &[AgentMessage],
|
||||
message_id: &Uuid,
|
||||
|
@ -2050,7 +2090,7 @@ async fn initialize_chat(
|
|||
let (asset_id, asset_type) = normalize_asset_fields(request);
|
||||
if let (Some(asset_id), Some(asset_type)) = (asset_id, asset_type) {
|
||||
match fetch_asset_details(asset_id, asset_type).await {
|
||||
Ok(details) => format!("View {}", details.name),
|
||||
Ok(details) => details.name.clone(),
|
||||
Err(_) => "New Chat".to_string(),
|
||||
}
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue