From 126047ab942d9cbf6dc5d1f497c8284d9af27b94 Mon Sep 17 00:00:00 2001 From: dal Date: Thu, 17 Apr 2025 10:15:31 -0600 Subject: [PATCH] organization handler, metric day js spec, and modify streaming --- .../src/tools/categories/file_tools/common.rs | 2 +- .../handlers/src/chats/post_chat_handler.rs | 104 +++++++++--------- api/libs/handlers/src/organizations/mod.rs | 4 +- .../post_organization_handler.rs | 78 +++++++++++++ .../routes/rest/routes/organizations/mod.rs | 11 +- .../routes/organizations/post_organization.rs | 29 +++++ 6 files changed, 172 insertions(+), 56 deletions(-) create mode 100644 api/libs/handlers/src/organizations/post_organization_handler.rs create mode 100644 api/server/src/routes/rest/routes/organizations/post_organization.rs diff --git a/api/libs/agents/src/tools/categories/file_tools/common.rs b/api/libs/agents/src/tools/categories/file_tools/common.rs index 1e2453a83..6f0da2863 100644 --- a/api/libs/agents/src/tools/categories/file_tools/common.rs +++ b/api/libs/agents/src/tools/categories/file_tools/common.rs @@ -312,7 +312,7 @@ definitions: description: Currency code for currency formatting (e.g., USD, EUR) dateFormat: type: string - description: Format string for date display (must be compatible with Day.js format strings). Should be set based on user request and context. + description: Format string for date display (must be compatible with Day.js format strings). useRelativeTime: type: boolean description: Whether to display dates as relative time (e.g., 2 days ago) diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 59fcbe055..58d118286 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -2087,69 +2087,71 @@ fn transform_assistant_tool_message( // Determine file type based on tool name let file_type = if tool_name.contains("metric") { "metric" } else { "dashboard" }; - // Process the chunk using the appropriate parser method - let parse_result = if file_type == "metric" { - parser.process_metric_chunk(tool_id.clone(), &tool_call.function.arguments) - } else { - parser.process_dashboard_chunk(tool_id.clone(), &tool_call.function.arguments) - }; + // --- START: Process InProgress Chunks --- + if progress == MessageProgress::InProgress { + // Process the chunk using the appropriate parser method + let parse_result = if file_type == "metric" { + parser.process_metric_chunk(tool_id.clone(), &tool_call.function.arguments) + } else { + parser.process_dashboard_chunk(tool_id.clone(), &tool_call.function.arguments) + }; - // If parser returns a reasoning message (File type expected) - if let Ok(Some(BusterReasoningMessage::File(mut file_reasoning))) = parse_result { - // Added missing variable initializations - let mut has_updates = false; - let mut updated_files_map = std::collections::HashMap::new(); + // If parser returns a reasoning message (File type expected) + if let Ok(Some(BusterReasoningMessage::File(mut file_reasoning))) = parse_result { + // Added missing variable initializations + let mut has_updates = false; + let mut updated_files_map = std::collections::HashMap::new(); - // Iterate through the files parsed so far - for (file_map_id, mut file_detail) in file_reasoning.files.iter_mut() { // Use iter_mut - // Ensure text_chunk has content - if let Some(yml_chunk) = &file_detail.file.text_chunk { - // Define unique chunk ID for this file - let chunk_id = format!("{}_{}", tool_id, file_detail.id); - // Calculate delta using the tracker - let delta = tracker.add_chunk(chunk_id.clone(), yml_chunk.clone()); + // Iterate through the files parsed so far + for (file_map_id, mut file_detail) in file_reasoning.files.iter_mut() { // Use iter_mut + // Ensure text_chunk has content + if let Some(yml_chunk) = &file_detail.file.text_chunk { + // Define unique chunk ID for this file + let chunk_id = format!("{}_{}", tool_id, file_detail.id); + // Calculate delta using the tracker + let delta = tracker.add_chunk(chunk_id.clone(), yml_chunk.clone()); - if !delta.is_empty() { // Now delta is defined - // Update file detail with delta - file_detail.file.text_chunk = Some(delta); - file_detail.file.text = None; // Ensure full text is cleared when chunking - file_detail.status = "In Progress".to_string(); // Set status to in progress - has_updates = true; - updated_files_map.insert(file_map_id.clone(), file_detail.clone()); // Clone file_detail - } else { - // If delta is empty, it means this chunk is identical to the last seen content - // We might still want to include it in the update map if its status needs setting, - // but primarily we track changes. - // Consider if we need to update status even without content change. - // For now, we only add to updated_files_map if there's a delta. + if !delta.is_empty() { // Now delta is defined + // Update file detail with delta + file_detail.file.text_chunk = Some(delta); + file_detail.file.text = None; // Ensure full text is cleared when chunking + file_detail.status = "In Progress".to_string(); // Set status to in progress + has_updates = true; + updated_files_map.insert(file_map_id.clone(), file_detail.clone()); // Clone file_detail + } else { + // If delta is empty, it means this chunk is identical to the last seen content + // We might still want to include it in the update map if its status needs setting, + // but primarily we track changes. + // Consider if we need to update status even without content change. + // For now, we only add to updated_files_map if there's a delta. + } } } - } - // Update only the files that had changes - if has_updates { - file_reasoning.files = updated_files_map; // Replace with updated files + // Update only the files that had changes + if has_updates { + file_reasoning.files = updated_files_map; // Replace with updated files + all_results.push(ToolTransformResult::Reasoning( + BusterReasoningMessage::File(file_reasoning), + )); + } + } else if let Ok(Some(BusterReasoningMessage::Text(text_reasoning))) = parse_result { all_results.push(ToolTransformResult::Reasoning( - BusterReasoningMessage::File(file_reasoning), - )); + BusterReasoningMessage::Text(text_reasoning), + )); } - } else if let Ok(Some(BusterReasoningMessage::Text(text_reasoning))) = parse_result { - all_results.push(ToolTransformResult::Reasoning( - BusterReasoningMessage::Text(text_reasoning), - )); } + // --- END: Process InProgress Chunks --- + // Handle complete progress for file tools if needed if progress == MessageProgress::Complete { - // Removed finalize_file_processing call as it caused a linter error - // if let Ok(parsed_content) = serde_json::from_str::(&tool_call.function.arguments) { - // // Optionally use parsed_content for final processing if needed - // if let Ok(Some(reasoning_messages)) = parser.finalize_file_processing(tool_id.clone(), file_type, parsed_content, last_reasoning_completion_time.elapsed()) { - // all_results.extend(reasoning_messages.into_iter().map(ToolTransformResult::Reasoning)); - // } - // } + // The actual tool result processing (parsing ModifyFilesOutput/Create...Output) + // happens in `transform_tool_message`. Here, we just need to clean up the tracker. tracker.clear_chunk(tool_id.clone()); // Clear tracker for the main tool ID - // Consider clearing file-specific chunk IDs too if necessary - // parser.clear_related_chunks(tool_id.clone()); // Example hypothetical parser method + // Clear any potential file-specific chunks managed by the tracker + // Note: The current tracker implementation doesn't explicitly track sub-chunks, + // but this is where you would add logic if it did. + // For example: parser.clear_related_chunks(tool_id.clone(), tracker); } } "no_search_needed" | "review_plan" => { diff --git a/api/libs/handlers/src/organizations/mod.rs b/api/libs/handlers/src/organizations/mod.rs index a445f8ff7..79fbc1a30 100644 --- a/api/libs/handlers/src/organizations/mod.rs +++ b/api/libs/handlers/src/organizations/mod.rs @@ -1,4 +1,6 @@ pub mod types; pub mod update_organization_handler; +pub mod post_organization_handler; -pub use update_organization_handler::*; \ No newline at end of file +pub use update_organization_handler::*; +pub use post_organization_handler::*; \ No newline at end of file diff --git a/api/libs/handlers/src/organizations/post_organization_handler.rs b/api/libs/handlers/src/organizations/post_organization_handler.rs new file mode 100644 index 000000000..40be25a0f --- /dev/null +++ b/api/libs/handlers/src/organizations/post_organization_handler.rs @@ -0,0 +1,78 @@ +use anyhow::{Context, Result}; +use chrono::Utc; +use database::{ + enums::{SharingSetting, UserOrganizationRole, UserOrganizationStatus}, + models::{Organization, UserToOrganization}, + pool::get_pg_pool, + schema::{organizations, users_to_organizations}, +}; +use diesel::insert_into; +use diesel_async::RunQueryDsl; +use middleware::AuthenticatedUser; +use uuid::Uuid; + +/// Creates a new organization and adds the creating user as a WorkspaceAdmin. +pub async fn post_organization_handler( + name: String, + user: AuthenticatedUser, +) -> Result<()> { + let pool = get_pg_pool(); + let mut conn = pool + .get() + .await + .context("Failed to get database connection")?; + + let now = Utc::now(); + let new_org_id = Uuid::new_v4(); + + // Create the new organization + let new_organization = Organization { + id: new_org_id, + name: name.clone(), + domain: None, // Domain can be updated later if needed + created_at: now, + updated_at: now, + deleted_at: None, + }; + + insert_into(organizations::table) + .values(&new_organization) + .execute(&mut conn) + .await + .context("Failed to insert new organization")?; + + // Add the user to the organization as WorkspaceAdmin + let user_to_org = UserToOrganization { + user_id: user.id, + organization_id: new_org_id, + role: UserOrganizationRole::WorkspaceAdmin, + // Set sensible defaults for a new organization admin + sharing_setting: SharingSetting::Team, // Default setting + edit_sql: true, + upload_csv: true, + export_assets: true, + email_slack_enabled: true, // Default setting + created_at: now, + updated_at: now, + deleted_at: None, + created_by: user.id, + updated_by: user.id, + deleted_by: None, + status: UserOrganizationStatus::Active, + }; + + insert_into(users_to_organizations::table) + .values(&user_to_org) + .execute(&mut conn) + .await + .context("Failed to add user to new organization")?; + + tracing::info!( + "Created organization {} ({}) and added user {} as admin.", + name, + new_org_id, + user.id + ); + + Ok(()) +} \ No newline at end of file diff --git a/api/server/src/routes/rest/routes/organizations/mod.rs b/api/server/src/routes/rest/routes/organizations/mod.rs index 2997067d2..677280df2 100644 --- a/api/server/src/routes/rest/routes/organizations/mod.rs +++ b/api/server/src/routes/rest/routes/organizations/mod.rs @@ -1,10 +1,15 @@ -use axum::{routing::{get, put}, Router}; +use axum::{ + routing::{get, post, put}, + Router, +}; -mod users; +pub mod post_organization; mod update_organization; +mod users; pub fn router() -> Router { Router::new() .route("/:id/users", get(users::list_organization_users)) .route("/:id", put(update_organization::update_organization)) -} \ No newline at end of file + .route("/", post(post_organization::post_organization)) +} diff --git a/api/server/src/routes/rest/routes/organizations/post_organization.rs b/api/server/src/routes/rest/routes/organizations/post_organization.rs new file mode 100644 index 000000000..04ff4c068 --- /dev/null +++ b/api/server/src/routes/rest/routes/organizations/post_organization.rs @@ -0,0 +1,29 @@ +use axum::{http::StatusCode, Json, Extension}; +use handlers::organizations::post_organization_handler; +use middleware::AuthenticatedUser; +use serde::{Deserialize, Serialize}; + +use crate::routes::rest::ApiResponse; + +#[derive(Deserialize, Serialize)] +pub struct PostOrganizationRequest { + name: String, +} + +/// REST endpoint to create a new organization. +pub async fn post_organization( + Extension(user): Extension, + Json(payload): Json, +) -> Result, (StatusCode, &'static str)> { + post_organization_handler(payload.name, user) + .await + .map_err(|e| { + tracing::error!("Failed to create organization: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to create organization", + ) + })?; + + Ok(ApiResponse::NoContent) +}