diff --git a/api/src/routes/ws/threads_and_messages/post_thread/agent_message_transformer.rs b/api/src/routes/ws/threads_and_messages/post_thread/agent_message_transformer.rs index 7643e8371..e7ad3c54d 100644 --- a/api/src/routes/ws/threads_and_messages/post_thread/agent_message_transformer.rs +++ b/api/src/routes/ws/threads_and_messages/post_thread/agent_message_transformer.rs @@ -886,7 +886,7 @@ fn tool_create_file( } messages.push(BusterThreadMessage::File(BusterFileMessage { - id: Uuid::new_v4().to_string(), + id: name.clone(), message_type: "file".to_string(), file_type, file_name: name, diff --git a/api/src/routes/ws/threads_and_messages/post_thread/agent_thread.rs b/api/src/routes/ws/threads_and_messages/post_thread/agent_thread.rs index 343b1c928..b07ff07e6 100644 --- a/api/src/routes/ws/threads_and_messages/post_thread/agent_thread.rs +++ b/api/src/routes/ws/threads_and_messages/post_thread/agent_thread.rs @@ -115,11 +115,8 @@ impl AgentThreadHandler { ) { let subscription = user_id.to_string(); - let message_id = Uuid::new_v4().to_string(); - while let Some(msg_result) = rx.recv().await { - if let Ok(mut msg) = msg_result { - msg.set_id(message_id.clone()); + if let Ok(msg) = msg_result { match transform_message(msg) { Ok(transformed_messages) => { for transformed in transformed_messages { diff --git a/api/src/utils/agent/agent.rs b/api/src/utils/agent/agent.rs index 128206181..4320a598e 100644 --- a/api/src/utils/agent/agent.rs +++ b/api/src/utils/agent/agent.rs @@ -90,6 +90,7 @@ impl Agent { ) -> Result { if recursion_depth >= 30 { return Ok(Message::assistant( + Some("max_recursion_depth_message".to_string()), Some("I apologize, but I've reached the maximum number of actions (30). 
Please try breaking your request into smaller parts.".to_string()), None, None, @@ -133,7 +134,7 @@ impl Agent { let mut tool_thread = thread.clone(); tool_thread .messages - .push(Message::assistant(Some(initial_content), None, None)); + .push(Message::assistant(None, Some(initial_content), None, None)); // Create the tool-enabled request let request = ChatCompletionRequest { @@ -158,7 +159,7 @@ impl Agent { content, tool_calls, .. - } => Message::assistant(content.clone(), tool_calls.clone(), None), + } => Message::assistant(None, content.clone(), tool_calls.clone(), None), _ => return Err(anyhow::anyhow!("Expected assistant message from LLM")), }; @@ -184,6 +185,7 @@ impl Agent { let result = tool.execute(tool_call).await?; let result_str = serde_json::to_string(&result)?; results.push(Message::tool( + None, result_str, tool_call.id.clone(), Some(tool_call.function.name.clone()), @@ -233,6 +235,7 @@ impl Agent { ) -> Result<()> { if recursion_depth >= 30 { let limit_message = Message::assistant( + Some("max_recursion_depth_message".to_string()), Some("I apologize, but I've reached the maximum number of actions (30). 
Please try breaking your request into smaller parts.".to_string()), None, None, @@ -266,13 +269,19 @@ impl Agent { // Get streaming response for initial thoughts let mut initial_stream = llm_client.stream_chat_completion(initial_request).await?; - let mut initial_message = Message::assistant(Some(String::new()), None, None); - let mut has_started = false; + let mut initial_message = Message::assistant( + None, + Some(String::new()), + None, + None, + ); // Process initial stream chunks while let Some(chunk_result) = initial_stream.recv().await { match chunk_result { Ok(chunk) => { + initial_message.set_id(chunk.id.clone()); + let delta = &chunk.choices[0].delta; // Handle content updates - send delta directly @@ -280,6 +289,7 @@ impl Agent { // Send the delta chunk immediately with InProgress let _ = tx .send(Ok(Message::assistant( + Some("initial_message".to_string()), Some(content.clone()), None, Some(MessageProgress::InProgress), @@ -315,6 +325,7 @@ impl Agent { if !initial_content.trim().is_empty() { let _ = tx .send(Ok(Message::assistant( + Some("initial_message".to_string()), Some(initial_content.clone()), None, Some(MessageProgress::Complete), @@ -324,9 +335,12 @@ impl Agent { // Create a new thread with the initial response let mut tool_thread = thread.clone(); - tool_thread - .messages - .push(Message::assistant(Some(initial_content), None, None)); + tool_thread.messages.push(Message::assistant( + Some("initial_message".to_string()), + Some(initial_content.clone()), + None, + None, + )); // Create the tool-enabled request let request = ChatCompletionRequest { @@ -340,7 +354,7 @@ impl Agent { // Get streaming response let mut stream = llm_client.stream_chat_completion(request).await?; - let mut current_message = Message::assistant(Some(String::new()), None, None); + let mut current_message = Message::assistant(None, Some(String::new()), None, None); let mut current_pending_tool: Option = None; let mut has_tool_calls = false; let mut tool_results = 
Vec::new(); @@ -349,6 +363,8 @@ impl Agent { while let Some(chunk_result) = stream.recv().await { match chunk_result { Ok(chunk) => { + current_message.set_id(chunk.id.clone()); + let delta = &chunk.choices[0].delta; // Check for tool call completion @@ -361,6 +377,7 @@ impl Agent { // Create and preserve the assistant message with the tool call let assistant_tool_message = Message::assistant( + Some(chunk.id.clone()), None, Some(vec![tool_call.clone()]), Some(MessageProgress::Complete), @@ -375,6 +392,7 @@ impl Agent { let result_str = serde_json::to_string(&result)?; let tool_result = Message::tool( + Some(chunk.id.clone()), result_str, tool_call.id.clone(), Some(tool_call.function.name.clone()), @@ -390,6 +408,7 @@ impl Agent { let error_msg = format!("Tool execution failed: {:?}", e); let tool_error = Message::tool( + Some(chunk.id.clone()), error_msg, tool_call.id.clone(), Some(tool_call.function.name.clone()), @@ -422,6 +441,7 @@ impl Agent { } let _ = tx .send(Ok(Message::assistant( + Some(chunk.id.clone()), Some(content.clone()), None, Some(MessageProgress::InProgress), @@ -460,6 +480,7 @@ impl Agent { let _ = tx .send(Ok(Message::assistant( + Some(chunk.id.clone()), None, Some(vec![temp_tool_call]), Some(MessageProgress::InProgress), @@ -476,10 +497,12 @@ impl Agent { if let Message::Assistant { content: Some(content), .. 
- } = &current_message { + } = &current_message + { if !content.trim().is_empty() { let _ = tx .send(Ok(Message::assistant( + Some(chunk.id.clone()), Some(content.clone()), None, Some(MessageProgress::Complete), diff --git a/api/src/utils/clients/ai/litellm/types.rs b/api/src/utils/clients/ai/litellm/types.rs index 86a17ca43..2a2f11811 100644 --- a/api/src/utils/clients/ai/litellm/types.rs +++ b/api/src/utils/clients/ai/litellm/types.rs @@ -155,12 +155,13 @@ impl Message { } pub fn assistant( + id: Option, content: Option, tool_calls: Option>, progress: Option, ) -> Self { Self::Assistant { - id: None, + id, content, name: None, tool_calls, @@ -169,13 +170,14 @@ impl Message { } pub fn tool( + id: Option, content: impl Into, tool_call_id: impl Into, name: Option, progress: Option, ) -> Self { Self::Tool { - id: None, + id, content: content.into(), tool_call_id: tool_call_id.into(), name, @@ -476,6 +478,7 @@ mod tests { Some("\n\nHello there, how may I assist you today?".to_string()), None, None, + None, ), logprobs: None, finish_reason: Some("stop".to_string()), @@ -617,7 +620,7 @@ mod tests { choices: vec![Choice { finish_reason: Some("length".to_string()), index: 0, - message: Message::assistant(Some("".to_string()), None, None), + message: Message::assistant(Some("".to_string()), None, None, None), delta: None, logprobs: None, }], @@ -894,6 +897,7 @@ mod tests { choices: vec![Choice { index: 0, message: Message::assistant( + None, None, Some(vec![ToolCall { id: "call_abc123".to_string(),