ok things are working, but more tweaks needed.

This commit is contained in:
dal 2025-03-03 10:21:11 -07:00
parent 5de4a3961a
commit 03de39ac82
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
4 changed files with 424 additions and 578 deletions

View File

@ -353,6 +353,8 @@ impl Agent {
let mut content_buffer = String::new();
let mut message_id: Option<String> = None;
let mut is_complete = false;
// Flag to track if we've sent the first message
let mut first_message_sent = false;
while let Some(chunk_result) = stream_rx.recv().await {
match chunk_result {
@ -367,17 +369,19 @@ impl Agent {
if let Some(content) = &delta.content {
content_buffer.push_str(content);
// Stream the content update
// Stream the content update with initial=true for the first message only
let partial_message = AgentMessage::assistant(
message_id.clone(),
Some(content_buffer.clone()),
None,
Some(MessageProgress::InProgress),
Some(false),
Some(!first_message_sent), // Set initial=true only for the first message
Some(self.name.clone()),
);
self.get_stream_sender().await.send(Ok(partial_message))?;
// Mark that we've sent the first message
first_message_sent = true;
}
// Process tool calls if present
@ -423,11 +427,13 @@ impl Agent {
},
Some(tool_calls_vec),
Some(MessageProgress::InProgress),
Some(false),
Some(!first_message_sent), // Set initial=true only for the first message
Some(self.name.clone()),
);
self.get_stream_sender().await.send(Ok(partial_message))?;
// Mark that we've sent the first message
first_message_sent = true;
}
}

File diff suppressed because it is too large Load Diff

View File

@ -20,7 +20,7 @@ impl StreamingParser {
}
}
pub fn process_chunk(&mut self, chunk: &str) -> Result<Option<BusterChatContainer>> {
pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterChatContainer>> {
// Add new chunk to buffer
self.buffer.push_str(chunk);
@ -87,7 +87,7 @@ impl StreamingParser {
let has_yml_content = last_file.get("yml_content").is_some();
if has_name && has_file_type && has_yml_content {
return self.convert_to_message(value);
return self.convert_to_message(id, value);
}
}
}
@ -148,7 +148,7 @@ impl StreamingParser {
processed
}
fn convert_to_message(&self, value: Value) -> Result<Option<BusterChatContainer>> {
fn convert_to_message(&self, id: String, value: Value) -> Result<Option<BusterChatContainer>> {
if let Some(files) = value.get("files").and_then(Value::as_array) {
if let Some(last_file) = files.last().and_then(Value::as_object) {
let name = last_file.get("name").and_then(Value::as_str).unwrap_or("");
@ -171,7 +171,7 @@ impl StreamingParser {
}
return Ok(Some(BusterChatContainer::File(BusterFileMessage {
id: name.to_string(),
id,
message_type: "file".to_string(),
file_type: file_type.to_string(),
file_name: name.to_string(),

View File

@ -1,15 +1,15 @@
use std::sync::Arc;
use anyhow::Result;
use handlers::chats::post_chat_handler;
use handlers::chats::post_chat_handler::ChatCreateNewChat;
use handlers::chats::post_chat_handler::{self, ThreadEvent};
use handlers::chats::types::ChatWithMessages;
use tokio::sync::mpsc;
use crate::{
database::models::User,
routes::ws::{
threads_and_messages::threads_router::{ThreadEvent, ThreadRoute},
threads_and_messages::threads_router::{ThreadEvent as WSThreadEvent, ThreadRoute},
ws::{SubscriptionRwLock, WsEvent, WsResponseMessage, WsSendMethod},
ws_router::WsRoutes,
ws_utils::send_ws_message,
@ -25,32 +25,47 @@ pub async fn post_thread(
) -> Result<()> {
let (tx, mut rx) = mpsc::channel(1000);
// Call the shared handler
post_chat_handler::post_chat_handler(request, user.clone(), Some(tx)).await?;
let user_id = user.id.to_string();
while let Some(result) = rx.recv().await {
match result {
Ok(chat_with_messages) => {
println!("MESSAGE SHOULD BE SENT: {:?}", chat_with_messages);
tokio::spawn(async move {
while let Some(result) = rx.recv().await {
match result {
Ok((message, event)) => {
println!("MESSAGE SHOULD BE SENT: {:?}", message);
let response = WsResponseMessage::new_no_user(
WsRoutes::Threads(ThreadRoute::Post),
WsEvent::Threads(ThreadEvent::InitializeChat),
&chat_with_messages,
None,
WsSendMethod::All,
);
let event = match event {
ThreadEvent::GeneratingResponseMessage => {
WsEvent::Threads(WSThreadEvent::GeneratingResponseMessage)
}
ThreadEvent::GeneratingReasoningMessage => {
WsEvent::Threads(WSThreadEvent::GeneratingReasoningMessage)
}
};
if let Err(e) = send_ws_message(&user.id.to_string(), &response).await {
tracing::error!("Failed to send websocket message: {}", e);
let response = WsResponseMessage::new_no_user(
WsRoutes::Threads(ThreadRoute::Post),
event,
&message,
None,
WsSendMethod::All,
);
if let Err(e) = send_ws_message(&user_id, &response).await {
tracing::error!("Failed to send websocket message: {}", e);
}
}
Err(err) => {
tracing::error!("Error in message stream: {:?}", err);
return Err(err);
}
}
Err(err) => {
tracing::error!("Error in message stream: {:?}", err);
return Err(err);
}
}
}
Ok(())
});
// Call the shared handler
post_chat_handler::post_chat_handler(request, user.clone(), Some(tx)).await?;
Ok(())
}
@ -59,7 +74,7 @@ pub async fn post_thread(
async fn send_ws_response(subscription: &str, chat_with_messages: &ChatWithMessages) -> Result<()> {
let response = WsResponseMessage::new_no_user(
WsRoutes::Threads(ThreadRoute::Post),
WsEvent::Threads(ThreadEvent::InitializeChat),
WsEvent::Threads(WSThreadEvent::InitializeChat),
chat_with_messages,
None,
WsSendMethod::All,