diff --git a/api/libs/handlers/Cargo.toml b/api/libs/handlers/Cargo.toml index 69151cd59..9930b79c1 100644 --- a/api/libs/handlers/Cargo.toml +++ b/api/libs/handlers/Cargo.toml @@ -20,6 +20,7 @@ redis = { workspace = true } regex = { workspace = true } indexmap = { workspace = true } async-trait = { workspace = true } +once_cell = { workspace = true } # Local dependencies database = { path = "../database" } diff --git a/api/libs/handlers/src/chats/get_chat_handler.rs b/api/libs/handlers/src/chats/get_chat_handler.rs index a4e9bf505..737e12ad7 100644 --- a/api/libs/handlers/src/chats/get_chat_handler.rs +++ b/api/libs/handlers/src/chats/get_chat_handler.rs @@ -8,7 +8,7 @@ use tokio; use uuid::Uuid; use crate::chats::types::ChatWithMessages; -use crate::messages::types::ChatMessage; +use crate::messages::types::{ChatMessage, ChatUserMessage}; use database::pool::get_pg_pool; use database::schema::{chats, messages, users}; @@ -129,28 +129,34 @@ pub async fn get_chat_handler(chat_id: &Uuid, user_id: &Uuid) -> Result - let response_messages = msg.response_messages + let response_messages = msg + .response_messages .as_array() .map(|arr| arr.to_vec()) .unwrap_or_default(); - let reasoning = msg.reasoning + let reasoning = msg + .reasoning .as_array() .map(|arr| arr.to_vec()) .unwrap_or_default(); - ChatMessage { - id: msg.id, - request_message: crate::messages::types::ChatUserMessage { - request: msg.request_message, - sender_id: msg.user_id, - sender_name: msg.user_name.unwrap_or_else(|| "Unknown".to_string()), - sender_avatar, - }, + let request_message = ChatUserMessage { + request: msg.request_message, + sender_id: msg.user_id, + sender_name: msg.user_name.unwrap_or_else(|| "Unknown".to_string()), + sender_avatar, + }; + + let chat_message = ChatMessage::new_with_messages( + msg.id, + request_message, response_messages, reasoning, - created_at: msg.created_at.to_string(), - } + None, + ); + + chat_message }) .collect(); diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 1a69ecb00..3af88991b 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -1,4 +1,5 @@ -use std::{collections::HashMap, time::Instant}; +use once_cell::sync::OnceCell; +use std::{collections::HashMap, sync::Mutex, time::Instant}; use agents::{ tools::file_tools::search_data_catalog::SearchDataCatalogOutput, AgentExt, AgentMessage, @@ -13,14 +14,14 @@ use database::{ pool::get_pg_pool, schema::{chats, dashboard_files, messages, messages_to_files, metric_files}, }; -use diesel::insert_into; +use diesel::{insert_into, ExpressionMethods}; use diesel_async::RunQueryDsl; use litellm::{ AgentMessage as LiteLLMAgentMessage, ChatCompletionRequest, LiteLLMClient, MessageProgress, Metadata, ToolCall, }; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::{json, Value}; use uuid::Uuid; use crate::chats::{ @@ -36,6 +37,12 @@ use crate::messages::types::{ChatMessage, ChatUserMessage}; use super::types::ChatWithMessages; use tokio::sync::mpsc; +static CHUNK_TRACKER: OnceCell = OnceCell::new(); + +fn get_chunk_tracker() -> &'static ChunkTracker { + CHUNK_TRACKER.get_or_init(|| ChunkTracker::new()) +} + // Define ThreadEvent #[derive(Clone, Copy, Debug)] pub enum ThreadEvent { @@ -55,6 +62,50 @@ pub struct ChatCreateNewChat { pub dashboard_id: Option, } +struct ChunkTracker { + pub chunks: Mutex>, +} + +impl ChunkTracker { + pub fn new() -> Self { + Self { + chunks: Mutex::new(HashMap::new()), + } + } + + pub fn add_chunk(&self, chunk_id: String, chunk_text: String) { + if let Ok(mut chunks) = self.chunks.lock() { + if let Some(existing_chunk) = chunks.get_mut(&chunk_id) { + existing_chunk.push_str(&chunk_text); + } else { + chunks.insert(chunk_id, chunk_text); + } + } + } + + pub fn get_chunk(&self, chunk_id: String) -> Option { + self.chunks + .lock() + .ok() + .and_then(|chunks| chunks.get(&chunk_id).cloned()) + } + + pub fn clear_chunk(&self, chunk_id: String) { + if let Ok(mut chunks) = self.chunks.lock() { + chunks.remove(&chunk_id); + } + } + + pub fn exclude_chunk(&self, chunk_id: String, text: String) -> String { + if let Ok(chunks) = self.chunks.lock() { + if let Some(chunk) = chunks.get(&chunk_id) { + return text.replace(chunk, ""); + } + } + text + } +} + pub async fn post_chat_handler( request: ChatCreateNewChat, user: User, @@ -219,17 +270,26 @@ pub async fn post_chat_handler( } } - println!("Finishing up the agent and moving on to store the final message"); - let title = title_handle.await??; let reasoning_duration = reasoning_duration.elapsed().as_secs(); - let final_reasoning_message = format!("Reasoned for {} seconds", reasoning_duration); - // Transform all messages for final storage let (response_messages, reasoning_messages) = prepare_final_message_state(&all_transformed_containers)?; + // Update chat_with_messages with final state + if let Some(chat_message) = chat_with_messages.messages.first_mut() { + *chat_message = ChatMessage::new_with_messages( + message_id, + chat_message.request_message.clone(), + response_messages.clone(), + reasoning_messages.clone(), + None, + ); + } + + let final_reasoning_message = format!("Reasoned for {} seconds", reasoning_duration); + // Create and store message in the database with final state let message = Message { id: message_id, @@ -255,12 +315,6 @@ pub async fn post_chat_handler( // Process any completed files process_completed_files(&mut conn, &message, &all_messages, &user_org_id, &user.id).await?; - // Update chat_with_messages with final state - if let Some(chat_message) = chat_with_messages.messages.first_mut() { - chat_message.response_messages = response_messages; - chat_message.reasoning = reasoning_messages; - } - if let Some(title) = title.title { chat_with_messages.title = title; } @@ -286,10 +340,8 @@ fn prepare_final_message_state(containers: &[BusterContainer]) -> Result<(Vec { - if chat.response_message.is_final_message.unwrap_or(false) { - if let Ok(value) = serde_json::to_value(&chat.response_message) { - response_messages.push(value); - } + if let Ok(value) = serde_json::to_value(&chat.response_message) { + response_messages.push(value); } } BusterContainer::ReasoningMessage(reasoning) => { @@ -336,7 +388,6 @@ async fn process_completed_files( organization_id: &Uuid, user_id: &Uuid, ) -> Result<()> { - // Transform messages to BusterContainer format let mut transformed_messages = Vec::new(); for msg in messages { if let Ok((containers, _)) = @@ -346,90 +397,138 @@ async fn process_completed_files( } } - // Process any completed metric or dashboard files for container in transformed_messages { match container { BusterContainer::ReasoningMessage(msg) => match &msg.reasoning { BusterReasoningMessage::File(file) if file.message_type == "files" => { - // Process each file in the files array - for file_content in &file.files { - match file_content.file_type.as_str() { - "metric" => { - let metric_file = MetricFile { - id: Uuid::new_v4(), - name: file_content.file_name.clone(), - file_name: format!( - "{}", - file_content.file_name.to_lowercase().replace(' ', "_") - ), - content: serde_json::to_value(&file_content.content)?, - verification: Verification::NotRequested, - evaluation_obj: None, - evaluation_summary: None, - evaluation_score: None, - organization_id: organization_id.clone(), - created_by: user_id.clone(), - created_at: Utc::now(), - updated_at: Utc::now(), - deleted_at: None, - }; + for file_id in &file.file_ids { + if let Some(file_content) = file.files.get(file_id) { + // Create both reasoning and response messages for the completed file + let mut reasoning_messages = message.reasoning.clone(); + let mut response_messages = message.response_messages.clone(); - insert_into(metric_files::table) - .values(&metric_file) - .execute(conn) - .await?; - - let message_to_file = MessageToFile { - id: Uuid::new_v4(), - message_id: message.id, - file_id: metric_file.id, - created_at: Utc::now(), - updated_at: Utc::now(), - deleted_at: None, - }; - - insert_into(messages_to_files::table) - .values(&message_to_file) - .execute(conn) - .await?; + // Update the reasoning message status to completed + if let Value::Array(ref mut arr) = reasoning_messages { + if let Some(reasoning_msg) = arr.iter_mut().find(|msg| { + msg.get("id").and_then(Value::as_str) == Some(&file.id) + }) { + if let Some(obj) = reasoning_msg.as_object_mut() { + obj.insert("status".to_string(), json!("completed")); + } + } } - "dashboard" => { - let dashboard_file = DashboardFile { - id: Uuid::new_v4(), - name: file_content.file_name.clone(), - file_name: format!( - "{}", - file_content.file_name.to_lowercase().replace(' ', "_") - ), - content: serde_json::to_value(&file_content.content)?, - filter: None, - organization_id: organization_id.clone(), - created_by: user_id.clone(), - created_at: Utc::now(), - updated_at: Utc::now(), - deleted_at: None, - }; - insert_into(dashboard_files::table) - .values(&dashboard_file) - .execute(conn) - .await?; + // Create and add the response message for the file + let response_message = BusterChatMessage::File { + id: file_content.id.clone(), + file_type: file_content.file_type.clone(), + file_name: file_content.file_name.clone(), + version_number: file_content.version_number, + version_id: file_content.version_id.clone(), + filter_version_id: None, + metadata: file_content.metadata.as_ref().map(|m| { + m.iter() + .map(|meta| BusterChatResponseFileMetadata { + status: meta.status.clone(), + message: meta.message.clone(), + timestamp: meta.timestamp, + }) + .collect() + }), + }; - let message_to_file = MessageToFile { - id: Uuid::new_v4(), - message_id: message.id, - file_id: dashboard_file.id, - created_at: Utc::now(), - updated_at: Utc::now(), - deleted_at: None, - }; - - insert_into(messages_to_files::table) - .values(&message_to_file) - .execute(conn) - .await?; + if let Value::Array(ref mut arr) = response_messages { + arr.push(serde_json::to_value(&response_message)?); + } + + // Update both messages in the database + diesel::update(messages::table) + .filter(messages::id.eq(message.id)) + .set(( + messages::response_messages.eq(response_messages), + messages::reasoning.eq(reasoning_messages), + )) + .execute(conn) + .await?; + + match file_content.file_type.as_str() { + "metric" => { + let metric_file = MetricFile { + id: Uuid::new_v4(), + name: file_content.file_name.clone(), + file_name: format!( + "{}", + file_content.file_name.to_lowercase().replace(' ', "_") + ), + content: serde_json::to_value(&file_content.file.text)?, + verification: Verification::NotRequested, + evaluation_obj: None, + evaluation_summary: None, + evaluation_score: None, + organization_id: organization_id.clone(), + created_by: user_id.clone(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }; + + insert_into(metric_files::table) + .values(&metric_file) + .execute(conn) + .await?; + + let message_to_file = MessageToFile { + id: Uuid::new_v4(), + message_id: message.id, + file_id: metric_file.id, + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }; + + insert_into(messages_to_files::table) + .values(&message_to_file) + .execute(conn) + .await?; + } + "dashboard" => { + let dashboard_file = DashboardFile { + id: Uuid::new_v4(), + name: file_content.file_name.clone(), + file_name: format!( + "{}", + file_content.file_name.to_lowercase().replace(' ', "_") + ), + content: serde_json::to_value(&file_content.file.text)?, + filter: None, + organization_id: organization_id.clone(), + created_by: user_id.clone(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }; + + insert_into(dashboard_files::table) + .values(&dashboard_file) + .execute(conn) + .await?; + + let message_to_file = MessageToFile { + id: Uuid::new_v4(), + message_id: message.id, + file_id: dashboard_file.id, + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }; + + insert_into(messages_to_files::table) + .values(&message_to_file) + .execute(conn) + .await?; + } + _ => (), } - _ => (), } } } @@ -442,14 +541,20 @@ async fn process_completed_files( Ok(()) } -#[derive(Debug, Serialize)] -#[serde(untagged)] +#[derive(Debug, Serialize, Clone)] pub enum BusterChatContainer { - ChatMessage(BusterChatMessage), + ChatMessage(BusterChatMessageContainer), Thought(BusterReasoningPill), File(BusterReasoningFile), } +#[derive(Debug, Serialize, Clone)] +pub struct BusterChatContainerContainer { + pub container: BusterChatContainer, + pub chat_id: Uuid, + pub message_id: Uuid, +} + #[derive(Debug, Serialize, Clone)] pub struct BusterChatMessageContainer { pub response_message: BusterChatMessage, @@ -473,13 +578,30 @@ pub struct BusterReasoningMessageContainer { } #[derive(Debug, Serialize, Clone)] -pub struct BusterChatMessage { - pub id: String, - #[serde(rename = "type")] - pub message_type: String, - pub message: Option, - pub message_chunk: Option, - pub is_final_message: Option, +pub struct BusterChatResponseFileMetadata { + pub status: String, + pub message: String, + pub timestamp: Option, +} + +#[derive(Debug, Serialize, Clone)] +#[serde(tag = "type")] +pub enum BusterChatMessage { + Text { + id: String, + message: Option, + message_chunk: Option, + is_final_message: Option, + }, + File { + id: String, + file_type: String, + file_name: String, + version_number: i32, + version_id: String, + filter_version_id: Option, + metadata: Option>, + }, } #[derive(Debug, Serialize, Clone)] @@ -520,17 +642,24 @@ pub struct BusterThoughtPill { } #[derive(Debug, Serialize, Clone)] -pub struct BusterFileContent { +pub struct BusterFile { pub id: String, pub file_type: String, pub file_name: String, pub version_number: i32, pub version_id: String, pub status: String, - pub content: Vec, + pub file: BusterFileContent, pub metadata: Option>, } +#[derive(Debug, Serialize, Clone)] +pub struct BusterFileContent { + pub text: Option, + pub text_chunk: Option, + pub modifided: Option>, +} + #[derive(Debug, Serialize, Clone)] pub struct BusterReasoningFile { pub id: String, @@ -539,14 +668,8 @@ pub struct BusterReasoningFile { pub title: String, pub secondary_title: String, pub status: String, - pub files: Vec, -} - -#[derive(Debug, Serialize, Clone)] -pub struct BusterFileLine { - pub line_number: usize, - pub text: String, - pub modified: Option, + pub file_ids: Vec, + pub files: HashMap, } #[derive(Debug, Serialize, Clone)] @@ -718,39 +841,29 @@ fn transform_text_message( chat_id: Uuid, message_id: Uuid, ) -> Result> { - println!( - "MESSAGE_STREAM: transform_text_message called with progress: {:?}", - progress - ); + let tracker = get_chunk_tracker(); match progress { MessageProgress::InProgress => { - let container = BusterChatMessage { + let filtered_content = tracker.exclude_chunk(id.clone(), content.clone()); + tracker.add_chunk(id.clone(), filtered_content.clone()); + + Ok(vec![BusterChatMessage::Text { id: id.clone(), - message_type: "text".to_string(), message: None, - message_chunk: Some(content), + message_chunk: Some(filtered_content), is_final_message: Some(false), - }; - println!( - "MESSAGE_STREAM: Created in-progress text message: {:?}", - container - ); - Ok(vec![container]) + }]) } MessageProgress::Complete => { - let container = BusterChatMessage { + tracker.clear_chunk(id.clone()); + + Ok(vec![BusterChatMessage::Text { id: id.clone(), - message_type: "text".to_string(), message: Some(content), message_chunk: None, is_final_message: Some(true), - }; - println!( - "MESSAGE_STREAM: Created complete text message: {:?}", - container - ); - Ok(vec![container]) + }]) } } } @@ -775,12 +888,30 @@ fn transform_tool_message( }; // Convert BusterReasoningMessage to BusterReasoningMessageContainer + let tracker = get_chunk_tracker(); let reasoning_containers = messages .into_iter() - .map(|reasoning| BusterReasoningMessageContainer { - reasoning, - chat_id, - message_id, + .map(|reasoning| { + let updated_reasoning = if let BusterReasoningMessage::Text(mut text) = reasoning { + if let Some(chunk) = text.message_chunk.clone() { + let filtered_content = tracker.exclude_chunk(text.id.clone(), chunk.clone()); + println!("MESSAGE_STREAM: Filtered content: {:?}", filtered_content); + tracker.add_chunk(text.id.clone(), filtered_content.clone()); + text.message_chunk = Some(filtered_content); + } + if text.status == Some("completed".to_string()) { + tracker.clear_chunk(text.id.clone()); + } + BusterReasoningMessage::Text(text) + } else { + reasoning + }; + + BusterReasoningMessageContainer { + reasoning: updated_reasoning, + chat_id, + message_id, + } }) .collect(); @@ -985,6 +1116,7 @@ fn transform_assistant_tool_message( } let mut all_messages = Vec::new(); + let tracker = get_chunk_tracker(); // Process each tool call individually for tool_call in &tool_calls { @@ -1041,10 +1173,27 @@ fn transform_assistant_tool_message( // Convert BusterReasoningMessage to BusterReasoningMessageContainer let containers: Vec = messages .into_iter() - .map(|reasoning| BusterReasoningMessageContainer { - reasoning, - chat_id, - message_id, + .map(|reasoning| { + let updated_reasoning = if let BusterReasoningMessage::Text(mut text) = reasoning { + if let Some(chunk) = text.message_chunk.clone() { + let filtered_content = + tracker.exclude_chunk(text.id.clone(), chunk.clone()); + tracker.add_chunk(text.id.clone(), chunk); + text.message_chunk = Some(filtered_content); + } + if text.status == Some("completed".to_string()) { + tracker.clear_chunk(text.id.clone()); + } + BusterReasoningMessage::Text(text) + } else { + reasoning + }; + + BusterReasoningMessageContainer { + reasoning: updated_reasoning, + chat_id, + message_id, + } }) .collect(); @@ -1409,18 +1558,18 @@ async fn initialize_chat( let mut existing_chat = get_chat_handler(&existing_chat_id, &user.id).await?; // Add new message to existing chat - existing_chat.messages.push(ChatMessage { - id: message_id, - request_message: ChatUserMessage { + existing_chat.messages.push(ChatMessage::new_with_messages( + message_id, + ChatUserMessage { request: request.prompt.clone(), sender_id: user.id.clone(), sender_name: user.name.clone().unwrap_or_default(), sender_avatar: None, }, - response_messages: vec![], - reasoning: vec![], - created_at: Utc::now().to_string(), - }); + Vec::new(), + Vec::new(), + None, + )); Ok((existing_chat_id, message_id, existing_chat)) } else { @@ -1441,18 +1590,18 @@ async fn initialize_chat( id: chat_id, title: request.prompt.clone(), is_favorited: false, - messages: vec![ChatMessage { - id: message_id, - request_message: ChatUserMessage { + messages: vec![ChatMessage::new_with_messages( + message_id, + ChatUserMessage { request: request.prompt.clone(), sender_id: user.id.clone(), sender_name: user.name.clone().unwrap_or_default(), sender_avatar: None, }, - response_messages: vec![], - reasoning: vec![], - created_at: Utc::now().to_string(), - }], + Vec::new(), + Vec::new(), + None, + )], created_at: Utc::now().to_string(), updated_at: Utc::now().to_string(), created_by: user.id.to_string(), diff --git a/api/libs/handlers/src/chats/streaming_parser.rs b/api/libs/handlers/src/chats/streaming_parser.rs index 60c0397e8..133b4b1ff 100644 --- a/api/libs/handlers/src/chats/streaming_parser.rs +++ b/api/libs/handlers/src/chats/streaming_parser.rs @@ -3,8 +3,8 @@ use serde_json::Value; use uuid::Uuid; use super::post_chat_handler::{ - BusterFileLine, BusterReasoningFile, BusterReasoningMessage, BusterReasoningPill, - BusterReasoningText, BusterThoughtPill, BusterThoughtPillContainer, BusterFileContent, + BusterReasoningFile, BusterReasoningMessage, BusterReasoningPill, + BusterReasoningText, BusterThoughtPill, BusterThoughtPillContainer, BusterFile, BusterFileContent, }; pub struct StreamingParser { @@ -245,7 +245,8 @@ impl StreamingParser { file_type: String, ) -> Result> { if let Some(files) = value.get("files").and_then(Value::as_array) { - let mut file_contents = Vec::new(); + let mut files_map = std::collections::HashMap::new(); + let mut file_ids = Vec::new(); for file in files { if let Some(file_obj) = file.as_object() { @@ -259,37 +260,38 @@ impl StreamingParser { .and_then(Value::as_str) .unwrap_or(""); - let mut current_lines = Vec::new(); - for (i, line) in yml_content.lines().enumerate() { - current_lines.push(BusterFileLine { - line_number: i + 1, - text: line.to_string(), - modified: Some(false), - }); - } - - file_contents.push(BusterFileContent { - id: Uuid::new_v4().to_string(), + let file_id = Uuid::new_v4().to_string(); + + let buster_file = BusterFile { + id: file_id.clone(), file_type: file_type.clone(), file_name: name.to_string(), version_number: 1, version_id: Uuid::new_v4().to_string(), status: "loading".to_string(), - content: current_lines, + file: BusterFileContent { + text: Some(yml_content.to_string()), + text_chunk: None, + modifided: None, + }, metadata: None, - }); + }; + + file_ids.push(file_id.clone()); + files_map.insert(file_id, buster_file); } } } - if !file_contents.is_empty() { + if !files_map.is_empty() { return Ok(Some(BusterReasoningMessage::File(BusterReasoningFile { id, message_type: "files".to_string(), title: format!("Creating {} files...", file_type), secondary_title: String::new(), status: "loading".to_string(), - files: file_contents, + file_ids, + files: files_map, }))); } } diff --git a/api/libs/handlers/src/chats/types.rs b/api/libs/handlers/src/chats/types.rs index 6d689342b..dfc97d570 100644 --- a/api/libs/handlers/src/chats/types.rs +++ b/api/libs/handlers/src/chats/types.rs @@ -16,3 +16,4 @@ pub struct ChatWithMessages { pub created_by_name: String, pub created_by_avatar: Option, } + diff --git a/api/libs/handlers/src/messages/types.rs b/api/libs/handlers/src/messages/types.rs index c11216055..9777c556e 100644 --- a/api/libs/handlers/src/messages/types.rs +++ b/api/libs/handlers/src/messages/types.rs @@ -1,3 +1,6 @@ +use std::collections::HashMap; + +use chrono::Utc; use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; @@ -6,9 +9,14 @@ use uuid::Uuid; pub struct ChatMessage { pub id: Uuid, pub request_message: ChatUserMessage, - pub response_messages: Vec, - pub reasoning: Vec, + pub response_message_ids: Vec, + #[serde(default)] + pub response_messages: HashMap, + pub reasoning_message_ids: Vec, + #[serde(default)] + pub reasoning_messages: HashMap, pub created_at: String, + pub final_reasoning_message: Option, } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -18,3 +26,73 @@ pub struct ChatUserMessage { pub sender_name: String, pub sender_avatar: Option, } + +impl ChatMessage { + pub fn new( + request: String, + sender_id: Uuid, + sender_name: String, + sender_avatar: Option, + ) -> Self { + Self { + id: Uuid::new_v4(), + request_message: ChatUserMessage { + request, + sender_id, + sender_name, + sender_avatar, + }, + response_message_ids: Vec::new(), + response_messages: HashMap::new(), + reasoning_message_ids: Vec::new(), + reasoning_messages: HashMap::new(), + created_at: Utc::now().to_rfc3339(), + final_reasoning_message: None, + } + } + + pub fn new_with_messages( + id: Uuid, + request_message: ChatUserMessage, + response_messages: Vec, + reasoning_messages: Vec, + final_reasoning_message: Option, + ) -> Self { + let response_message_ids: Vec = response_messages + .iter() + .filter_map(|msg| msg.get("id").and_then(|id| id.as_str()).map(String::from)) + .collect(); + + let reasoning_message_ids: Vec = reasoning_messages + .iter() + .filter_map(|msg| msg.get("id").and_then(|id| id.as_str()).map(String::from)) + .collect(); + + let response_messages_map: HashMap = response_messages + .into_iter() + .filter_map(|msg| { + let id = msg.get("id").and_then(|id| id.as_str())?; + Some((id.to_string(), msg)) + }) + .collect(); + + let reasoning_messages_map: HashMap = reasoning_messages + .into_iter() + .filter_map(|msg| { + let id = msg.get("id").and_then(|id| id.as_str())?; + Some((id.to_string(), msg)) + }) + .collect(); + + Self { + id, + request_message, + response_message_ids, + response_messages: response_messages_map, + reasoning_message_ids, + reasoning_messages: reasoning_messages_map, + created_at: Utc::now().to_rfc3339(), + final_reasoning_message, + } + } +}