last few changes

This commit is contained in:
dal 2025-03-05 08:03:48 -07:00
parent f2520c1b56
commit 37ae21fa31
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
6 changed files with 426 additions and 189 deletions

View File

@ -20,6 +20,7 @@ redis = { workspace = true }
regex = { workspace = true }
indexmap = { workspace = true }
async-trait = { workspace = true }
once_cell = { workspace = true }
# Local dependencies
database = { path = "../database" }

View File

@ -8,7 +8,7 @@ use tokio;
use uuid::Uuid;
use crate::chats::types::ChatWithMessages;
use crate::messages::types::ChatMessage;
use crate::messages::types::{ChatMessage, ChatUserMessage};
use database::pool::get_pg_pool;
use database::schema::{chats, messages, users};
@ -129,28 +129,34 @@ pub async fn get_chat_handler(chat_id: &Uuid, user_id: &Uuid) -> Result<ChatWith
.map(String::from);
// Convert response_messages and reasoning to Vec<Value>
let response_messages = msg.response_messages
let response_messages = msg
.response_messages
.as_array()
.map(|arr| arr.to_vec())
.unwrap_or_default();
let reasoning = msg.reasoning
let reasoning = msg
.reasoning
.as_array()
.map(|arr| arr.to_vec())
.unwrap_or_default();
ChatMessage {
id: msg.id,
request_message: crate::messages::types::ChatUserMessage {
request: msg.request_message,
sender_id: msg.user_id,
sender_name: msg.user_name.unwrap_or_else(|| "Unknown".to_string()),
sender_avatar,
},
let request_message = ChatUserMessage {
request: msg.request_message,
sender_id: msg.user_id,
sender_name: msg.user_name.unwrap_or_else(|| "Unknown".to_string()),
sender_avatar,
};
let chat_message = ChatMessage::new_with_messages(
msg.id,
request_message,
response_messages,
reasoning,
created_at: msg.created_at.to_string(),
}
None,
);
chat_message
})
.collect();

View File

@ -1,4 +1,5 @@
use std::{collections::HashMap, time::Instant};
use once_cell::sync::OnceCell;
use std::{collections::HashMap, sync::Mutex, time::Instant};
use agents::{
tools::file_tools::search_data_catalog::SearchDataCatalogOutput, AgentExt, AgentMessage,
@ -13,14 +14,14 @@ use database::{
pool::get_pg_pool,
schema::{chats, dashboard_files, messages, messages_to_files, metric_files},
};
use diesel::insert_into;
use diesel::{insert_into, ExpressionMethods};
use diesel_async::RunQueryDsl;
use litellm::{
AgentMessage as LiteLLMAgentMessage, ChatCompletionRequest, LiteLLMClient, MessageProgress,
Metadata, ToolCall,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{json, Value};
use uuid::Uuid;
use crate::chats::{
@ -36,6 +37,12 @@ use crate::messages::types::{ChatMessage, ChatUserMessage};
use super::types::ChatWithMessages;
use tokio::sync::mpsc;
static CHUNK_TRACKER: OnceCell<ChunkTracker> = OnceCell::new();
fn get_chunk_tracker() -> &'static ChunkTracker {
CHUNK_TRACKER.get_or_init(|| ChunkTracker::new())
}
// Define ThreadEvent
#[derive(Clone, Copy, Debug)]
pub enum ThreadEvent {
@ -55,6 +62,50 @@ pub struct ChatCreateNewChat {
pub dashboard_id: Option<Uuid>,
}
struct ChunkTracker {
pub chunks: Mutex<HashMap<String, String>>,
}
impl ChunkTracker {
pub fn new() -> Self {
Self {
chunks: Mutex::new(HashMap::new()),
}
}
pub fn add_chunk(&self, chunk_id: String, chunk_text: String) {
if let Ok(mut chunks) = self.chunks.lock() {
if let Some(existing_chunk) = chunks.get_mut(&chunk_id) {
existing_chunk.push_str(&chunk_text);
} else {
chunks.insert(chunk_id, chunk_text);
}
}
}
pub fn get_chunk(&self, chunk_id: String) -> Option<String> {
self.chunks
.lock()
.ok()
.and_then(|chunks| chunks.get(&chunk_id).cloned())
}
pub fn clear_chunk(&self, chunk_id: String) {
if let Ok(mut chunks) = self.chunks.lock() {
chunks.remove(&chunk_id);
}
}
pub fn exclude_chunk(&self, chunk_id: String, text: String) -> String {
if let Ok(chunks) = self.chunks.lock() {
if let Some(chunk) = chunks.get(&chunk_id) {
return text.replace(chunk, "");
}
}
text
}
}
pub async fn post_chat_handler(
request: ChatCreateNewChat,
user: User,
@ -219,17 +270,26 @@ pub async fn post_chat_handler(
}
}
println!("Finishing up the agent and moving on to store the final message");
let title = title_handle.await??;
let reasoning_duration = reasoning_duration.elapsed().as_secs();
let final_reasoning_message = format!("Reasoned for {} seconds", reasoning_duration);
// Transform all messages for final storage
let (response_messages, reasoning_messages) =
prepare_final_message_state(&all_transformed_containers)?;
// Update chat_with_messages with final state
if let Some(chat_message) = chat_with_messages.messages.first_mut() {
*chat_message = ChatMessage::new_with_messages(
message_id,
chat_message.request_message.clone(),
response_messages.clone(),
reasoning_messages.clone(),
None,
);
}
let final_reasoning_message = format!("Reasoned for {} seconds", reasoning_duration);
// Create and store message in the database with final state
let message = Message {
id: message_id,
@ -255,12 +315,6 @@ pub async fn post_chat_handler(
// Process any completed files
process_completed_files(&mut conn, &message, &all_messages, &user_org_id, &user.id).await?;
// Update chat_with_messages with final state
if let Some(chat_message) = chat_with_messages.messages.first_mut() {
chat_message.response_messages = response_messages;
chat_message.reasoning = reasoning_messages;
}
if let Some(title) = title.title {
chat_with_messages.title = title;
}
@ -286,10 +340,8 @@ fn prepare_final_message_state(containers: &[BusterContainer]) -> Result<(Vec<Va
for container in containers {
match container {
BusterContainer::ChatMessage(chat) => {
if chat.response_message.is_final_message.unwrap_or(false) {
if let Ok(value) = serde_json::to_value(&chat.response_message) {
response_messages.push(value);
}
if let Ok(value) = serde_json::to_value(&chat.response_message) {
response_messages.push(value);
}
}
BusterContainer::ReasoningMessage(reasoning) => {
@ -336,7 +388,6 @@ async fn process_completed_files(
organization_id: &Uuid,
user_id: &Uuid,
) -> Result<()> {
// Transform messages to BusterContainer format
let mut transformed_messages = Vec::new();
for msg in messages {
if let Ok((containers, _)) =
@ -346,90 +397,138 @@ async fn process_completed_files(
}
}
// Process any completed metric or dashboard files
for container in transformed_messages {
match container {
BusterContainer::ReasoningMessage(msg) => match &msg.reasoning {
BusterReasoningMessage::File(file) if file.message_type == "files" => {
// Process each file in the files array
for file_content in &file.files {
match file_content.file_type.as_str() {
"metric" => {
let metric_file = MetricFile {
id: Uuid::new_v4(),
name: file_content.file_name.clone(),
file_name: format!(
"{}",
file_content.file_name.to_lowercase().replace(' ', "_")
),
content: serde_json::to_value(&file_content.content)?,
verification: Verification::NotRequested,
evaluation_obj: None,
evaluation_summary: None,
evaluation_score: None,
organization_id: organization_id.clone(),
created_by: user_id.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
for file_id in &file.file_ids {
if let Some(file_content) = file.files.get(file_id) {
// Create both reasoning and response messages for the completed file
let mut reasoning_messages = message.reasoning.clone();
let mut response_messages = message.response_messages.clone();
insert_into(metric_files::table)
.values(&metric_file)
.execute(conn)
.await?;
let message_to_file = MessageToFile {
id: Uuid::new_v4(),
message_id: message.id,
file_id: metric_file.id,
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
insert_into(messages_to_files::table)
.values(&message_to_file)
.execute(conn)
.await?;
// Update the reasoning message status to completed
if let Value::Array(ref mut arr) = reasoning_messages {
if let Some(reasoning_msg) = arr.iter_mut().find(|msg| {
msg.get("id").and_then(Value::as_str) == Some(&file.id)
}) {
if let Some(obj) = reasoning_msg.as_object_mut() {
obj.insert("status".to_string(), json!("completed"));
}
}
}
"dashboard" => {
let dashboard_file = DashboardFile {
id: Uuid::new_v4(),
name: file_content.file_name.clone(),
file_name: format!(
"{}",
file_content.file_name.to_lowercase().replace(' ', "_")
),
content: serde_json::to_value(&file_content.content)?,
filter: None,
organization_id: organization_id.clone(),
created_by: user_id.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
insert_into(dashboard_files::table)
.values(&dashboard_file)
.execute(conn)
.await?;
// Create and add the response message for the file
let response_message = BusterChatMessage::File {
id: file_content.id.clone(),
file_type: file_content.file_type.clone(),
file_name: file_content.file_name.clone(),
version_number: file_content.version_number,
version_id: file_content.version_id.clone(),
filter_version_id: None,
metadata: file_content.metadata.as_ref().map(|m| {
m.iter()
.map(|meta| BusterChatResponseFileMetadata {
status: meta.status.clone(),
message: meta.message.clone(),
timestamp: meta.timestamp,
})
.collect()
}),
};
let message_to_file = MessageToFile {
id: Uuid::new_v4(),
message_id: message.id,
file_id: dashboard_file.id,
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
insert_into(messages_to_files::table)
.values(&message_to_file)
.execute(conn)
.await?;
if let Value::Array(ref mut arr) = response_messages {
arr.push(serde_json::to_value(&response_message)?);
}
// Update both messages in the database
diesel::update(messages::table)
.filter(messages::id.eq(message.id))
.set((
messages::response_messages.eq(response_messages),
messages::reasoning.eq(reasoning_messages),
))
.execute(conn)
.await?;
match file_content.file_type.as_str() {
"metric" => {
let metric_file = MetricFile {
id: Uuid::new_v4(),
name: file_content.file_name.clone(),
file_name: format!(
"{}",
file_content.file_name.to_lowercase().replace(' ', "_")
),
content: serde_json::to_value(&file_content.file.text)?,
verification: Verification::NotRequested,
evaluation_obj: None,
evaluation_summary: None,
evaluation_score: None,
organization_id: organization_id.clone(),
created_by: user_id.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
insert_into(metric_files::table)
.values(&metric_file)
.execute(conn)
.await?;
let message_to_file = MessageToFile {
id: Uuid::new_v4(),
message_id: message.id,
file_id: metric_file.id,
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
insert_into(messages_to_files::table)
.values(&message_to_file)
.execute(conn)
.await?;
}
"dashboard" => {
let dashboard_file = DashboardFile {
id: Uuid::new_v4(),
name: file_content.file_name.clone(),
file_name: format!(
"{}",
file_content.file_name.to_lowercase().replace(' ', "_")
),
content: serde_json::to_value(&file_content.file.text)?,
filter: None,
organization_id: organization_id.clone(),
created_by: user_id.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
insert_into(dashboard_files::table)
.values(&dashboard_file)
.execute(conn)
.await?;
let message_to_file = MessageToFile {
id: Uuid::new_v4(),
message_id: message.id,
file_id: dashboard_file.id,
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
insert_into(messages_to_files::table)
.values(&message_to_file)
.execute(conn)
.await?;
}
_ => (),
}
_ => (),
}
}
}
@ -442,14 +541,20 @@ async fn process_completed_files(
Ok(())
}
#[derive(Debug, Serialize)]
#[serde(untagged)]
#[derive(Debug, Serialize, Clone)]
pub enum BusterChatContainer {
ChatMessage(BusterChatMessage),
ChatMessage(BusterChatMessageContainer),
Thought(BusterReasoningPill),
File(BusterReasoningFile),
}
#[derive(Debug, Serialize, Clone)]
pub struct BusterChatContainerContainer {
pub container: BusterChatContainer,
pub chat_id: Uuid,
pub message_id: Uuid,
}
#[derive(Debug, Serialize, Clone)]
pub struct BusterChatMessageContainer {
pub response_message: BusterChatMessage,
@ -473,13 +578,30 @@ pub struct BusterReasoningMessageContainer {
}
#[derive(Debug, Serialize, Clone)]
pub struct BusterChatMessage {
pub id: String,
#[serde(rename = "type")]
pub message_type: String,
pub message: Option<String>,
pub message_chunk: Option<String>,
pub is_final_message: Option<bool>,
pub struct BusterChatResponseFileMetadata {
pub status: String,
pub message: String,
pub timestamp: Option<i64>,
}
#[derive(Debug, Serialize, Clone)]
#[serde(tag = "type")]
pub enum BusterChatMessage {
Text {
id: String,
message: Option<String>,
message_chunk: Option<String>,
is_final_message: Option<bool>,
},
File {
id: String,
file_type: String,
file_name: String,
version_number: i32,
version_id: String,
filter_version_id: Option<String>,
metadata: Option<Vec<BusterChatResponseFileMetadata>>,
},
}
#[derive(Debug, Serialize, Clone)]
@ -520,17 +642,24 @@ pub struct BusterThoughtPill {
}
#[derive(Debug, Serialize, Clone)]
pub struct BusterFileContent {
pub struct BusterFile {
pub id: String,
pub file_type: String,
pub file_name: String,
pub version_number: i32,
pub version_id: String,
pub status: String,
pub content: Vec<BusterFileLine>,
pub file: BusterFileContent,
pub metadata: Option<Vec<BusterFileMetadata>>,
}
#[derive(Debug, Serialize, Clone)]
pub struct BusterFileContent {
pub text: Option<String>,
pub text_chunk: Option<String>,
pub modifided: Option<Vec<(i32, i32)>>,
}
#[derive(Debug, Serialize, Clone)]
pub struct BusterReasoningFile {
pub id: String,
@ -539,14 +668,8 @@ pub struct BusterReasoningFile {
pub title: String,
pub secondary_title: String,
pub status: String,
pub files: Vec<BusterFileContent>,
}
#[derive(Debug, Serialize, Clone)]
pub struct BusterFileLine {
pub line_number: usize,
pub text: String,
pub modified: Option<bool>,
pub file_ids: Vec<String>,
pub files: HashMap<String, BusterFile>,
}
#[derive(Debug, Serialize, Clone)]
@ -718,39 +841,29 @@ fn transform_text_message(
chat_id: Uuid,
message_id: Uuid,
) -> Result<Vec<BusterChatMessage>> {
println!(
"MESSAGE_STREAM: transform_text_message called with progress: {:?}",
progress
);
let tracker = get_chunk_tracker();
match progress {
MessageProgress::InProgress => {
let container = BusterChatMessage {
let filtered_content = tracker.exclude_chunk(id.clone(), content.clone());
tracker.add_chunk(id.clone(), filtered_content.clone());
Ok(vec![BusterChatMessage::Text {
id: id.clone(),
message_type: "text".to_string(),
message: None,
message_chunk: Some(content),
message_chunk: Some(filtered_content),
is_final_message: Some(false),
};
println!(
"MESSAGE_STREAM: Created in-progress text message: {:?}",
container
);
Ok(vec![container])
}])
}
MessageProgress::Complete => {
let container = BusterChatMessage {
tracker.clear_chunk(id.clone());
Ok(vec![BusterChatMessage::Text {
id: id.clone(),
message_type: "text".to_string(),
message: Some(content),
message_chunk: None,
is_final_message: Some(true),
};
println!(
"MESSAGE_STREAM: Created complete text message: {:?}",
container
);
Ok(vec![container])
}])
}
}
}
@ -775,12 +888,30 @@ fn transform_tool_message(
};
// Convert BusterReasoningMessage to BusterReasoningMessageContainer
let tracker = get_chunk_tracker();
let reasoning_containers = messages
.into_iter()
.map(|reasoning| BusterReasoningMessageContainer {
reasoning,
chat_id,
message_id,
.map(|reasoning| {
let updated_reasoning = if let BusterReasoningMessage::Text(mut text) = reasoning {
if let Some(chunk) = text.message_chunk.clone() {
let filtered_content = tracker.exclude_chunk(text.id.clone(), chunk.clone());
println!("MESSAGE_STREAM: Filtered content: {:?}", filtered_content);
tracker.add_chunk(text.id.clone(), filtered_content.clone());
text.message_chunk = Some(filtered_content);
}
if text.status == Some("completed".to_string()) {
tracker.clear_chunk(text.id.clone());
}
BusterReasoningMessage::Text(text)
} else {
reasoning
};
BusterReasoningMessageContainer {
reasoning: updated_reasoning,
chat_id,
message_id,
}
})
.collect();
@ -985,6 +1116,7 @@ fn transform_assistant_tool_message(
}
let mut all_messages = Vec::new();
let tracker = get_chunk_tracker();
// Process each tool call individually
for tool_call in &tool_calls {
@ -1041,10 +1173,27 @@ fn transform_assistant_tool_message(
// Convert BusterReasoningMessage to BusterReasoningMessageContainer
let containers: Vec<BusterReasoningMessageContainer> = messages
.into_iter()
.map(|reasoning| BusterReasoningMessageContainer {
reasoning,
chat_id,
message_id,
.map(|reasoning| {
let updated_reasoning = if let BusterReasoningMessage::Text(mut text) = reasoning {
if let Some(chunk) = text.message_chunk.clone() {
let filtered_content =
tracker.exclude_chunk(text.id.clone(), chunk.clone());
tracker.add_chunk(text.id.clone(), chunk);
text.message_chunk = Some(filtered_content);
}
if text.status == Some("completed".to_string()) {
tracker.clear_chunk(text.id.clone());
}
BusterReasoningMessage::Text(text)
} else {
reasoning
};
BusterReasoningMessageContainer {
reasoning: updated_reasoning,
chat_id,
message_id,
}
})
.collect();
@ -1409,18 +1558,18 @@ async fn initialize_chat(
let mut existing_chat = get_chat_handler(&existing_chat_id, &user.id).await?;
// Add new message to existing chat
existing_chat.messages.push(ChatMessage {
id: message_id,
request_message: ChatUserMessage {
existing_chat.messages.push(ChatMessage::new_with_messages(
message_id,
ChatUserMessage {
request: request.prompt.clone(),
sender_id: user.id.clone(),
sender_name: user.name.clone().unwrap_or_default(),
sender_avatar: None,
},
response_messages: vec![],
reasoning: vec![],
created_at: Utc::now().to_string(),
});
Vec::new(),
Vec::new(),
None,
));
Ok((existing_chat_id, message_id, existing_chat))
} else {
@ -1441,18 +1590,18 @@ async fn initialize_chat(
id: chat_id,
title: request.prompt.clone(),
is_favorited: false,
messages: vec![ChatMessage {
id: message_id,
request_message: ChatUserMessage {
messages: vec![ChatMessage::new_with_messages(
message_id,
ChatUserMessage {
request: request.prompt.clone(),
sender_id: user.id.clone(),
sender_name: user.name.clone().unwrap_or_default(),
sender_avatar: None,
},
response_messages: vec![],
reasoning: vec![],
created_at: Utc::now().to_string(),
}],
Vec::new(),
Vec::new(),
None,
)],
created_at: Utc::now().to_string(),
updated_at: Utc::now().to_string(),
created_by: user.id.to_string(),

View File

@ -3,8 +3,8 @@ use serde_json::Value;
use uuid::Uuid;
use super::post_chat_handler::{
BusterFileLine, BusterReasoningFile, BusterReasoningMessage, BusterReasoningPill,
BusterReasoningText, BusterThoughtPill, BusterThoughtPillContainer, BusterFileContent,
BusterReasoningFile, BusterReasoningMessage, BusterReasoningPill,
BusterReasoningText, BusterThoughtPill, BusterThoughtPillContainer, BusterFile, BusterFileContent,
};
pub struct StreamingParser {
@ -245,7 +245,8 @@ impl StreamingParser {
file_type: String,
) -> Result<Option<BusterReasoningMessage>> {
if let Some(files) = value.get("files").and_then(Value::as_array) {
let mut file_contents = Vec::new();
let mut files_map = std::collections::HashMap::new();
let mut file_ids = Vec::new();
for file in files {
if let Some(file_obj) = file.as_object() {
@ -259,37 +260,38 @@ impl StreamingParser {
.and_then(Value::as_str)
.unwrap_or("");
let mut current_lines = Vec::new();
for (i, line) in yml_content.lines().enumerate() {
current_lines.push(BusterFileLine {
line_number: i + 1,
text: line.to_string(),
modified: Some(false),
});
}
let file_id = Uuid::new_v4().to_string();
file_contents.push(BusterFileContent {
id: Uuid::new_v4().to_string(),
let buster_file = BusterFile {
id: file_id.clone(),
file_type: file_type.clone(),
file_name: name.to_string(),
version_number: 1,
version_id: Uuid::new_v4().to_string(),
status: "loading".to_string(),
content: current_lines,
file: BusterFileContent {
text: Some(yml_content.to_string()),
text_chunk: None,
modifided: None,
},
metadata: None,
});
};
file_ids.push(file_id.clone());
files_map.insert(file_id, buster_file);
}
}
}
if !file_contents.is_empty() {
if !files_map.is_empty() {
return Ok(Some(BusterReasoningMessage::File(BusterReasoningFile {
id,
message_type: "files".to_string(),
title: format!("Creating {} files...", file_type),
secondary_title: String::new(),
status: "loading".to_string(),
files: file_contents,
file_ids,
files: files_map,
})));
}
}

View File

@ -16,3 +16,4 @@ pub struct ChatWithMessages {
pub created_by_name: String,
pub created_by_avatar: Option<String>,
}

View File

@ -1,3 +1,6 @@
use std::collections::HashMap;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
@ -6,9 +9,14 @@ use uuid::Uuid;
pub struct ChatMessage {
pub id: Uuid,
pub request_message: ChatUserMessage,
pub response_messages: Vec<Value>,
pub reasoning: Vec<Value>,
pub response_message_ids: Vec<String>,
#[serde(default)]
pub response_messages: HashMap<String, Value>,
pub reasoning_message_ids: Vec<String>,
#[serde(default)]
pub reasoning_messages: HashMap<String, Value>,
pub created_at: String,
pub final_reasoning_message: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -18,3 +26,73 @@ pub struct ChatUserMessage {
pub sender_name: String,
pub sender_avatar: Option<String>,
}
impl ChatMessage {
pub fn new(
request: String,
sender_id: Uuid,
sender_name: String,
sender_avatar: Option<String>,
) -> Self {
Self {
id: Uuid::new_v4(),
request_message: ChatUserMessage {
request,
sender_id,
sender_name,
sender_avatar,
},
response_message_ids: Vec::new(),
response_messages: HashMap::new(),
reasoning_message_ids: Vec::new(),
reasoning_messages: HashMap::new(),
created_at: Utc::now().to_rfc3339(),
final_reasoning_message: None,
}
}
pub fn new_with_messages(
id: Uuid,
request_message: ChatUserMessage,
response_messages: Vec<Value>,
reasoning_messages: Vec<Value>,
final_reasoning_message: Option<String>,
) -> Self {
let response_message_ids: Vec<String> = response_messages
.iter()
.filter_map(|msg| msg.get("id").and_then(|id| id.as_str()).map(String::from))
.collect();
let reasoning_message_ids: Vec<String> = reasoning_messages
.iter()
.filter_map(|msg| msg.get("id").and_then(|id| id.as_str()).map(String::from))
.collect();
let response_messages_map: HashMap<String, Value> = response_messages
.into_iter()
.filter_map(|msg| {
let id = msg.get("id").and_then(|id| id.as_str())?;
Some((id.to_string(), msg))
})
.collect();
let reasoning_messages_map: HashMap<String, Value> = reasoning_messages
.into_iter()
.filter_map(|msg| {
let id = msg.get("id").and_then(|id| id.as_str())?;
Some((id.to_string(), msg))
})
.collect();
Self {
id,
request_message,
response_message_ids,
response_messages: response_messages_map,
reasoning_message_ids,
reasoning_messages: reasoning_messages_map,
created_at: Utc::now().to_rfc3339(),
final_reasoning_message,
}
}
}