a little faster

This commit is contained in:
dal 2025-04-18 14:56:14 -06:00
parent ad4d2c8568
commit 177c8fd1c8
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
2 changed files with 140 additions and 104 deletions

View File

@ -59,7 +59,7 @@ pub struct Message {
pub feedback: Option<String>, pub feedback: Option<String>,
} }
#[derive(Queryable, Insertable, Debug)] #[derive(Queryable, Insertable, Debug, Clone)]
#[diesel(table_name = messages_to_files)] #[diesel(table_name = messages_to_files)]
pub struct MessageToFile { pub struct MessageToFile {
pub id: Uuid, pub id: Uuid,

View File

@ -8,6 +8,7 @@ use database::{
}; };
use diesel::{insert_into, update, ExpressionMethods, QueryDsl}; use diesel::{insert_into, update, ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
use middleware::AuthenticatedUser; use middleware::AuthenticatedUser;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{json, Value}; use serde_json::{json, Value};
@ -20,7 +21,7 @@ use crate::dashboards::{update_dashboard_handler, DashboardUpdateRequest};
use crate::metrics::{update_metric_handler, UpdateMetricRequest}; use crate::metrics::{update_metric_handler, UpdateMetricRequest};
/// Request structure for restoring an asset (metric or dashboard) version in a chat /// Request structure for restoring an asset (metric or dashboard) version in a chat
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ChatRestoreRequest { pub struct ChatRestoreRequest {
/// ID of the asset to restore /// ID of the asset to restore
pub asset_id: Uuid, pub asset_id: Uuid,
@ -41,51 +42,56 @@ pub struct ChatRestoreRequest {
/// * `Result<ChatWithMessages>` - The updated chat with new messages documenting the restoration /// * `Result<ChatWithMessages>` - The updated chat with new messages documenting the restoration
/// ///
/// # Process /// # Process
/// 1. Restores the specified asset version using the appropriate handler /// 1. Concurrently:
/// 2. Creates a text message in the chat documenting the restoration /// a. Restores the specified asset version using the appropriate handler
/// 3. Creates a file message linking to the restored asset /// b. Fetches the most recent message from the chat (to copy raw_llm_messages)
/// 4. Updates the chat record with the latest file info /// 2. Waits for restoration and fetch to complete.
/// 5. Returns the updated chat with all messages /// 3. Constructs new message details (text, file link, raw_llm_messages).
/// 4. Concurrently:
/// a. Inserts the new message documenting the restoration
/// b. Inserts the message-to-file association
/// c. Updates the chat record with the latest file info
/// 5. Waits for insertions and update to complete.
/// 6. Returns the updated chat with all messages
pub async fn restore_chat_handler( pub async fn restore_chat_handler(
chat_id: &Uuid, chat_id: &Uuid,
user: &AuthenticatedUser, user: &AuthenticatedUser,
request: ChatRestoreRequest, request: ChatRestoreRequest,
) -> Result<ChatWithMessages> { ) -> Result<ChatWithMessages> {
let mut conn = get_pg_pool().get().await?; // Clone variables needed for concurrent tasks
let user_clone1 = user.clone();
let request_clone1 = request.clone();
let chat_id_clone1 = *chat_id;
// Step 1: Restore the asset using the appropriate handler // Task 1: Restore Asset
let (file_type, file_name, file_id, version_number) = match request.asset_type { let restore_task = tokio::spawn(async move {
let (file_type, file_name, file_id, version_number) = match request_clone1.asset_type {
AssetType::MetricFile => { AssetType::MetricFile => {
// Create a metric update request with only the restore_to_version parameter
let metric_request = UpdateMetricRequest { let metric_request = UpdateMetricRequest {
restore_to_version: Some(request.version_number), restore_to_version: Some(request_clone1.version_number),
..Default::default() ..Default::default()
}; };
// Call the metric update handler through the public module function
let updated_metric = let updated_metric =
update_metric_handler(&request.asset_id, user, metric_request).await?; update_metric_handler(&request_clone1.asset_id, &user_clone1, metric_request)
.await?;
// Return the file information
( (
"metric".to_string(), "metric".to_string(),
updated_metric.name, updated_metric.name,
updated_metric.id, updated_metric.id,
updated_metric.versions.len() as i32, // Get version number from versions length updated_metric.versions.len() as i32,
) )
} }
AssetType::DashboardFile => { AssetType::DashboardFile => {
// Create a dashboard update request with only the restore_to_version parameter
let dashboard_request = DashboardUpdateRequest { let dashboard_request = DashboardUpdateRequest {
restore_to_version: Some(request.version_number), restore_to_version: Some(request_clone1.version_number),
..Default::default() ..Default::default()
}; };
let updated_dashboard = update_dashboard_handler(
// Call the dashboard update handler through the public module function request_clone1.asset_id,
let updated_dashboard = dashboard_request,
update_dashboard_handler(request.asset_id, dashboard_request, user).await?; &user_clone1,
)
// Return the file information .await?;
( (
"dashboard".to_string(), "dashboard".to_string(),
updated_dashboard.dashboard.name, updated_dashboard.dashboard.name,
@ -96,29 +102,42 @@ pub async fn restore_chat_handler(
_ => { _ => {
return Err(anyhow!( return Err(anyhow!(
"Unsupported asset type for restoration: {:?}", "Unsupported asset type for restoration: {:?}",
request.asset_type request_clone1.asset_type
)) ))
} }
}; };
// Explicitly type the Ok variant for the compiler
Ok::<_, anyhow::Error>((file_type, file_name, file_id, version_number))
});
// Step 2: Get the most recent message to copy raw_llm_messages // Task 2: Get the most recent message to copy raw_llm_messages
// Fetch the most recent message for the chat to extract raw_llm_messages let last_message_task = tokio::spawn(async move {
let mut conn = get_pg_pool().get().await?;
let last_message = messages::table let last_message = messages::table
.filter(messages::chat_id.eq(chat_id)) .filter(messages::chat_id.eq(&chat_id_clone1))
.filter(messages::deleted_at.is_null()) .filter(messages::deleted_at.is_null())
.limit(1) .limit(1)
// We need to use order here to get the latest message .order_by(messages::created_at.desc())
.then_order_by(messages::created_at.desc()) .first::<Message>(&mut conn) // Assuming Message derives Clone
.first::<Message>(&mut conn)
.await .await
.ok(); .ok();
// Explicitly type the Ok variant
Ok::<_, anyhow::Error>(last_message)
});
// Create raw_llm_messages by copying from the previous message and adding restoration entries // Wait for initial tasks to complete
let (restore_result, last_message_result) = tokio::join!(restore_task, last_message_task);
// Handle potential errors from spawned tasks
let (file_type, file_name, file_id, version_number) = restore_result??;
let last_message = last_message_result??;
// Step 3: Construct message details
let tool_call_id = format!("call_{}", Uuid::new_v4().to_string().replace("-", "")); let tool_call_id = format!("call_{}", Uuid::new_v4().to_string().replace("-", ""));
// Start with copied raw_llm_messages or an empty array
let mut raw_llm_messages = if let Some(last_msg) = &last_message { let mut raw_llm_messages = if let Some(last_msg) = &last_message {
if let Ok(msgs) = serde_json::from_value::<Vec<Value>>(last_msg.raw_llm_messages.clone()) { // Use clone here if last_message is Some(Message)
if let Ok(msgs) = serde_json::from_value::<Vec<Value>>(last_msg.raw_llm_messages.clone())
{
msgs msgs
} else { } else {
Vec::new() Vec::new()
@ -127,7 +146,6 @@ pub async fn restore_chat_handler(
Vec::new() Vec::new()
}; };
// Add tool call message and tool response message
raw_llm_messages.push(json!({ raw_llm_messages.push(json!({
"name": "buster_super_agent", "name": "buster_super_agent",
"role": "assistant", "role": "assistant",
@ -144,8 +162,6 @@ pub async fn restore_chat_handler(
} }
] ]
})); }));
// Add the tool response
raw_llm_messages.push(json!({ raw_llm_messages.push(json!({
"name": format!("restore_{}", file_type), "name": format!("restore_{}", file_type),
"role": "tool", "role": "tool",
@ -156,13 +172,10 @@ pub async fn restore_chat_handler(
"tool_call_id": tool_call_id "tool_call_id": tool_call_id
})); }));
// Step 3: Create a message with text and file responses
let message_id = Uuid::new_v4(); let message_id = Uuid::new_v4();
let now = Utc::now(); let now = Utc::now();
let timestamp = now.timestamp(); let timestamp = now.timestamp();
// Create response messages array with both text and file response
let response_messages = json!([ let response_messages = json!([
{ {
"id": file_id.to_string(), "id": file_id.to_string(),
@ -174,21 +187,22 @@ pub async fn restore_chat_handler(
"timestamp": timestamp "timestamp": timestamp
} }
], ],
"file_name": file_name, "file_name": file_name, // file_name is already String, no clone needed if moved
"file_type": file_type, "file_type": file_type, // file_type is already String, no clone needed if moved
"version_number": version_number, "version_number": version_number, // version_number is i32 (Copy)
"filter_version_id": null "filter_version_id": null
} }
]); ]);
// Create a Message object to insert // Create Message object - requires Message to be Clone if used in multiple tasks
// Assuming Message derives Clone
let message = Message { let message = Message {
id: message_id, id: message_id,
request_message: None, // Empty request message as per requirement request_message: None,
response_messages: response_messages, response_messages, // This is Value, likely Clone
reasoning: json!([]), reasoning: json!([]),
title: "Version Restoration".to_string(), title: "Version Restoration".to_string(),
raw_llm_messages: Value::Array(raw_llm_messages.clone()), raw_llm_messages: Value::Array(raw_llm_messages), // raw_llm_messages moved here
final_reasoning_message: Some(format!( final_reasoning_message: Some(format!(
"v{} was created by restoring v{}", "v{} was created by restoring v{}",
version_number, request.version_number version_number, request.version_number
@ -201,13 +215,8 @@ pub async fn restore_chat_handler(
feedback: None, feedback: None,
}; };
// Insert the message // Create MessageToFile object - requires MessageToFile to be Clone if used in multiple tasks
diesel::insert_into(messages::table) // Assuming MessageToFile derives Clone
.values(&message)
.execute(&mut conn)
.await?;
// Create the message-to-file association
let message_to_file = MessageToFile { let message_to_file = MessageToFile {
id: Uuid::new_v4(), id: Uuid::new_v4(),
message_id: message_id, message_id: message_id,
@ -219,23 +228,50 @@ pub async fn restore_chat_handler(
version_number: version_number, version_number: version_number,
}; };
// Insert the message-to-file association into the database // Step 4: Concurrently insert message, message_to_file, and update chat
diesel::insert_into(messages_to_files::table) // Clone necessary variables for final tasks
.values(&message_to_file) let message_clone = message.clone(); // Requires Message: Clone
let message_to_file_clone = message_to_file.clone(); // Requires MessageToFile: Clone
let chat_id_clone2 = *chat_id;
let request_asset_type_clone = request.asset_type; // AssetType is likely Copy
let file_id_clone = file_id; // file_id is Uuid (Copy)
let insert_message_task = tokio::spawn(async move {
let mut conn = get_pg_pool().get().await?;
diesel::insert_into(messages::table)
.values(&message_clone) // Use cloned message
.execute(&mut conn) .execute(&mut conn)
.await?; .await?;
Ok::<_, anyhow::Error>(()) // Explicit Ok type
});
// Step 4: Update the chat record with the latest file info let insert_mtf_task = tokio::spawn(async move {
let mut conn = get_pg_pool().get().await?;
diesel::insert_into(messages_to_files::table)
.values(&message_to_file_clone) // Use cloned mtf
.execute(&mut conn)
.await?;
Ok::<_, anyhow::Error>(()) // Explicit Ok type
});
let update_chat_task = tokio::spawn(async move {
let mut conn = get_pg_pool().get().await?;
update(chats::table) update(chats::table)
.filter(chats::id.eq(chat_id)) .filter(chats::id.eq(&chat_id_clone2))
.set(( .set((
chats::most_recent_file_id.eq(Some(file_id)), chats::most_recent_file_id.eq(Some(file_id_clone)),
chats::most_recent_version_number.eq(Some(version_number)), chats::most_recent_version_number.eq(Some(version_number)), // version_number is Copy
chats::most_recent_file_type.eq(Some(request.asset_type.to_string())), chats::most_recent_file_type.eq(Some(request_asset_type_clone.to_string())),
chats::updated_at.eq(now), chats::updated_at.eq(now), // now is Copy
)) ))
.execute(&mut conn) .execute(&mut conn)
.await?; .await?;
Ok::<_, anyhow::Error>(()) // Explicit Ok type
});
// Wait for final database operations using try_join_all for cleaner error handling
try_join_all(vec![insert_message_task, insert_mtf_task, update_chat_task]).await?;
// Return the updated chat with messages // Return the updated chat with messages
get_chat_handler(chat_id, user, false).await get_chat_handler(chat_id, user, false).await