mirror of https://github.com/buster-so/buster.git
load in assets and mark context
This commit is contained in:
parent
09c41be967
commit
44042cd676
|
@ -2,25 +2,30 @@ use anyhow::{anyhow, Result};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use database::{
|
use database::{
|
||||||
enums::AssetType,
|
enums::AssetType,
|
||||||
models::{Message, MessageToFile},
|
models::{DashboardFile, Message, MessageToFile, MetricFile},
|
||||||
pool::get_pg_pool,
|
pool::get_pg_pool,
|
||||||
schema::{chats, messages, messages_to_files},
|
schema::{chats, dashboard_files, messages, messages_to_files, metric_files},
|
||||||
};
|
};
|
||||||
use diesel::{insert_into, ExpressionMethods, QueryDsl};
|
use diesel::{insert_into, ExpressionMethods, QueryDsl};
|
||||||
use diesel_async::RunQueryDsl;
|
use diesel_async::RunQueryDsl;
|
||||||
|
// Using json for our serialization
|
||||||
use middleware::AuthenticatedUser;
|
use middleware::AuthenticatedUser;
|
||||||
|
use serde_json::json;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::context_loaders::fetch_asset_details;
|
use super::context_loaders::fetch_asset_details;
|
||||||
|
|
||||||
/// Generate default messages for prompt-less asset-based chats
|
/// Generate default messages for prompt-less asset-based chats
|
||||||
///
|
///
|
||||||
/// This function creates a pair of messages to be shown in a chat when a user
|
/// This function creates a message to be shown in a chat when a user
|
||||||
/// opens an asset without providing a prompt:
|
/// opens an asset without providing a prompt. It includes:
|
||||||
/// 1. A file message that represents the asset itself
|
/// 1. A file response that represents the asset itself
|
||||||
/// 2. A text message with placeholder content
|
/// 2. A text response with helpful context
|
||||||
///
|
///
|
||||||
/// The function also checks that the user has permission to view the asset
|
/// The function also adds the asset information to raw_llm_messages so
|
||||||
|
/// the agent can understand the context of the asset being viewed.
|
||||||
|
///
|
||||||
|
/// The function checks that the user has permission to view the asset
|
||||||
/// and fetches the asset details for display.
|
/// and fetches the asset details for display.
|
||||||
pub async fn generate_asset_messages(
|
pub async fn generate_asset_messages(
|
||||||
asset_id: Uuid,
|
asset_id: Uuid,
|
||||||
|
@ -38,6 +43,105 @@ pub async fn generate_asset_messages(
|
||||||
let message_id = Uuid::new_v4();
|
let message_id = Uuid::new_v4();
|
||||||
let timestamp = Utc::now().timestamp();
|
let timestamp = Utc::now().timestamp();
|
||||||
|
|
||||||
|
// Fetch detailed asset information
|
||||||
|
let mut conn = get_pg_pool().get().await?;
|
||||||
|
|
||||||
|
// Create the import_assets tool call sequence
|
||||||
|
let tool_call_id = format!("call_{}", Uuid::new_v4().simple().to_string());
|
||||||
|
|
||||||
|
// Prepare asset data based on asset type
|
||||||
|
let (asset_data, asset_type_str) = match asset_type {
|
||||||
|
AssetType::MetricFile => {
|
||||||
|
let metric = metric_files::table
|
||||||
|
.filter(metric_files::id.eq(asset_id))
|
||||||
|
.first::<MetricFile>(&mut conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Get YAML content
|
||||||
|
let yml_content = serde_yaml::to_string(&metric.content)?;
|
||||||
|
|
||||||
|
// For simplicity, we'll just use an empty array for results
|
||||||
|
// since MetricYml may not have results field
|
||||||
|
let results = serde_json::json!([]);
|
||||||
|
|
||||||
|
// Create asset data object
|
||||||
|
let asset_data = json!({
|
||||||
|
"id": asset_id.to_string(),
|
||||||
|
"name": metric.name,
|
||||||
|
"file_type": "metric",
|
||||||
|
"asset_type": "metric",
|
||||||
|
"yml_content": yml_content,
|
||||||
|
"result_message": "0 records were returned",
|
||||||
|
"results": results,
|
||||||
|
"created_at": metric.created_at,
|
||||||
|
"version_number": metric.version_history.get_version_number(),
|
||||||
|
"updated_at": metric.updated_at
|
||||||
|
});
|
||||||
|
|
||||||
|
(asset_data, "metric")
|
||||||
|
},
|
||||||
|
AssetType::DashboardFile => {
|
||||||
|
let dashboard = dashboard_files::table
|
||||||
|
.filter(dashboard_files::id.eq(asset_id))
|
||||||
|
.first::<DashboardFile>(&mut conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Get YAML content
|
||||||
|
let yml_content = serde_yaml::to_string(&dashboard.content)?;
|
||||||
|
|
||||||
|
// Create asset data object
|
||||||
|
let asset_data = json!({
|
||||||
|
"id": asset_id.to_string(),
|
||||||
|
"name": dashboard.name,
|
||||||
|
"file_type": "dashboard",
|
||||||
|
"asset_type": "dashboard",
|
||||||
|
"yml_content": yml_content,
|
||||||
|
"created_at": dashboard.created_at,
|
||||||
|
"version_number": dashboard.version_history.get_version_number(),
|
||||||
|
"updated_at": dashboard.updated_at
|
||||||
|
});
|
||||||
|
|
||||||
|
(asset_data, "dashboard")
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
return Err(anyhow!("Unsupported asset type for generating asset messages: {:?}", asset_type));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create the tool response content
|
||||||
|
let tool_response_content = json!({
|
||||||
|
"message": format!("Successfully imported 1 {} files.", asset_type_str),
|
||||||
|
"duration": 928, // Example duration
|
||||||
|
"files": [asset_data]
|
||||||
|
}).to_string();
|
||||||
|
|
||||||
|
// Create the Assistant message with tool call
|
||||||
|
let assistant_message = serde_json::json!({
|
||||||
|
"name": "buster_super_agent",
|
||||||
|
"role": "assistant",
|
||||||
|
"tool_calls": [
|
||||||
|
{
|
||||||
|
"id": tool_call_id,
|
||||||
|
"type": "function",
|
||||||
|
"function": {
|
||||||
|
"name": "import_assets",
|
||||||
|
"arguments": "{}"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create the Tool response message
|
||||||
|
let tool_message = serde_json::json!({
|
||||||
|
"name": "import_assets",
|
||||||
|
"role": "tool",
|
||||||
|
"content": tool_response_content,
|
||||||
|
"tool_call_id": tool_call_id
|
||||||
|
});
|
||||||
|
|
||||||
|
// Combine into raw_llm_messages
|
||||||
|
let raw_llm_messages = serde_json::json!([assistant_message, tool_message]);
|
||||||
|
|
||||||
let message = Message {
|
let message = Message {
|
||||||
id: message_id,
|
id: message_id,
|
||||||
request_message: None, // Empty request for auto-generated messages
|
request_message: None, // Empty request for auto-generated messages
|
||||||
|
@ -51,7 +155,7 @@ pub async fn generate_asset_messages(
|
||||||
"type": "text",
|
"type": "text",
|
||||||
"id": Uuid::new_v4().to_string(),
|
"id": Uuid::new_v4().to_string(),
|
||||||
"message": format!("{} has been pulled into a new chat.\n\nContinue chatting to modify or make changes to it.", asset_details.name),
|
"message": format!("{} has been pulled into a new chat.\n\nContinue chatting to modify or make changes to it.", asset_details.name),
|
||||||
"isFinalMessage": true
|
"is_final_message": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"type": "file",
|
"type": "file",
|
||||||
|
@ -72,7 +176,7 @@ pub async fn generate_asset_messages(
|
||||||
reasoning: serde_json::Value::Array(vec![]),
|
reasoning: serde_json::Value::Array(vec![]),
|
||||||
final_reasoning_message: Some("".to_string()),
|
final_reasoning_message: Some("".to_string()),
|
||||||
title: asset_details.name.clone(),
|
title: asset_details.name.clone(),
|
||||||
raw_llm_messages: serde_json::json!([]),
|
raw_llm_messages, // Add the agent context messages
|
||||||
feedback: None,
|
feedback: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ impl ChatContextLoader {
|
||||||
|
|
||||||
// Helper function to check for tool usage and set appropriate context
|
// Helper function to check for tool usage and set appropriate context
|
||||||
async fn update_context_from_tool_calls(agent: &Arc<Agent>, message: &AgentMessage) {
|
async fn update_context_from_tool_calls(agent: &Arc<Agent>, message: &AgentMessage) {
|
||||||
|
// Handle tool calls from assistant messages
|
||||||
if let AgentMessage::Assistant { tool_calls: Some(tool_calls), .. } = message {
|
if let AgentMessage::Assistant { tool_calls: Some(tool_calls), .. } = message {
|
||||||
for tool_call in tool_calls {
|
for tool_call in tool_calls {
|
||||||
match tool_call.function.name.as_str() {
|
match tool_call.function.name.as_str() {
|
||||||
|
@ -42,6 +43,10 @@ impl ChatContextLoader {
|
||||||
agent.set_state_value(String::from("dashboards_available"), Value::Bool(true))
|
agent.set_state_value(String::from("dashboards_available"), Value::Bool(true))
|
||||||
.await;
|
.await;
|
||||||
},
|
},
|
||||||
|
"import_assets" => {
|
||||||
|
// When we see import_assets, we need to check the content in the corresponding tool response
|
||||||
|
// This will be handled separately when processing tool messages
|
||||||
|
},
|
||||||
name if name.contains("file") || name.contains("read") || name.contains("write") || name.contains("edit") => {
|
name if name.contains("file") || name.contains("read") || name.contains("write") || name.contains("edit") => {
|
||||||
agent.set_state_value(String::from("files_available"), Value::Bool(true))
|
agent.set_state_value(String::from("files_available"), Value::Bool(true))
|
||||||
.await;
|
.await;
|
||||||
|
@ -50,6 +55,82 @@ impl ChatContextLoader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle tool responses - important for import_assets
|
||||||
|
if let AgentMessage::Tool { name: Some(tool_name), content, .. } = message {
|
||||||
|
if tool_name == "import_assets" {
|
||||||
|
// Parse the tool response to see what was imported
|
||||||
|
if let Ok(import_result) = serde_json::from_str::<serde_json::Value>(content) {
|
||||||
|
// Check for files array
|
||||||
|
if let Some(files) = import_result.get("files").and_then(|f| f.as_array()) {
|
||||||
|
if !files.is_empty() {
|
||||||
|
// Set files_available for any imported files
|
||||||
|
agent.set_state_value(String::from("files_available"), Value::Bool(true))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Check each file to determine its type
|
||||||
|
let mut has_metrics = false;
|
||||||
|
let mut has_dashboards = false;
|
||||||
|
let mut has_datasets = false;
|
||||||
|
|
||||||
|
for file in files {
|
||||||
|
// Check file_type/asset_type to determine what kind of asset this is
|
||||||
|
let file_type = file.get("file_type").and_then(|ft| ft.as_str())
|
||||||
|
.or_else(|| file.get("asset_type").and_then(|at| at.as_str()));
|
||||||
|
|
||||||
|
tracing::debug!("Processing imported file with type: {:?}", file_type);
|
||||||
|
|
||||||
|
match file_type {
|
||||||
|
Some("metric") => {
|
||||||
|
has_metrics = true;
|
||||||
|
|
||||||
|
// Check if the metric has dataset references
|
||||||
|
if let Some(yml_content) = file.get("yml_content").and_then(|y| y.as_str()) {
|
||||||
|
if yml_content.contains("dataset") || yml_content.contains("datasetIds") {
|
||||||
|
has_datasets = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Some("dashboard") => {
|
||||||
|
has_dashboards = true;
|
||||||
|
|
||||||
|
// Dashboards often reference metrics too
|
||||||
|
has_metrics = true;
|
||||||
|
|
||||||
|
// Check if the dashboard has dataset references via metrics
|
||||||
|
if let Some(yml_content) = file.get("yml_content").and_then(|y| y.as_str()) {
|
||||||
|
if yml_content.contains("dataset") || yml_content.contains("datasetIds") {
|
||||||
|
has_datasets = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
tracing::debug!("Unknown file type in import_assets: {:?}", file_type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set appropriate state values based on what we found
|
||||||
|
if has_metrics {
|
||||||
|
tracing::debug!("Setting metrics_available state to true");
|
||||||
|
agent.set_state_value(String::from("metrics_available"), Value::Bool(true))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
if has_dashboards {
|
||||||
|
tracing::debug!("Setting dashboards_available state to true");
|
||||||
|
agent.set_state_value(String::from("dashboards_available"), Value::Bool(true))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
if has_datasets {
|
||||||
|
tracing::debug!("Setting data_context state to true");
|
||||||
|
agent.set_state_value(String::from("data_context"), Value::Bool(true))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -276,6 +276,10 @@ pub async fn post_chat_handler(
|
||||||
);
|
);
|
||||||
|
|
||||||
chat_with_messages.add_message(chat_message);
|
chat_with_messages.add_message(chat_message);
|
||||||
|
|
||||||
|
// We don't need to process the raw_llm_messages here
|
||||||
|
// The ChatContextLoader.update_context_from_tool_calls function will handle the asset state
|
||||||
|
// when the agent is initialized and loads the context
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return early with auto-generated messages - no need for agent processing
|
// Return early with auto-generated messages - no need for agent processing
|
||||||
|
@ -498,7 +502,7 @@ pub async fn post_chat_handler(
|
||||||
}),
|
}),
|
||||||
response_messages.clone(),
|
response_messages.clone(),
|
||||||
reasoning_messages.clone(),
|
reasoning_messages.clone(),
|
||||||
Some("".to_string()), // Empty string as requested
|
Some(formatted_reasoning_duration.clone()), // Use formatted reasoning duration for regular messages
|
||||||
Utc::now(),
|
Utc::now(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -515,7 +519,7 @@ pub async fn post_chat_handler(
|
||||||
deleted_at: None,
|
deleted_at: None,
|
||||||
response_messages: serde_json::to_value(&response_messages)?,
|
response_messages: serde_json::to_value(&response_messages)?,
|
||||||
reasoning: serde_json::to_value(&reasoning_messages)?,
|
reasoning: serde_json::to_value(&reasoning_messages)?,
|
||||||
final_reasoning_message: Some("".to_string()), // Empty string as requested
|
final_reasoning_message: Some(formatted_reasoning_duration), // Use formatted reasoning duration for regular messages
|
||||||
title: title.title.clone().unwrap_or_default(),
|
title: title.title.clone().unwrap_or_default(),
|
||||||
raw_llm_messages: serde_json::to_value(&raw_llm_messages)?,
|
raw_llm_messages: serde_json::to_value(&raw_llm_messages)?,
|
||||||
feedback: None,
|
feedback: None,
|
||||||
|
@ -1023,7 +1027,7 @@ pub async fn transform_message(
|
||||||
filter_version_id: None,
|
filter_version_id: None,
|
||||||
metadata: Some(vec![BusterChatResponseFileMetadata {
|
metadata: Some(vec![BusterChatResponseFileMetadata {
|
||||||
status: "completed".to_string(),
|
status: "completed".to_string(),
|
||||||
message: "Pulled into new chat".to_string(),
|
message: "Created by Buster".to_string(),
|
||||||
timestamp: Some(Utc::now().timestamp()),
|
timestamp: Some(Utc::now().timestamp()),
|
||||||
}]),
|
}]),
|
||||||
};
|
};
|
||||||
|
@ -1106,7 +1110,7 @@ pub async fn transform_message(
|
||||||
filter_version_id: None,
|
filter_version_id: None,
|
||||||
metadata: Some(vec![BusterChatResponseFileMetadata {
|
metadata: Some(vec![BusterChatResponseFileMetadata {
|
||||||
status: "completed".to_string(),
|
status: "completed".to_string(),
|
||||||
message: "Pulled into new chat".to_string(),
|
message: "Created by Buster".to_string(),
|
||||||
timestamp: Some(Utc::now().timestamp()),
|
timestamp: Some(Utc::now().timestamp()),
|
||||||
}]),
|
}]),
|
||||||
};
|
};
|
||||||
|
@ -1961,6 +1965,8 @@ Return only the title text with no additional formatting, explanation, quotes, n
|
||||||
/// The function streams the title back as it's being generated.
|
/// The function streams the title back as it's being generated.
|
||||||
type BusterContainerResult = Result<(BusterContainer, ThreadEvent)>;
|
type BusterContainerResult = Result<(BusterContainer, ThreadEvent)>;
|
||||||
|
|
||||||
|
// The implementation has been moved to ChatContextLoader.update_context_from_tool_calls
|
||||||
|
|
||||||
pub async fn generate_conversation_title(
|
pub async fn generate_conversation_title(
|
||||||
messages: &[AgentMessage],
|
messages: &[AgentMessage],
|
||||||
message_id: &Uuid,
|
message_id: &Uuid,
|
||||||
|
|
Loading…
Reference in New Issue