mirror of https://github.com/buster-so/buster.git
moved around the ids so that the metrics and dashobard align across the board
This commit is contained in:
parent
97656868d3
commit
b839e70aa9
|
@ -17,7 +17,8 @@ serde_json = { version = "1.0.117", features = ["preserve_order"] }
|
|||
serde_yaml = "0.9.34"
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
tracing = "0.1.40"
|
||||
uuid = { version = "1.8", features = ["serde", "v4"] }
|
||||
uuid = { version = "1.8", features = ["serde", "v4", "v5"] }
|
||||
sha2 = "0.10.8"
|
||||
diesel = { version = "2", features = [
|
||||
"uuid",
|
||||
"chrono",
|
||||
|
|
|
@ -519,7 +519,7 @@ impl Agent {
|
|||
for tool_call in tool_calls {
|
||||
if let Some(tool) = self.tools.read().await.get(&tool_call.function.name) {
|
||||
let params: Value = serde_json::from_str(&tool_call.function.arguments)?;
|
||||
let result = tool.execute(params).await?;
|
||||
let result = tool.execute(params, tool_call.id.clone()).await?;
|
||||
let result_str = serde_json::to_string(&result)?;
|
||||
let tool_message = AgentMessage::tool(
|
||||
None,
|
||||
|
@ -701,7 +701,7 @@ mod tests {
|
|||
type Output = Value;
|
||||
type Params = Value;
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
self.send_progress(
|
||||
"Fetching weather data...".to_string(),
|
||||
"123".to_string(),
|
||||
|
|
|
@ -552,6 +552,7 @@ required:
|
|||
/// The string is a message about the number of records returned by the SQL query
|
||||
/// The vector of IndexMap<String, DataType> is the results of the SQL query. Returns empty vector if more than 13 records or no results.
|
||||
pub async fn process_metric_file(
|
||||
tool_call_id: String,
|
||||
file_name: String,
|
||||
yml_content: String,
|
||||
) -> Result<
|
||||
|
@ -568,9 +569,7 @@ pub async fn process_metric_file(
|
|||
let metric_yml =
|
||||
MetricYml::new(yml_content.clone()).map_err(|e| format!("Invalid YAML format: {}", e))?;
|
||||
|
||||
let metric_id = metric_yml
|
||||
.id
|
||||
.ok_or_else(|| "Missing required field 'id'".to_string())?;
|
||||
let metric_id = generate_deterministic_uuid(&tool_call_id, &file_name, "metric").unwrap();
|
||||
|
||||
// Check if dataset_ids is empty
|
||||
if metric_yml.dataset_ids.is_empty() {
|
||||
|
@ -828,6 +827,22 @@ pub async fn process_metric_file_modification(
|
|||
}
|
||||
}
|
||||
|
||||
/// Generates a deterministic UUID based on tool call ID, file name, and file type
|
||||
pub fn generate_deterministic_uuid(
|
||||
tool_call_id: &str,
|
||||
file_name: &str,
|
||||
file_type: &str,
|
||||
) -> Result<Uuid> {
|
||||
// Use a fixed namespace for the application
|
||||
let namespace_uuid = Uuid::NAMESPACE_OID;
|
||||
|
||||
// Combine inputs to create a unique name
|
||||
let name = format!("{}:{}:{}", tool_call_id, file_name, file_type);
|
||||
|
||||
// Generate v5 UUID (SHA1 based)
|
||||
Ok(Uuid::new_v5(&namespace_uuid, name.as_bytes()))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
@ -18,7 +18,7 @@ use crate::{
|
|||
};
|
||||
|
||||
use super::{
|
||||
common::validate_metric_ids,
|
||||
common::{generate_deterministic_uuid, validate_metric_ids},
|
||||
file_types::{
|
||||
dashboard_yml::DashboardYml,
|
||||
file::{FileEnum, FileWithId},
|
||||
|
@ -67,6 +67,7 @@ impl FileModificationTool for CreateDashboardFilesTool {}
|
|||
/// Process a dashboard file creation request
|
||||
/// Returns Ok((DashboardFile, DashboardYml)) if successful, or an error message if failed
|
||||
async fn process_dashboard_file(
|
||||
tool_call_id: String,
|
||||
file: DashboardFileParams,
|
||||
) -> Result<(DashboardFile, DashboardYml), String> {
|
||||
debug!("Processing dashboard file creation: {}", file.name);
|
||||
|
@ -74,9 +75,7 @@ async fn process_dashboard_file(
|
|||
let dashboard_yml = DashboardYml::new(file.yml_content.clone())
|
||||
.map_err(|e| format!("Invalid YAML format: {}", e))?;
|
||||
|
||||
let dashboard_id = dashboard_yml
|
||||
.id
|
||||
.ok_or_else(|| "Missing required field 'id'".to_string())?;
|
||||
let dashboard_id = generate_deterministic_uuid(&tool_call_id, &file.name, "dashboard").unwrap();
|
||||
|
||||
// Collect and validate metric IDs from rows
|
||||
let metric_ids: Vec<Uuid> = dashboard_yml
|
||||
|
@ -134,7 +133,7 @@ impl ToolExecutor for CreateDashboardFilesTool {
|
|||
}
|
||||
}
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let files = params.files;
|
||||
|
@ -148,7 +147,7 @@ impl ToolExecutor for CreateDashboardFilesTool {
|
|||
|
||||
// First pass - validate and prepare all records
|
||||
for file in files {
|
||||
match process_dashboard_file(file.clone()).await {
|
||||
match process_dashboard_file(tool_call_id.clone(), file.clone()).await {
|
||||
Ok((dashboard_file, dashboard_yml)) => {
|
||||
dashboard_records.push(dashboard_file);
|
||||
dashboard_ymls.push(dashboard_yml);
|
||||
|
|
|
@ -24,7 +24,7 @@ use crate::{
|
|||
},
|
||||
};
|
||||
|
||||
use super::{common::validate_sql, FileModificationTool};
|
||||
use super::{common::{validate_sql, generate_deterministic_uuid}, FileModificationTool};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct MetricFileParams {
|
||||
|
@ -81,7 +81,7 @@ impl ToolExecutor for CreateMetricFilesTool {
|
|||
}
|
||||
}
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let files = params.files;
|
||||
|
@ -94,7 +94,7 @@ impl ToolExecutor for CreateMetricFilesTool {
|
|||
let mut results_vec = vec![];
|
||||
// First pass - validate and prepare all records
|
||||
for file in files {
|
||||
match process_metric_file(file.name.clone(), file.yml_content.clone()).await {
|
||||
match process_metric_file(tool_call_id.clone(), file.name.clone(), file.yml_content.clone()).await {
|
||||
Ok((metric_file, metric_yml, message, results)) => {
|
||||
metric_records.push(metric_file);
|
||||
metric_ymls.push(metric_yml);
|
||||
|
|
|
@ -5,7 +5,9 @@ use uuid::Uuid;
|
|||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct MetricYml {
|
||||
#[serde(skip_serializing)]
|
||||
pub id: Option<Uuid>,
|
||||
#[serde(skip_serializing)]
|
||||
pub updated_at: Option<DateTime<Utc>>,
|
||||
#[serde(alias = "name")]
|
||||
pub title: String,
|
||||
|
@ -283,17 +285,11 @@ pub struct TableChartConfig {
|
|||
|
||||
impl MetricYml {
|
||||
pub fn new(yml_content: String) -> Result<Self> {
|
||||
let mut file: MetricYml = match serde_yaml::from_str(&yml_content) {
|
||||
let file: MetricYml = match serde_yaml::from_str(&yml_content) {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Err(anyhow::anyhow!("Error parsing YAML: {}", e)),
|
||||
};
|
||||
|
||||
if file.id.is_none() {
|
||||
file.id = Some(Uuid::new_v4());
|
||||
}
|
||||
|
||||
file.updated_at = Some(Utc::now());
|
||||
|
||||
match file.validate() {
|
||||
Ok(_) => Ok(file),
|
||||
Err(e) => Err(anyhow::anyhow!("Error compiling file: {}", e)),
|
||||
|
@ -356,8 +352,8 @@ mod tests {
|
|||
assert_eq!(metadata[1].data_type, "number");
|
||||
|
||||
// Verify auto-generated fields
|
||||
assert!(metric.id.is_some());
|
||||
assert!(metric.updated_at.is_some());
|
||||
assert!(metric.id.is_none());
|
||||
assert!(metric.updated_at.is_none());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -251,7 +251,7 @@ impl ToolExecutor for ModifyDashboardFilesTool {
|
|||
}
|
||||
}
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
debug!("Starting file modification execution");
|
||||
|
|
|
@ -262,7 +262,7 @@ impl ToolExecutor for ModifyMetricFilesTool {
|
|||
}
|
||||
}
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
debug!("Starting file modification execution");
|
||||
|
|
|
@ -260,7 +260,7 @@ impl ToolExecutor for SearchDataCatalogTool {
|
|||
type Output = SearchDataCatalogOutput;
|
||||
type Params = SearchDataCatalogParams;
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
// Fetch all non-deleted datasets
|
||||
|
|
|
@ -35,7 +35,7 @@ impl ToolExecutor for CreatePlan {
|
|||
"create_plan".to_string()
|
||||
}
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
self.agent
|
||||
.set_state_value(String::from("plan_available"), Value::Bool(true))
|
||||
.await;
|
||||
|
|
|
@ -12,8 +12,8 @@ pub trait ToolExecutor: Send + Sync {
|
|||
/// The type of the parameters for this tool
|
||||
type Params: DeserializeOwned + Send;
|
||||
|
||||
/// Execute the tool with the given parameters.
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output>;
|
||||
/// Execute the tool with the given parameters and tool call ID.
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output>;
|
||||
|
||||
/// Get the JSON schema for this tool
|
||||
fn get_schema(&self) -> Value;
|
||||
|
@ -53,9 +53,9 @@ where
|
|||
type Output = Value;
|
||||
type Params = Value;
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
let params = serde_json::from_value(params)?;
|
||||
let result = self.inner.execute(params).await?;
|
||||
let result = self.inner.execute(params, tool_call_id).await?;
|
||||
Ok(serde_json::to_value(result)?)
|
||||
}
|
||||
|
||||
|
@ -78,8 +78,8 @@ impl<T: ToolExecutor<Output = Value, Params = Value> + Send + Sync> ToolExecutor
|
|||
type Output = Value;
|
||||
type Params = Value;
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
(**self).execute(params).await
|
||||
async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result<Self::Output> {
|
||||
(**self).execute(params, tool_call_id).await
|
||||
}
|
||||
|
||||
fn get_schema(&self) -> Value {
|
||||
|
|
|
@ -22,6 +22,7 @@ indexmap = { workspace = true }
|
|||
async-trait = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
|
||||
# Local dependencies
|
||||
database = { path = "../database" }
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
use anyhow::Result;
|
||||
use serde_json::Value;
|
||||
use uuid::Uuid;
|
||||
use sha2::{Sha256, Digest};
|
||||
use agents::tools::categories::file_tools::common::generate_deterministic_uuid;
|
||||
|
||||
use super::post_chat_handler::{
|
||||
BusterReasoningFile, BusterReasoningMessage, BusterReasoningPill,
|
||||
|
@ -126,6 +128,7 @@ impl StreamingParser {
|
|||
|
||||
// Internal function to process file data (shared by metric and dashboard processing)
|
||||
pub fn process_file_data(&mut self, id: String, file_type: String) -> Result<Option<BusterReasoningMessage>> {
|
||||
|
||||
// Extract and replace yml_content with placeholders
|
||||
let mut yml_contents = Vec::new();
|
||||
let mut positions = Vec::new();
|
||||
|
@ -260,13 +263,13 @@ impl StreamingParser {
|
|||
.and_then(Value::as_str)
|
||||
.unwrap_or("");
|
||||
|
||||
// Generate a consistent UUID based on the file name
|
||||
let file_id = Uuid::new_v4().to_string();
|
||||
// Generate deterministic UUID based on tool call ID, file name, and type
|
||||
let file_id = generate_deterministic_uuid(&id, name, &file_type)?;
|
||||
|
||||
let buster_file = BusterFile {
|
||||
id: file_id.clone(),
|
||||
id: file_id.to_string(),
|
||||
file_type: file_type.clone(),
|
||||
file_name: name.clone().to_string(),
|
||||
file_name: name.to_string(),
|
||||
version_number: 1,
|
||||
version_id: Uuid::new_v4().to_string(),
|
||||
status: "loading".to_string(),
|
||||
|
@ -278,8 +281,8 @@ impl StreamingParser {
|
|||
metadata: None,
|
||||
};
|
||||
|
||||
file_ids.push(name.clone().to_string());
|
||||
files_map.insert(name.clone().to_string(), buster_file);
|
||||
file_ids.push(file_id.to_string());
|
||||
files_map.insert(file_id.to_string(), buster_file);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue