organization handler, metric day js spec, and modify streaming

This commit is contained in:
dal 2025-04-17 10:15:31 -06:00
parent e0f65589dd
commit 126047ab94
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
6 changed files with 172 additions and 56 deletions

View File

@ -312,7 +312,7 @@ definitions:
description: Currency code for currency formatting (e.g., USD, EUR)
dateFormat:
type: string
description: Format string for date display (must be compatible with Day.js format strings). Should be set based on user request and context.
description: Format string for date display (must be compatible with Day.js format strings).
useRelativeTime:
type: boolean
description: Whether to display dates as relative time (e.g., 2 days ago)

View File

@ -2087,69 +2087,71 @@ fn transform_assistant_tool_message(
// Determine file type based on tool name
let file_type = if tool_name.contains("metric") { "metric" } else { "dashboard" };
// Process the chunk using the appropriate parser method
let parse_result = if file_type == "metric" {
parser.process_metric_chunk(tool_id.clone(), &tool_call.function.arguments)
} else {
parser.process_dashboard_chunk(tool_id.clone(), &tool_call.function.arguments)
};
// --- START: Process InProgress Chunks ---
if progress == MessageProgress::InProgress {
// Process the chunk using the appropriate parser method
let parse_result = if file_type == "metric" {
parser.process_metric_chunk(tool_id.clone(), &tool_call.function.arguments)
} else {
parser.process_dashboard_chunk(tool_id.clone(), &tool_call.function.arguments)
};
// If parser returns a reasoning message (File type expected)
if let Ok(Some(BusterReasoningMessage::File(mut file_reasoning))) = parse_result {
// Added missing variable initializations
let mut has_updates = false;
let mut updated_files_map = std::collections::HashMap::new();
// If parser returns a reasoning message (File type expected)
if let Ok(Some(BusterReasoningMessage::File(mut file_reasoning))) = parse_result {
// Added missing variable initializations
let mut has_updates = false;
let mut updated_files_map = std::collections::HashMap::new();
// Iterate through the files parsed so far
for (file_map_id, mut file_detail) in file_reasoning.files.iter_mut() { // Use iter_mut
// Ensure text_chunk has content
if let Some(yml_chunk) = &file_detail.file.text_chunk {
// Define unique chunk ID for this file
let chunk_id = format!("{}_{}", tool_id, file_detail.id);
// Calculate delta using the tracker
let delta = tracker.add_chunk(chunk_id.clone(), yml_chunk.clone());
// Iterate through the files parsed so far
for (file_map_id, mut file_detail) in file_reasoning.files.iter_mut() { // Use iter_mut
// Ensure text_chunk has content
if let Some(yml_chunk) = &file_detail.file.text_chunk {
// Define unique chunk ID for this file
let chunk_id = format!("{}_{}", tool_id, file_detail.id);
// Calculate delta using the tracker
let delta = tracker.add_chunk(chunk_id.clone(), yml_chunk.clone());
if !delta.is_empty() { // Now delta is defined
// Update file detail with delta
file_detail.file.text_chunk = Some(delta);
file_detail.file.text = None; // Ensure full text is cleared when chunking
file_detail.status = "In Progress".to_string(); // Set status to in progress
has_updates = true;
updated_files_map.insert(file_map_id.clone(), file_detail.clone()); // Clone file_detail
} else {
// If delta is empty, it means this chunk is identical to the last seen content
// We might still want to include it in the update map if its status needs setting,
// but primarily we track changes.
// Consider if we need to update status even without content change.
// For now, we only add to updated_files_map if there's a delta.
if !delta.is_empty() { // Now delta is defined
// Update file detail with delta
file_detail.file.text_chunk = Some(delta);
file_detail.file.text = None; // Ensure full text is cleared when chunking
file_detail.status = "In Progress".to_string(); // Set status to in progress
has_updates = true;
updated_files_map.insert(file_map_id.clone(), file_detail.clone()); // Clone file_detail
} else {
// If delta is empty, it means this chunk is identical to the last seen content
// We might still want to include it in the update map if its status needs setting,
// but primarily we track changes.
// Consider if we need to update status even without content change.
// For now, we only add to updated_files_map if there's a delta.
}
}
}
}
// Update only the files that had changes
if has_updates {
file_reasoning.files = updated_files_map; // Replace with updated files
// Update only the files that had changes
if has_updates {
file_reasoning.files = updated_files_map; // Replace with updated files
all_results.push(ToolTransformResult::Reasoning(
BusterReasoningMessage::File(file_reasoning),
));
}
} else if let Ok(Some(BusterReasoningMessage::Text(text_reasoning))) = parse_result {
all_results.push(ToolTransformResult::Reasoning(
BusterReasoningMessage::File(file_reasoning),
));
BusterReasoningMessage::Text(text_reasoning),
));
}
} else if let Ok(Some(BusterReasoningMessage::Text(text_reasoning))) = parse_result {
all_results.push(ToolTransformResult::Reasoning(
BusterReasoningMessage::Text(text_reasoning),
));
}
// --- END: Process InProgress Chunks ---
// Handle complete progress for file tools if needed
if progress == MessageProgress::Complete {
// Removed finalize_file_processing call as it caused a linter error
// if let Ok(parsed_content) = serde_json::from_str::<Value>(&tool_call.function.arguments) {
// // Optionally use parsed_content for final processing if needed
// if let Ok(Some(reasoning_messages)) = parser.finalize_file_processing(tool_id.clone(), file_type, parsed_content, last_reasoning_completion_time.elapsed()) {
// all_results.extend(reasoning_messages.into_iter().map(ToolTransformResult::Reasoning));
// }
// }
// The actual tool result processing (parsing ModifyFilesOutput/Create...Output)
// happens in `transform_tool_message`. Here, we just need to clean up the tracker.
tracker.clear_chunk(tool_id.clone()); // Clear tracker for the main tool ID
// Consider clearing file-specific chunk IDs too if necessary
// parser.clear_related_chunks(tool_id.clone()); // Example hypothetical parser method
// Clear any potential file-specific chunks managed by the tracker
// Note: The current tracker implementation doesn't explicitly track sub-chunks,
// but this is where you would add logic if it did.
// For example: parser.clear_related_chunks(tool_id.clone(), tracker);
}
}
"no_search_needed" | "review_plan" => {

View File

@ -1,4 +1,6 @@
pub mod types;
pub mod update_organization_handler;
pub mod post_organization_handler;
pub use update_organization_handler::*;
pub use update_organization_handler::*;
pub use post_organization_handler::*;

View File

@ -0,0 +1,78 @@
use anyhow::{Context, Result};
use chrono::Utc;
use database::{
enums::{SharingSetting, UserOrganizationRole, UserOrganizationStatus},
models::{Organization, UserToOrganization},
pool::get_pg_pool,
schema::{organizations, users_to_organizations},
};
use diesel::insert_into;
use diesel_async::RunQueryDsl;
use middleware::AuthenticatedUser;
use uuid::Uuid;
/// Creates a new organization and adds the creating user as a WorkspaceAdmin.
pub async fn post_organization_handler(
name: String,
user: AuthenticatedUser,
) -> Result<()> {
let pool = get_pg_pool();
let mut conn = pool
.get()
.await
.context("Failed to get database connection")?;
let now = Utc::now();
let new_org_id = Uuid::new_v4();
// Create the new organization
let new_organization = Organization {
id: new_org_id,
name: name.clone(),
domain: None, // Domain can be updated later if needed
created_at: now,
updated_at: now,
deleted_at: None,
};
insert_into(organizations::table)
.values(&new_organization)
.execute(&mut conn)
.await
.context("Failed to insert new organization")?;
// Add the user to the organization as WorkspaceAdmin
let user_to_org = UserToOrganization {
user_id: user.id,
organization_id: new_org_id,
role: UserOrganizationRole::WorkspaceAdmin,
// Set sensible defaults for a new organization admin
sharing_setting: SharingSetting::Team, // Default setting
edit_sql: true,
upload_csv: true,
export_assets: true,
email_slack_enabled: true, // Default setting
created_at: now,
updated_at: now,
deleted_at: None,
created_by: user.id,
updated_by: user.id,
deleted_by: None,
status: UserOrganizationStatus::Active,
};
insert_into(users_to_organizations::table)
.values(&user_to_org)
.execute(&mut conn)
.await
.context("Failed to add user to new organization")?;
tracing::info!(
"Created organization {} ({}) and added user {} as admin.",
name,
new_org_id,
user.id
);
Ok(())
}

View File

@ -1,10 +1,15 @@
use axum::{routing::{get, put}, Router};
use axum::{
routing::{get, post, put},
Router,
};
mod users;
pub mod post_organization;
mod update_organization;
mod users;
pub fn router() -> Router {
Router::new()
.route("/:id/users", get(users::list_organization_users))
.route("/:id", put(update_organization::update_organization))
}
.route("/", post(post_organization::post_organization))
}

View File

@ -0,0 +1,29 @@
use axum::{http::StatusCode, Json, Extension};
use handlers::organizations::post_organization_handler;
use middleware::AuthenticatedUser;
use serde::{Deserialize, Serialize};
use crate::routes::rest::ApiResponse;
#[derive(Deserialize, Serialize)]
pub struct PostOrganizationRequest {
name: String,
}
/// REST endpoint to create a new organization.
pub async fn post_organization(
Extension(user): Extension<AuthenticatedUser>,
Json(payload): Json<PostOrganizationRequest>,
) -> Result<ApiResponse<()>, (StatusCode, &'static str)> {
post_organization_handler(payload.name, user)
.await
.map_err(|e| {
tracing::error!("Failed to create organization: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to create organization",
)
})?;
Ok(ApiResponse::NoContent)
}