mirror of https://github.com/buster-so/buster.git
things are working real nice
This commit is contained in:
parent
4182d83339
commit
d11996bbe0
|
@ -169,13 +169,14 @@ impl Message {
|
|||
tool_calls: Option<Vec<ToolCall>>,
|
||||
progress: Option<MessageProgress>,
|
||||
initial: Option<bool>,
|
||||
name: Option<String>,
|
||||
) -> Self {
|
||||
let initial = initial.unwrap_or(false);
|
||||
|
||||
Self::Assistant {
|
||||
id,
|
||||
content,
|
||||
name: None,
|
||||
name,
|
||||
tool_calls,
|
||||
progress,
|
||||
initial,
|
||||
|
|
|
@ -106,10 +106,17 @@ async fn process_chat(request: ChatCreateNewChat, user: User) -> Result<ThreadWi
|
|||
match message {
|
||||
Ok(msg) => {
|
||||
match msg {
|
||||
AgentMessage::Assistant { content: Some(content), tool_calls: None, .. } => {
|
||||
// Store the final message and break immediately
|
||||
final_message = Some(content);
|
||||
break;
|
||||
AgentMessage::Assistant {
|
||||
name: Some(name),
|
||||
content: Some(content),
|
||||
tool_calls: None,
|
||||
..
|
||||
} => {
|
||||
if name == "manager_agent" {
|
||||
// Store the final message and break immediately
|
||||
final_message = Some(content);
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => messages.push(msg),
|
||||
}
|
||||
|
@ -137,14 +144,7 @@ async fn process_chat(request: ChatCreateNewChat, user: User) -> Result<ThreadWi
|
|||
.await?;
|
||||
|
||||
// Store final message state and process any completed files
|
||||
store_final_message_state(
|
||||
&mut conn,
|
||||
&message,
|
||||
&messages,
|
||||
&user_org_id,
|
||||
&user.id,
|
||||
)
|
||||
.await?;
|
||||
store_final_message_state(&mut conn, &message, &messages, &user_org_id, &user.id).await?;
|
||||
|
||||
// Update thread_with_messages with processed messages
|
||||
if let Some(thread_message) = thread_with_messages.messages.first_mut() {
|
||||
|
|
|
@ -114,6 +114,8 @@ pub struct Agent {
|
|||
user_id: Uuid,
|
||||
/// The session ID for the current thread
|
||||
session_id: Uuid,
|
||||
/// Agent name
|
||||
name: String,
|
||||
/// Shutdown signal sender
|
||||
shutdown_tx: Arc<RwLock<broadcast::Sender<()>>>,
|
||||
}
|
||||
|
@ -125,6 +127,7 @@ impl Agent {
|
|||
tools: HashMap<String, Box<dyn ToolExecutor<Output = Value, Params = Value> + Send + Sync>>,
|
||||
user_id: Uuid,
|
||||
session_id: Uuid,
|
||||
name: String,
|
||||
) -> Self {
|
||||
let llm_api_key = env::var("LLM_API_KEY").expect("LLM_API_KEY must be set");
|
||||
let llm_base_url = env::var("LLM_BASE_URL").expect("LLM_API_BASE must be set");
|
||||
|
@ -146,11 +149,12 @@ impl Agent {
|
|||
user_id,
|
||||
session_id,
|
||||
shutdown_tx: Arc::new(RwLock::new(shutdown_tx)),
|
||||
name,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new Agent that shares state and stream with an existing agent
|
||||
pub fn from_existing(existing_agent: &Agent) -> Self {
|
||||
pub fn from_existing(existing_agent: &Agent, name: String) -> Self {
|
||||
let llm_api_key = env::var("LLM_API_KEY").expect("LLM_API_KEY must be set");
|
||||
let llm_base_url = env::var("LLM_BASE_URL").expect("LLM_API_BASE must be set");
|
||||
|
||||
|
@ -166,6 +170,7 @@ impl Agent {
|
|||
user_id: existing_agent.user_id,
|
||||
session_id: existing_agent.session_id,
|
||||
shutdown_tx: Arc::clone(&existing_agent.shutdown_tx),
|
||||
name,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -328,7 +333,7 @@ impl Agent {
|
|||
let err_msg = format!("Error processing thread: {:?}", e);
|
||||
let _ = agent_clone.get_stream_sender().await.send(Err(AgentError(err_msg)));
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = shutdown_rx.recv() => {
|
||||
let _ = agent_clone.get_stream_sender().await.send(
|
||||
Ok(Message::assistant(
|
||||
|
@ -337,6 +342,7 @@ impl Agent {
|
|||
None,
|
||||
None,
|
||||
None,
|
||||
Some(agent_clone.name.clone()),
|
||||
))
|
||||
);
|
||||
}
|
||||
|
@ -364,6 +370,7 @@ impl Agent {
|
|||
None,
|
||||
None,
|
||||
None,
|
||||
Some(self.name.clone()),
|
||||
);
|
||||
self.get_stream_sender().await.send(Ok(message))?;
|
||||
return Ok(());
|
||||
|
@ -401,7 +408,7 @@ impl Agent {
|
|||
content,
|
||||
tool_calls,
|
||||
..
|
||||
} => Message::assistant(None, content.clone(), tool_calls.clone(), None, None),
|
||||
} => Message::assistant(None, content.clone(), tool_calls.clone(), None, None, Some(self.name.clone())),
|
||||
_ => return Err(anyhow::anyhow!("Expected assistant message from LLM")),
|
||||
};
|
||||
|
||||
|
@ -675,6 +682,7 @@ mod tests {
|
|||
HashMap::new(),
|
||||
Uuid::new_v4(),
|
||||
Uuid::new_v4(),
|
||||
"test_agent".to_string(),
|
||||
);
|
||||
|
||||
let thread = AgentThread::new(
|
||||
|
@ -701,6 +709,7 @@ mod tests {
|
|||
HashMap::new(),
|
||||
Uuid::new_v4(),
|
||||
Uuid::new_v4(),
|
||||
"test_agent".to_string(),
|
||||
);
|
||||
|
||||
// Create weather tool with reference to agent
|
||||
|
@ -735,6 +744,7 @@ mod tests {
|
|||
HashMap::new(),
|
||||
Uuid::new_v4(),
|
||||
Uuid::new_v4(),
|
||||
"test_agent".to_string(),
|
||||
);
|
||||
|
||||
let weather_tool = WeatherTool::new(Arc::new(agent.clone()));
|
||||
|
@ -767,6 +777,7 @@ mod tests {
|
|||
HashMap::new(),
|
||||
Uuid::new_v4(),
|
||||
Uuid::new_v4(),
|
||||
"test_agent".to_string(),
|
||||
);
|
||||
|
||||
// Test setting single values
|
||||
|
|
|
@ -7,7 +7,6 @@ use uuid::Uuid;
|
|||
use crate::utils::{
|
||||
agent::{agent::AgentError, Agent, AgentExt, AgentThread},
|
||||
tools::{
|
||||
agents_as_tools::dashboard_agent_tool::DashboardAgentOutput,
|
||||
file_tools::{
|
||||
CreateDashboardFilesTool, CreateMetricFilesTool, FilterDashboardFilesTool, ModifyDashboardFilesTool, ModifyMetricFilesTool
|
||||
},
|
||||
|
@ -73,6 +72,7 @@ impl DashboardAgent {
|
|||
HashMap::new(),
|
||||
user_id,
|
||||
session_id,
|
||||
"dashboard_agent".to_string(),
|
||||
));
|
||||
|
||||
let dashboard = Self { agent };
|
||||
|
@ -82,7 +82,7 @@ impl DashboardAgent {
|
|||
|
||||
pub async fn from_existing(existing_agent: &Arc<Agent>) -> Result<Self> {
|
||||
// Create a new agent with the same core properties and shared state/stream
|
||||
let agent = Arc::new(Agent::from_existing(existing_agent));
|
||||
let agent = Arc::new(Agent::from_existing(existing_agent, "dashboard_agent".to_string()));
|
||||
let dashboard = Self { agent };
|
||||
dashboard.load_tools().await?;
|
||||
Ok(dashboard)
|
||||
|
|
|
@ -29,6 +29,7 @@ impl ExploratoryAgent {
|
|||
HashMap::new(),
|
||||
user_id,
|
||||
session_id,
|
||||
"exploratory_agent".to_string(),
|
||||
));
|
||||
|
||||
let exploratory = Self { agent };
|
||||
|
@ -38,7 +39,7 @@ impl ExploratoryAgent {
|
|||
|
||||
pub async fn from_existing(existing_agent: &Arc<Agent>) -> Result<Self> {
|
||||
// Create a new agent with the same core properties and shared state/stream
|
||||
let agent = Arc::new(Agent::from_existing(existing_agent));
|
||||
let agent = Arc::new(Agent::from_existing(existing_agent, "exploratory_agent".to_string()));
|
||||
let exploratory = Self { agent };
|
||||
exploratory.load_tools().await?;
|
||||
Ok(exploratory)
|
||||
|
|
|
@ -7,6 +7,7 @@ use uuid::Uuid;
|
|||
|
||||
use crate::utils::tools::agents_as_tools::{DashboardAgentTool, MetricAgentTool};
|
||||
use crate::utils::tools::file_tools::SendAssetsToUserTool;
|
||||
use crate::utils::tools::planning_tools::{CreatePlan, ReviewPlan};
|
||||
use crate::utils::{
|
||||
agent::{agent::AgentError, Agent, AgentExt, AgentThread},
|
||||
tools::{
|
||||
|
@ -43,12 +44,15 @@ impl AgentExt for ManagerAgent {
|
|||
}
|
||||
|
||||
impl ManagerAgent {
|
||||
async fn load_tools(&self, include_send_assets: bool) -> Result<()> {
|
||||
async fn load_tools(&self) -> Result<()> {
|
||||
// Create tools using the shared Arc
|
||||
let search_data_catalog_tool = SearchDataCatalogTool::new(Arc::clone(&self.agent));
|
||||
let search_files_tool = SearchFilesTool::new(Arc::clone(&self.agent));
|
||||
let create_or_modify_metrics_tool = MetricAgentTool::new(Arc::clone(&self.agent));
|
||||
let create_or_modify_dashboards_tool = DashboardAgentTool::new(Arc::clone(&self.agent));
|
||||
let create_plan_tool = CreatePlan::new(Arc::clone(&self.agent));
|
||||
let review_plan_tool = ReviewPlan::new(Arc::clone(&self.agent));
|
||||
let send_assets_to_user = SendAssetsToUserTool::new(Arc::clone(&self.agent));
|
||||
|
||||
// Add tools to the agent
|
||||
self.agent
|
||||
|
@ -75,16 +79,25 @@ impl ManagerAgent {
|
|||
create_or_modify_dashboards_tool.into_value_tool(),
|
||||
)
|
||||
.await;
|
||||
|
||||
if include_send_assets {
|
||||
let send_assets_to_user = SendAssetsToUserTool::new(Arc::clone(&self.agent));
|
||||
self.agent
|
||||
.add_tool(
|
||||
send_assets_to_user.get_name(),
|
||||
send_assets_to_user.into_value_tool(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
self.agent
|
||||
.add_tool(
|
||||
send_assets_to_user.get_name(),
|
||||
send_assets_to_user.into_value_tool(),
|
||||
)
|
||||
.await;
|
||||
self.agent
|
||||
.add_tool(
|
||||
create_plan_tool.get_name(),
|
||||
create_plan_tool.into_value_tool(),
|
||||
)
|
||||
.await;
|
||||
self.agent
|
||||
.add_tool(
|
||||
review_plan_tool.get_name(),
|
||||
review_plan_tool.into_value_tool(),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -96,18 +109,19 @@ impl ManagerAgent {
|
|||
HashMap::new(),
|
||||
user_id,
|
||||
session_id,
|
||||
"manager_agent".to_string(),
|
||||
));
|
||||
|
||||
let manager = Self { agent };
|
||||
manager.load_tools(false).await?;
|
||||
manager.load_tools().await?;
|
||||
Ok(manager)
|
||||
}
|
||||
|
||||
pub async fn from_existing(existing_agent: &Arc<Agent>) -> Result<Self> {
|
||||
// Create a new agent with the same core properties and shared state/stream
|
||||
let agent = Arc::new(Agent::from_existing(existing_agent));
|
||||
let agent = Arc::new(Agent::from_existing(existing_agent, "manager_agent".to_string()));
|
||||
let manager = Self { agent };
|
||||
manager.load_tools(true).await?;
|
||||
manager.load_tools().await?;
|
||||
Ok(manager)
|
||||
}
|
||||
|
||||
|
@ -132,7 +146,8 @@ impl ManagerAgent {
|
|||
const MANAGER_AGENT_PROMPT: &str = r##"
|
||||
### Role & Task
|
||||
You are an expert analytics and data engineer who helps non-technical users get fast, accurate answers to their analytics questions. Your name is Buster.
|
||||
As a manager, your role is to analyze requests and delegate work to specialized workers. Take immediate action using available tools and workers - do not explain what you plan to do first.
|
||||
|
||||
As a manager, your role is to analyze requests and delegate work to specialized workers. Take immediate action using available tools and workers.
|
||||
|
||||
### Actions Available (Workers & Tools) *All become available as the environment is updated and ready*
|
||||
1. **search_data_catalog**
|
||||
|
@ -140,36 +155,43 @@ As a manager, your role is to analyze requests and delegate work to specialized
|
|||
- Must be used first if you need context about available data
|
||||
- Skip if you already have sufficient context
|
||||
|
||||
2. **metric_worker**
|
||||
2. **create_plan**
|
||||
- Use to clearly state the objective and outline your workflow
|
||||
- Must be used before performing any kind of analysis or workflow
|
||||
- Build a concise, step-by-step plan that references the other available actions listed below
|
||||
|
||||
3. **metric_worker**
|
||||
- Delegate metric creation/updates to this specialized worker
|
||||
- For single visualizations or small sets of related charts
|
||||
- The worker handles SQL writing and visualization configuration
|
||||
- Use this for most visualization requests unless a full dashboard is needed
|
||||
|
||||
3. **dashboard_worker**
|
||||
4. **dashboard_worker**
|
||||
- Only use when multiple metrics need to be organized into a cohesive dashboard view
|
||||
- For creating new dashboards or updating existing ones with multiple related visualizations
|
||||
- Use metric_worker instead if only creating/updating individual charts
|
||||
- The worker handles SQL and visualization configuration
|
||||
|
||||
4. **search_files**
|
||||
- Only use when user explicitly asks to search through files
|
||||
- For finding previously created content
|
||||
|
||||
5. **send_assets_to_user**
|
||||
- Use after workers complete their metric/dashboard tasks
|
||||
- Specifies which assets to show the user
|
||||
- Skip if no assets were created/modified
|
||||
|
||||
6. **search_files**
|
||||
- Only use when user explicitly asks to search through files
|
||||
- For finding previously created content
|
||||
|
||||
### Response Guidelines and Format
|
||||
- When you've accomplished the task that the user requested, respond with a clear and concise message about how you did it.
|
||||
- Do not include yml in your response.
|
||||
|
||||
### Key Guidelines
|
||||
- Take immediate action - do not explain your plan first
|
||||
- If you're going to take action, begin immediately. Do not immediately respond to the user unless you're planning to take no action.
|
||||
- Search data catalog first unless you have context
|
||||
- Don't ask clarifying questions - make reasonable assumptions
|
||||
- Only respond after completing the requested tasks
|
||||
- Supported charts: tables, line, bar, histogram, pie/donut, metric cards, scatter plots
|
||||
- Under the hood, you use state of the art encryption and have rigirous security protocols and policies in place.
|
||||
- Currently, you are not able to do things that require Python. You are only capable of querying historical data using SQL statements.
|
||||
- Keep final responses clear and concise, focusing on what was accomplished
|
||||
"##;
|
||||
|
|
|
@ -49,6 +49,7 @@ impl MetricAgent {
|
|||
HashMap::new(),
|
||||
user_id,
|
||||
session_id,
|
||||
"metric_agent".to_string(),
|
||||
));
|
||||
|
||||
let metric = Self { agent };
|
||||
|
@ -58,7 +59,7 @@ impl MetricAgent {
|
|||
|
||||
pub async fn from_existing(existing_agent: &Arc<Agent>) -> Result<Self> {
|
||||
// Create a new agent with the same core properties and shared state/stream
|
||||
let agent = Arc::new(Agent::from_existing(existing_agent));
|
||||
let agent = Arc::new(Agent::from_existing(existing_agent, "metric_agent".to_string()));
|
||||
let metric = Self { agent };
|
||||
metric.load_tools().await?;
|
||||
Ok(metric)
|
||||
|
|
|
@ -42,9 +42,12 @@ impl ToolExecutor for DashboardAgentTool {
|
|||
}
|
||||
|
||||
async fn is_enabled(&self) -> bool {
|
||||
match self.agent.get_state_value("data_context").await {
|
||||
Some(_) => true,
|
||||
None => false,
|
||||
match (
|
||||
self.agent.get_state_value("data_context").await,
|
||||
self.agent.get_state_value("plan_available").await,
|
||||
) {
|
||||
(Some(_), Some(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -91,7 +94,7 @@ impl ToolExecutor for DashboardAgentTool {
|
|||
fn get_schema(&self) -> Value {
|
||||
serde_json::json!({
|
||||
"name": self.get_name(),
|
||||
"description": "Specifically designed for creating or updating dashboards that require multiple related metrics. This tool excels at understanding complex dashboard requirements and automatically determining which metrics to create and how to organize them. Only use when you need to work with multiple metrics that should be displayed together on a single dashboard page.",
|
||||
"description": "Executes the previously built plan for dashboard creation or updates based on the ticket description. This tool processes the plan and coordinates the creation of all necessary metrics and dashboard components according to the analyzed requirements.",
|
||||
"strict": true,
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
|
@ -101,7 +104,7 @@ impl ToolExecutor for DashboardAgentTool {
|
|||
"properties": {
|
||||
"ticket_description": {
|
||||
"type": "string",
|
||||
"description": "The high-level requirements for what needs to be monitored or analyzed via the dashboard. Focus on describing the business or technical needs (e.g., 'We need to monitor our application's overall health including memory usage, CPU, and error rates', or 'Create a sales overview dashboard that shows our daily revenue, top products, and regional performance'). The dashboard worker will determine the specific metrics needed and their optimal arrangement."
|
||||
"description": "A high-level description of what the dashboard should accomplish, including the metrics to be displayed and their organization. For example: 'Create a sales performance dashboard with monthly revenue trends, top-selling products, and regional breakdown' or 'Build a user engagement dashboard showing daily active users, session duration, and feature usage statistics'. The specific implementation details and SQL queries will be handled by the dashboard worker."
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
|
@ -122,16 +125,19 @@ async fn process_agent_output(
|
|||
println!("Agent message: {:?}", msg);
|
||||
match msg {
|
||||
AgentMessage::Assistant {
|
||||
name: Some(name),
|
||||
content: Some(content),
|
||||
tool_calls: None,
|
||||
..
|
||||
} => {
|
||||
// Return the collected output with the final message
|
||||
return Ok(DashboardAgentOutput {
|
||||
message: content,
|
||||
duration: start_time.elapsed().as_secs() as i64,
|
||||
files,
|
||||
});
|
||||
if name == "dashboard_agent" {
|
||||
// Return the collected output with the final message
|
||||
return Ok(DashboardAgentOutput {
|
||||
message: content,
|
||||
duration: start_time.elapsed().as_secs() as i64,
|
||||
files,
|
||||
});
|
||||
}
|
||||
}
|
||||
AgentMessage::Tool { content, .. } => {
|
||||
// Process tool output
|
||||
|
|
|
@ -43,9 +43,12 @@ impl ToolExecutor for MetricAgentTool {
|
|||
}
|
||||
|
||||
async fn is_enabled(&self) -> bool {
|
||||
match self.agent.get_state_value("data_context").await {
|
||||
Some(_) => true,
|
||||
None => false,
|
||||
match (
|
||||
self.agent.get_state_value("data_context").await,
|
||||
self.agent.get_state_value("plan_available").await,
|
||||
) {
|
||||
(Some(_), Some(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -84,7 +87,7 @@ impl ToolExecutor for MetricAgentTool {
|
|||
fn get_schema(&self) -> Value {
|
||||
serde_json::json!({
|
||||
"name": self.get_name(),
|
||||
"description": "Use to create or update individual metrics or visualizations. This tool is most effective for direct, single-metric requests or when a user explicitly asks for a specific number of metrics (e.g., '2 metrics showing...' or '3 charts for...'). It is not suitable for ambiguous or complex multi-part requests.",
|
||||
"description": "Use to create or update individual metrics or visualizations based on the established plan. This tool executes the metric-related portions of the plan, focusing on one metric at a time. It should be used after data exploration and planning are complete, not for initial data discovery or ambiguous requests.",
|
||||
"strict": true,
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
|
@ -94,7 +97,7 @@ impl ToolExecutor for MetricAgentTool {
|
|||
"properties": {
|
||||
"ticket_description": {
|
||||
"type": "string",
|
||||
"description": "A brief description containing the general requirements for the metric(s). Focus on what needs to be measured or visualized. Write it as a command, e.g., 'Create a bar chart showing...', 'Generate 2 metrics that measure...', 'Add a metric for...', etc."
|
||||
"description": "A high-level description of the metric or visualization that needs to be created. This should describe what needs to be measured or visualized without including specific SQL statements. For example: 'Show monthly revenue from subscription payments' or 'Display daily active users count with a breakdown by user type'. The actual SQL construction will be handled by the metric agent."
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
|
@ -115,16 +118,19 @@ async fn process_agent_output(
|
|||
println!("Agent message: {:?}", msg);
|
||||
match msg {
|
||||
AgentMessage::Assistant {
|
||||
name: Some(name),
|
||||
content: Some(content),
|
||||
tool_calls: None,
|
||||
..
|
||||
} => {
|
||||
// Return the collected output with the final message
|
||||
return Ok(MetricAgentOutput {
|
||||
message: content,
|
||||
duration: start_time.elapsed().as_secs() as i64,
|
||||
files,
|
||||
});
|
||||
if name == "metric_agent" {
|
||||
// Return the collected output with the final message
|
||||
return Ok(MetricAgentOutput {
|
||||
message: content,
|
||||
duration: start_time.elapsed().as_secs() as i64,
|
||||
files,
|
||||
});
|
||||
}
|
||||
}
|
||||
AgentMessage::Tool { content, .. } => {
|
||||
// Process tool output
|
||||
|
|
|
@ -1,83 +0,0 @@
|
|||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::utils::{agent::Agent, tools::ToolExecutor};
|
||||
use litellm::ToolCall;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PlanOutput {
|
||||
status: String,
|
||||
error: Option<String>,
|
||||
plan_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct PlanInput {
|
||||
markdown_content: String,
|
||||
title: String,
|
||||
}
|
||||
|
||||
pub struct CreatePlan {
|
||||
agent: Arc<Agent>,
|
||||
}
|
||||
|
||||
impl CreatePlan {
|
||||
pub fn new(agent: Arc<Agent>) -> Self {
|
||||
Self { agent }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ToolExecutor for CreatePlan {
|
||||
type Output = PlanOutput;
|
||||
type Params = PlanInput;
|
||||
|
||||
fn get_name(&self) -> String {
|
||||
"create_plan".to_string()
|
||||
}
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
let input = params;
|
||||
|
||||
// TODO: Implement actual plan creation logic here
|
||||
// This would typically involve:
|
||||
// 1. Validating the markdown content
|
||||
// 2. Storing the plan in the database with current_thread.user_id
|
||||
// 3. Returning the plan ID or error
|
||||
|
||||
Ok(PlanOutput {
|
||||
status: "success".to_string(),
|
||||
error: None,
|
||||
plan_id: Some("placeholder-id".to_string()),
|
||||
})
|
||||
}
|
||||
|
||||
async fn is_enabled(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn get_schema(&self) -> Value {
|
||||
serde_json::json!({
|
||||
"name": "create_plan",
|
||||
"description": "Creates a new plan from markdown content. The plan will be stored and can be referenced later.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"markdown_content": {
|
||||
"type": "string",
|
||||
"description": "The plan content in markdown format"
|
||||
},
|
||||
"title": {
|
||||
"type": "string",
|
||||
"description": "The title of the plan"
|
||||
}
|
||||
},
|
||||
"required": ["markdown_content", "title"]
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,7 +1,3 @@
|
|||
pub mod run_sql;
|
||||
pub mod create_plan;
|
||||
pub mod review_plan;
|
||||
|
||||
pub use run_sql::*;
|
||||
pub use create_plan::*;
|
||||
pub use review_plan::*;
|
||||
|
|
|
@ -21,7 +21,7 @@ use crate::{
|
|||
use super::{
|
||||
common::validate_sql,
|
||||
file_types::{
|
||||
file::{FileEnum, FileWithId},
|
||||
file::{FileWithId},
|
||||
metric_yml::MetricYml,
|
||||
},
|
||||
FileModificationTool,
|
||||
|
|
|
@ -864,6 +864,7 @@ mod tests {
|
|||
HashMap::new(),
|
||||
Uuid::new_v4(),
|
||||
Uuid::new_v4(),
|
||||
"test_agent".to_string(),
|
||||
)),
|
||||
};
|
||||
|
||||
|
|
|
@ -795,6 +795,7 @@ mod tests {
|
|||
HashMap::new(),
|
||||
Uuid::new_v4(),
|
||||
Uuid::new_v4(),
|
||||
"test_agent".to_string(),
|
||||
)),
|
||||
};
|
||||
|
||||
|
|
|
@ -849,6 +849,7 @@ mod tests {
|
|||
HashMap::new(),
|
||||
Uuid::new_v4(),
|
||||
Uuid::new_v4(),
|
||||
"test_agent".to_string(),
|
||||
)),
|
||||
};
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ use crate::{
|
|||
utils::{agent::Agent, tools::ToolExecutor},
|
||||
};
|
||||
|
||||
use litellm::{ChatCompletionRequest, LiteLLMClient, Message, Metadata, ResponseFormat, ToolCall};
|
||||
use litellm::{ChatCompletionRequest, LiteLLMClient, Message, Metadata, ResponseFormat};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct SearchDataCatalogParams {
|
||||
|
|
|
@ -46,9 +46,12 @@ impl ToolExecutor for SendAssetsToUserTool {
|
|||
}
|
||||
|
||||
async fn is_enabled(&self) -> bool {
|
||||
match self.agent.get_state_value("files_available").await {
|
||||
Some(_) => true,
|
||||
None => false,
|
||||
match (
|
||||
self.agent.get_state_value("files_available").await,
|
||||
self.agent.get_state_value("plan_available").await,
|
||||
) {
|
||||
(Some(_), Some(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,16 +1,13 @@
|
|||
use anyhow::{Result, anyhow};
|
||||
use axum::async_trait;
|
||||
use litellm::{Message, ToolCall};
|
||||
use serde::{Deserialize, Serialize, de::DeserializeOwned};
|
||||
use anyhow::{anyhow, Result};
|
||||
use litellm::ToolCall;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use serde_json::Value;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::utils::agent::Agent;
|
||||
|
||||
pub mod agents_as_tools;
|
||||
pub mod data_tools;
|
||||
pub mod file_tools;
|
||||
pub mod interaction_tools;
|
||||
pub mod planning_tools;
|
||||
|
||||
/// A trait that defines how tools should be implemented.
|
||||
/// Any struct that wants to be used as a tool must implement this trait.
|
||||
|
@ -19,7 +16,7 @@ pub mod interaction_tools;
|
|||
pub trait ToolExecutor: Send + Sync {
|
||||
/// The type of the output of the tool
|
||||
type Output: Serialize + Send;
|
||||
|
||||
|
||||
/// The type of the parameters for this tool
|
||||
type Params: DeserializeOwned + Send;
|
||||
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::utils::{agent::Agent, tools::ToolExecutor};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct CreatePlanOutput {
|
||||
message: String,
|
||||
plan: String,
|
||||
summary: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct CreatePlanInput {
|
||||
markdown_content: String,
|
||||
summary: String,
|
||||
}
|
||||
|
||||
pub struct CreatePlan {
|
||||
agent: Arc<Agent>,
|
||||
}
|
||||
|
||||
impl CreatePlan {
|
||||
pub fn new(agent: Arc<Agent>) -> Self {
|
||||
Self { agent }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ToolExecutor for CreatePlan {
|
||||
type Output = CreatePlanOutput;
|
||||
type Params = CreatePlanInput;
|
||||
|
||||
fn get_name(&self) -> String {
|
||||
"create_plan".to_string()
|
||||
}
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
self.agent
|
||||
.set_state_value(String::from("plan_available"), Value::Bool(true))
|
||||
.await;
|
||||
|
||||
Ok(CreatePlanOutput {
|
||||
message: "Plan created successfully".to_string(),
|
||||
plan: params.markdown_content,
|
||||
summary: params.summary,
|
||||
})
|
||||
}
|
||||
|
||||
async fn is_enabled(&self) -> bool {
|
||||
match self.agent.get_state_value("data_context").await {
|
||||
Some(_) => true,
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_schema(&self) -> Value {
|
||||
serde_json::json!({
|
||||
"name": self.get_name(),
|
||||
"description": "Creates a structured plan for responding to user requests. Use this tool when you have sufficient context about the user's needs and want to outline a clear approach. The plan should include specific, actionable steps and validation criteria.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"markdown_content": {
|
||||
"type": "string",
|
||||
"description": PLAN_TEMPLATE
|
||||
},
|
||||
"summary": {
|
||||
"type": "string",
|
||||
"description": "A brief summary of the plan's key points and objectives"
|
||||
}
|
||||
},
|
||||
"required": ["markdown_content", "summary"]
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const PLAN_TEMPLATE: &str = r##"
|
||||
# Plan
|
||||
|
||||
## Overview
|
||||
[Provide a brief summary of what needs to be accomplished and why]
|
||||
|
||||
## Context Requirements
|
||||
- [ ] Sufficient data context is available
|
||||
- [ ] User requirements are clear
|
||||
- [ ] Necessary tools are accessible
|
||||
|
||||
## Tasks
|
||||
1. [Task Name]
|
||||
- Description: [Detailed explanation of what needs to be done]
|
||||
- Tools to Use: [List relevant tools, e.g., create_metric, create_dashboard]
|
||||
- Visualization Type: [Specify chart type: table, line, bar, histogram, pie/donut, metric card, or scatter plot]
|
||||
- The goal is to make the data as digestible as possible for the user.
|
||||
- Validation Criteria:
|
||||
* [Specific, measurable criteria to confirm task completion]
|
||||
|
||||
2. [Additional Tasks as needed...]
|
||||
- Follow the same structure as above
|
||||
- Each task should be concrete and actionable
|
||||
|
||||
## Metrics Selection
|
||||
- Metrics to Create:
|
||||
* [List each visualization/metric with its purpose]
|
||||
- Response Strategy:
|
||||
* [If multiple metrics/dashboards are created, specify which ones to highlight in the response]
|
||||
* [Include rationale for metric selection]
|
||||
|
||||
## Review and Validation
|
||||
1. Quality Check
|
||||
- [ ] All tasks completed according to validation criteria
|
||||
- [ ] Visualizations are properly configured
|
||||
- [ ] Dashboards are functional and informative
|
||||
- [ ] Data is effectively communicated through chosen chart types
|
||||
|
||||
2. User Requirements Check
|
||||
- [ ] All user requirements have been addressed
|
||||
- [ ] Solution matches the original request
|
||||
- [ ] Documentation is clear and complete
|
||||
|
||||
## Notes
|
||||
[Any additional information or considerations]
|
||||
"##;
|
|
@ -0,0 +1,5 @@
|
|||
pub mod create_plan;
|
||||
pub mod review_plan;
|
||||
|
||||
pub use create_plan::*;
|
||||
pub use review_plan::*;
|
|
@ -48,7 +48,10 @@ impl ToolExecutor for ReviewPlan {
|
|||
}
|
||||
|
||||
async fn is_enabled(&self) -> bool {
|
||||
true
|
||||
match self.agent.get_state_value("plan_available").await {
|
||||
Some(_) => true,
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute(&self, params: Self::Params) -> Result<Self::Output> {
|
||||
|
@ -74,8 +77,8 @@ impl ToolExecutor for ReviewPlan {
|
|||
|
||||
fn get_schema(&self) -> Value {
|
||||
serde_json::json!({
|
||||
"name": "review_plan",
|
||||
"description": "Reviews an existing plan, optionally updates its content, and/or marks it as completed. Used for checking work and finalizing data analysis.",
|
||||
"name": self.get_name(),
|
||||
"description": "Reviews and validates changes made to data systems (metrics, dashboards, etc.) as part of a plan execution. Only use this tool when the plan involved data modifications. Skip for cosmetic changes like visualization adjustments.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@ -83,20 +86,16 @@ impl ToolExecutor for ReviewPlan {
|
|||
"type": "string",
|
||||
"description": "The ID of the plan to review"
|
||||
},
|
||||
"mark_completed": {
|
||||
"feedback": {
|
||||
"type": "string",
|
||||
"description": "Detailed feedback about any tasks that weren't fully accomplished or need adjustments. Include specific issues found during validation of metrics, dashboards, or other data modifications."
|
||||
},
|
||||
"completed": {
|
||||
"type": "boolean",
|
||||
"description": "Whether to mark the plan as completed"
|
||||
},
|
||||
"updated_markdown_content": {
|
||||
"type": "string",
|
||||
"description": "Optional updated markdown content for the plan"
|
||||
},
|
||||
"review_comments": {
|
||||
"type": "string",
|
||||
"description": "Optional comments about the review or changes made"
|
||||
"description": "Whether all data modifications have been properly validated and the plan can be marked as complete. Set to false if any metrics or dashboards need further adjustments."
|
||||
}
|
||||
},
|
||||
"required": ["plan_id", "mark_completed"]
|
||||
"required": ["plan_id", "completed"]
|
||||
}
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue