save files without sending

This commit is contained in:
dal 2025-03-05 13:58:14 -07:00
parent d773f85029
commit c60522b383
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed files with 130 additions and 148 deletions

View File

@ -84,7 +84,7 @@ impl ChunkTracker {
complete_text: String::new(),
last_seen_content: String::new(),
});
// Calculate the delta by finding what's new since last_seen_content
let delta = if state.last_seen_content.is_empty() {
// First chunk, use it as is
@ -102,13 +102,13 @@ impl ChunkTracker {
}
}
};
// Update tracking state only if we found new content
if !delta.is_empty() {
state.complete_text.push_str(&delta);
state.last_seen_content = new_chunk;
}
delta
} else {
new_chunk
@ -116,10 +116,11 @@ impl ChunkTracker {
}
pub fn get_complete_text(&self, chunk_id: String) -> Option<String> {
self.chunks
.lock()
.ok()
.and_then(|chunks| chunks.get(&chunk_id).map(|state| state.complete_text.clone()))
self.chunks.lock().ok().and_then(|chunks| {
chunks
.get(&chunk_id)
.map(|state| state.complete_text.clone())
})
}
pub fn clear_chunk(&self, chunk_id: String) {
@ -248,16 +249,16 @@ pub async fn post_chat_handler(
// Always transform the message
match transform_message(&chat_id, &message_id, msg, tx.as_ref()).await {
Ok((containers, event)) => {
Ok(containers) => {
// Store all transformed containers
for container in containers.clone() {
for (container, _) in containers.clone() {
all_transformed_containers.push(container.clone());
}
// If we have a tx channel, send the transformed messages
if let Some(tx) = &tx {
for container in containers {
if tx.send(Ok((container, event.clone()))).await.is_err() {
for (container, thread_event) in containers {
if tx.send(Ok((container, thread_event))).await.is_err() {
// Client disconnected, but continue processing messages
tracing::warn!(
"Client disconnected, but continuing to process messages"
@ -313,7 +314,7 @@ pub async fn post_chat_handler(
reasoning_messages.clone(),
Some(format!("Reasoned for {} seconds", reasoning_duration).to_string()),
);
chat_with_messages.update_message(message);
// Create and store message in the database with final state
@ -339,8 +340,14 @@ pub async fn post_chat_handler(
.await?;
// First process completed files (database updates only)
let _ =
process_completed_files(&mut conn, &db_message, &all_messages, &user_org_id, &user.id).await?;
let _ = process_completed_files(
&mut conn,
&db_message,
&all_messages,
&user_org_id,
&user.id,
)
.await?;
// Then send text response messages
if let Some(tx) = &tx {
@ -454,7 +461,7 @@ async fn process_completed_files(
) -> Result<()> {
let mut transformed_messages = Vec::new();
for msg in messages {
if let Ok((containers, _)) =
if let Ok(containers) =
transform_message(&message.chat_id, &message.id, msg.clone(), None).await
{
transformed_messages.extend(containers);
@ -462,7 +469,7 @@ async fn process_completed_files(
}
// Process files for database updates only
for container in transformed_messages {
for (container, _) in transformed_messages {
match container {
BusterContainer::ReasoningMessage(msg) => match &msg.reasoning {
BusterReasoningMessage::File(file) if file.message_type == "files" => {
@ -650,7 +657,7 @@ pub async fn transform_message(
message_id: &Uuid,
message: AgentMessage,
tx: Option<&mpsc::Sender<Result<(BusterContainer, ThreadEvent)>>>,
) -> Result<(Vec<BusterContainer>, ThreadEvent)> {
) -> Result<Vec<(BusterContainer, ThreadEvent)>> {
println!("MESSAGE_STREAM: Transforming message: {:?}", message);
match message {
@ -688,7 +695,7 @@ pub async fn transform_message(
vec![]
}
};
containers.extend(chat_messages);
containers.extend(chat_messages.into_iter().map(|container| (container, ThreadEvent::GeneratingResponseMessage)));
// Add the "Finished reasoning" message if we're just starting
if initial {
@ -709,24 +716,14 @@ pub async fn transform_message(
message_id: *message_id,
});
// Send the finished reasoning message separately
if let Some(tx) = tx {
if let Err(e) = tx
.send(Ok((
reasoning_container,
ThreadEvent::GeneratingReasoningMessage,
)))
.await
{
tracing::warn!("Failed to send finished reasoning message: {:?}", e);
}
}
containers.push((
reasoning_container,
ThreadEvent::GeneratingResponseMessage,
));
}
return Ok((containers, ThreadEvent::GeneratingResponseMessage));
}
if let Some(tool_calls) = tool_calls {
Ok(containers)
} else if let Some(tool_calls) = tool_calls {
let mut containers = Vec::new();
// Transform tool messages
@ -740,50 +737,54 @@ pub async fn transform_message(
Ok(messages) => {
for reasoning_container in messages {
// If this is a completed file reasoning message, send the file response separately
if let BusterReasoningMessage::File(ref file) =
reasoning_container.reasoning
{
if file.status == "completed" && file.message_type == "files" {
// For each completed file, create and send a file response message
for (file_id, file_content) in &file.files {
let response_message = BusterChatMessage::File {
id: file_content.id.clone(),
file_type: file_content.file_type.clone(),
file_name: file_content.file_name.clone(),
version_number: file_content.version_number,
version_id: file_content.version_id.clone(),
filter_version_id: None,
metadata: Some(vec![BusterChatResponseFileMetadata {
status: "completed".to_string(),
message: format!(
"File {} completed",
file_content.file_name
),
timestamp: Some(Utc::now().timestamp()),
}]),
};
match &reasoning_container {
BusterReasoningMessage::File(file) => {
if file.status == "completed" && file.message_type == "files" {
// For each completed file, create and send a file response message
for (file_id, file_content) in &file.files {
let response_message = BusterChatMessage::File {
id: file_content.id.clone(),
file_type: file_content.file_type.clone(),
file_name: file_content.file_name.clone(),
version_number: file_content.version_number,
version_id: file_content.version_id.clone(),
filter_version_id: None,
metadata: Some(vec![BusterChatResponseFileMetadata {
status: "completed".to_string(),
message: format!(
"File {} completed",
file_content.file_name
),
timestamp: Some(Utc::now().timestamp()),
}]),
};
let file_container = BusterContainer::ChatMessage(
BusterChatMessageContainer {
response_message,
chat_id: *chat_id,
message_id: *message_id,
},
);
let file_container = BusterContainer::ChatMessage(
BusterChatMessageContainer {
response_message,
chat_id: *chat_id,
message_id: *message_id,
},
);
containers.push((
file_container.clone(),
ThreadEvent::GeneratingResponseMessage,
));
// Send file response message separately with GeneratingResponseMessage event
if let Some(tx) = tx {
let _ = tx
.send(Ok((
file_container.clone(),
ThreadEvent::GeneratingResponseMessage,
)))
.await;
}
}
}
_ => {}
}
containers.push(BusterContainer::ReasoningMessage(reasoning_container));
containers.push((
BusterContainer::ReasoningMessage(BusterReasoningMessageContainer {
reasoning: reasoning_container,
chat_id: *chat_id,
message_id: *message_id,
}),
ThreadEvent::GeneratingReasoningMessage,
));
}
}
Err(e) => {
@ -795,10 +796,10 @@ pub async fn transform_message(
}
};
return Ok((containers, ThreadEvent::GeneratingReasoningMessage));
Ok(containers)
} else {
Ok(vec![])
}
Ok((vec![], ThreadEvent::GeneratingResponseMessage))
}
AgentMessage::Tool {
id,
@ -808,9 +809,8 @@ pub async fn transform_message(
progress,
} => {
if let Some(name) = name {
let name_str = name.clone(); // Clone here to use in println later
let name_str = name.clone();
// Use tool_call_id directly as it's already a String
let messages = match transform_tool_message(
tool_call_id,
name,
@ -820,21 +820,28 @@ pub async fn transform_message(
) {
Ok(messages) => messages
.into_iter()
.map(BusterContainer::ReasoningMessage)
.map(|container| (
BusterContainer::ReasoningMessage(BusterReasoningMessageContainer {
reasoning: container,
chat_id: *chat_id,
message_id: *message_id,
}),
ThreadEvent::GeneratingReasoningMessage,
))
.collect(),
Err(e) => {
tracing::warn!("Error transforming tool message '{}': {:?}", name_str, e);
println!("MESSAGE_STREAM: Error transforming tool message: {:?}", e);
vec![] // Return empty vec but warn about the error
vec![]
}
};
return Ok((messages, ThreadEvent::GeneratingReasoningMessage));
Ok(messages)
} else {
Ok(vec![])
}
Ok((vec![], ThreadEvent::GeneratingResponseMessage)) // Return empty vec instead of error
}
_ => Ok((vec![], ThreadEvent::GeneratingResponseMessage)), // Return empty vec instead of error
_ => Ok(vec![]),
}
}
@ -858,7 +865,9 @@ fn transform_text_message(
}])
}
MessageProgress::Complete => {
let complete_text = tracker.get_complete_text(id.clone()).unwrap_or(content.clone());
let complete_text = tracker
.get_complete_text(id.clone())
.unwrap_or(content.clone());
tracker.clear_chunk(id.clone());
Ok(vec![BusterChatMessage::Text {
id: id.clone(),
@ -877,7 +886,7 @@ fn transform_tool_message(
content: String,
chat_id: Uuid,
message_id: Uuid,
) -> Result<Vec<BusterReasoningMessageContainer>> {
) -> Result<Vec<BusterReasoningMessage>> {
// Use required ID (tool call ID) for all function calls
let messages = match name.as_str() {
"search_data_catalog" => tool_data_catalog_search(id.clone(), content)?,
@ -889,34 +898,7 @@ fn transform_tool_message(
_ => return Err(anyhow::anyhow!("Unknown tool name: {}", name)),
};
// Convert BusterReasoningMessage to BusterReasoningMessageContainer
let tracker = get_chunk_tracker();
let reasoning_containers = messages
.into_iter()
.map(|reasoning| {
let updated_reasoning = if let BusterReasoningMessage::Text(mut text) = reasoning {
if let Some(chunk) = text.message_chunk.clone() {
let filtered_content = tracker.add_chunk(text.id.clone(), chunk.clone());
println!("MESSAGE_STREAM: Filtered content: {:?}", filtered_content);
text.message_chunk = Some(filtered_content);
}
if text.status == Some("completed".to_string()) {
tracker.clear_chunk(text.id.clone());
}
BusterReasoningMessage::Text(text)
} else {
reasoning
};
BusterReasoningMessageContainer {
reasoning: updated_reasoning,
chat_id,
message_id,
}
})
.collect();
Ok(reasoning_containers)
Ok(messages)
}
// Update tool_create_metrics to require ID
@ -1095,7 +1077,7 @@ fn transform_assistant_tool_message(
initial: bool,
chat_id: Uuid,
message_id: Uuid,
) -> Result<Vec<BusterReasoningMessageContainer>> {
) -> Result<Vec<BusterReasoningMessage>> {
let mut all_messages = Vec::new();
let tracker = get_chunk_tracker();
@ -1142,7 +1124,7 @@ fn transform_assistant_tool_message(
_ => vec![],
};
let containers: Vec<BusterReasoningMessageContainer> = messages
let containers: Vec<BusterReasoningMessage> = messages
.into_iter()
.map(|reasoning| {
let updated_reasoning = match reasoning {
@ -1151,13 +1133,13 @@ fn transform_assistant_tool_message(
println!("CHUNK DEBUG [{}] Before filtering:", text.id);
println!(" Incoming chunk length: {}", chunk.len());
println!(" Incoming chunk: {}", chunk);
let delta = tracker.add_chunk(text.id.clone(), chunk);
println!("CHUNK DEBUG [{}] After filtering:", text.id);
println!(" Delta content length: {}", delta.len());
println!(" Delta content: {}", delta);
if !delta.is_empty() {
text.message_chunk = Some(delta);
text.message = None; // Clear message field while streaming
@ -1166,52 +1148,54 @@ fn transform_assistant_tool_message(
return None;
}
}
if text.status == Some("completed".to_string()) {
println!("CHUNK DEBUG [{}] Completing message", text.id);
// For completed messages, either use accumulated text or the final message
text.message = tracker.get_complete_text(text.id.clone())
text.message = tracker
.get_complete_text(text.id.clone())
.or(text.message)
.or(text.message_chunk.clone());
text.message_chunk = None;
tracker.clear_chunk(text.id.clone());
}
Some(BusterReasoningMessage::Text(text))
}
BusterReasoningMessage::File(mut file) => {
let mut has_updates = false;
let mut updated_files = std::collections::HashMap::new();
// Process each file's chunks
for (file_id, file_content) in file.files.iter() {
// Generate a consistent temporary ID for files during creation
// This ensures the same file gets the same ID throughout the creation process
let temp_file_id = if file.message_type == "files" && file.status != "completed" {
// For files being created, use a hash of the file name as a temporary ID
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
file_content.file_name.hash(&mut hasher);
format!("temp_{}", hasher.finish())
} else {
file_id.clone()
};
let temp_file_id =
if file.message_type == "files" && file.status != "completed" {
// For files being created, use a hash of the file name as a temporary ID
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
file_content.file_name.hash(&mut hasher);
format!("temp_{}", hasher.finish())
} else {
file_id.clone()
};
// Use consistent ID for chunk tracking
let chunk_id = format!("{}_{}", file.id, file_content.file_name);
if let Some(chunk) = &file_content.file.text_chunk {
println!("FILE CHUNK DEBUG [{}] Before filtering:", chunk_id);
println!(" Incoming chunk length: {}", chunk.len());
println!(" Incoming chunk: {}", chunk);
let delta = tracker.add_chunk(chunk_id.clone(), chunk.clone());
println!("FILE CHUNK DEBUG [{}] After filtering:", chunk_id);
println!(" Delta content length: {}", delta.len());
println!(" Delta content: {}", delta);
if !delta.is_empty() {
// Only include files that have new content
let mut updated_content = file_content.clone();
@ -1223,24 +1207,26 @@ fn transform_assistant_tool_message(
}
}
}
if file.status == "completed" {
// When completed, send all files with their complete text
for (file_id, file_content) in file.files.iter() {
let chunk_id = format!("{}_{}", file.id, file_content.file_name);
let complete_text = tracker.get_complete_text(chunk_id.clone())
.unwrap_or_else(|| file_content.file.text_chunk.clone().unwrap_or_default());
let complete_text =
tracker.get_complete_text(chunk_id.clone()).unwrap_or_else(
|| file_content.file.text_chunk.clone().unwrap_or_default(),
);
let mut completed_content = file_content.clone();
completed_content.file.text = Some(complete_text);
completed_content.file.text_chunk = None;
updated_files.insert(file_id.clone(), completed_content);
tracker.clear_chunk(chunk_id);
}
has_updates = true;
}
if has_updates {
let mut updated_file = file.clone();
updated_file.files = updated_files;
@ -1252,11 +1238,7 @@ fn transform_assistant_tool_message(
other => Some(other),
};
updated_reasoning.map(|reasoning| BusterReasoningMessageContainer {
reasoning,
chat_id,
message_id,
})
updated_reasoning.map(|reasoning| reasoning)
})
.filter_map(|container| container)
.collect();