working for the most part. final details for the stream.

This commit is contained in:
dal 2025-03-03 11:30:17 -07:00
parent 9aa1cbbab7
commit b2c988527f
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
4 changed files with 588 additions and 592 deletions

View File

@ -286,7 +286,7 @@ impl Agent {
Some("shutdown_message".to_string()), Some("shutdown_message".to_string()),
Some("Processing interrupted due to shutdown signal".to_string()), Some("Processing interrupted due to shutdown signal".to_string()),
None, None,
None, MessageProgress::Complete,
None, None,
Some(agent_clone.name.clone()), Some(agent_clone.name.clone()),
)) ))
@ -314,7 +314,7 @@ impl Agent {
Some("max_recursion_depth_message".to_string()), Some("max_recursion_depth_message".to_string()),
Some("I apologize, but I've reached the maximum number of actions (30). Please try breaking your request into smaller parts.".to_string()), Some("I apologize, but I've reached the maximum number of actions (30). Please try breaking your request into smaller parts.".to_string()),
None, None,
None, MessageProgress::Complete,
None, None,
Some(self.name.clone()), Some(self.name.clone()),
); );
@ -374,7 +374,7 @@ impl Agent {
message_id.clone(), message_id.clone(),
Some(content_buffer.clone()), Some(content_buffer.clone()),
None, None,
Some(MessageProgress::InProgress), MessageProgress::InProgress,
Some(!first_message_sent), // Set initial=true only for the first message Some(!first_message_sent), // Set initial=true only for the first message
Some(self.name.clone()), Some(self.name.clone()),
); );
@ -426,7 +426,7 @@ impl Agent {
Some(content_buffer.clone()) Some(content_buffer.clone())
}, },
Some(tool_calls_vec), Some(tool_calls_vec),
Some(MessageProgress::InProgress), MessageProgress::InProgress,
Some(!first_message_sent), // Set initial=true only for the first message Some(!first_message_sent), // Set initial=true only for the first message
Some(self.name.clone()), Some(self.name.clone()),
); );
@ -466,7 +466,7 @@ impl Agent {
Some(content_buffer) Some(content_buffer)
}, },
final_tool_calls.clone(), final_tool_calls.clone(),
Some(MessageProgress::Complete), MessageProgress::Complete,
Some(false), Some(false),
Some(self.name.clone()), Some(self.name.clone()),
); );
@ -499,7 +499,7 @@ impl Agent {
result_str, result_str,
tool_call.id.clone(), tool_call.id.clone(),
Some(tool_call.function.name.clone()), Some(tool_call.function.name.clone()),
None, MessageProgress::Complete,
); );
// Broadcast the tool message as soon as we receive it // Broadcast the tool message as soon as we receive it
@ -656,7 +656,7 @@ mod tests {
content, content,
tool_id, tool_id,
Some(self.get_name()), Some(self.get_name()),
Some(progress), progress,
); );
self.agent.get_stream_sender().await.send(Ok(message))?; self.agent.get_stream_sender().await.send(Ok(message))?;
Ok(()) Ok(())

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,7 @@ use anyhow::Result;
use serde_json::Value; use serde_json::Value;
use uuid::Uuid; use uuid::Uuid;
use super::post_chat_handler::{BusterChatContainer, BusterFileLine, BusterReasoningFile}; use super::post_chat_handler::{BusterReasoningMessage, BusterFileLine, BusterReasoningFile, BusterReasoningPill, BusterThoughtPill, BusterThoughtPillContainer, BusterReasoningText};
pub struct StreamingParser { pub struct StreamingParser {
buffer: String, buffer: String,
@ -20,10 +20,77 @@ impl StreamingParser {
} }
} }
pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterChatContainer>> { // Main entry point to process chunks based on type
pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterReasoningMessage>> {
// Add new chunk to buffer // Add new chunk to buffer
self.buffer.push_str(chunk); self.buffer.push_str(chunk);
// Try to process as a plan first
if let Some(plan) = self.process_plan_chunk(id.clone())? {
return Ok(Some(plan));
}
// Then try to process as search data catalog
if let Some(search) = self.process_search_catalog_chunk(id.clone())? {
return Ok(Some(search));
}
// Finally try to process as a file
self.process_file_chunk(id)
}
// Process chunks meant for plan creation
pub fn process_plan_chunk(&mut self, id: String) -> Result<Option<BusterReasoningMessage>> {
// Complete any incomplete JSON structure
let processed_json = self.complete_json_structure(self.buffer.clone());
// Try to parse the JSON
if let Ok(value) = serde_json::from_str::<Value>(&processed_json) {
// Check if it's a plan structure (has plan_markdown key)
if let Some(plan_markdown) = value.get("plan_markdown").and_then(Value::as_str) {
// Return the plan as a BusterReasoningText
return Ok(Some(BusterReasoningMessage::Text(BusterReasoningText {
id,
reasoning_type: "text".to_string(),
title: "Generated Plan".to_string(),
secondary_title: Some("Plan details".to_string()),
message: Some(plan_markdown.to_string()),
message_chunk: None,
status: Some("loading".to_string()),
})));
}
}
Ok(None)
}
// Process chunks meant for search data catalog
pub fn process_search_catalog_chunk(&mut self, id: String) -> Result<Option<BusterReasoningMessage>> {
// Complete any incomplete JSON structure
let processed_json = self.complete_json_structure(self.buffer.clone());
// Try to parse the JSON
if let Ok(value) = serde_json::from_str::<Value>(&processed_json) {
// Check if it's a search requirements structure
if let Some(search_requirements) = value.get("search_requirements").and_then(Value::as_str) {
// Return the search requirements as a BusterReasoningText
return Ok(Some(BusterReasoningMessage::Text(BusterReasoningText {
id,
reasoning_type: "text".to_string(),
title: "Search Requirements".to_string(),
secondary_title: Some("Processing search...".to_string()),
message: Some(search_requirements.to_string()),
message_chunk: None,
status: Some("loading".to_string()),
})));
}
}
Ok(None)
}
// Process chunks meant for files (original implementation)
pub fn process_file_chunk(&mut self, id: String) -> Result<Option<BusterReasoningMessage>> {
// Extract and replace yml_content with placeholders // Extract and replace yml_content with placeholders
let mut yml_contents = Vec::new(); let mut yml_contents = Vec::new();
let mut positions = Vec::new(); let mut positions = Vec::new();
@ -78,25 +145,13 @@ impl StreamingParser {
} }
// Now check the structure after modifications // Now check the structure after modifications
if let Some(obj) = value.as_object() { return self.convert_file_to_message(id, value);
if let Some(files) = obj.get("files").and_then(Value::as_array) {
if let Some(last_file) = files.last().and_then(Value::as_object) {
let has_name = last_file.get("name").and_then(Value::as_str).is_some();
let has_file_type =
last_file.get("file_type").and_then(Value::as_str).is_some();
let has_yml_content = last_file.get("yml_content").is_some();
if has_name && has_file_type && has_yml_content {
return self.convert_to_message(id, value);
}
}
}
}
} }
Ok(None) Ok(None)
} }
// Helper method to complete JSON structure (shared functionality)
fn complete_json_structure(&self, json: String) -> String { fn complete_json_structure(&self, json: String) -> String {
let mut processed = String::with_capacity(json.len()); let mut processed = String::with_capacity(json.len());
let mut nesting_stack = Vec::new(); let mut nesting_stack = Vec::new();
@ -144,22 +199,21 @@ impl StreamingParser {
} }
} }
println!("complete_json_structure: {:?}", processed);
processed processed
} }
fn convert_to_message(&self, id: String, value: Value) -> Result<Option<BusterChatContainer>> { // Helper method to convert file JSON to message
fn convert_file_to_message(&self, id: String, value: Value) -> Result<Option<BusterReasoningMessage>> {
if let Some(files) = value.get("files").and_then(Value::as_array) { if let Some(files) = value.get("files").and_then(Value::as_array) {
if let Some(last_file) = files.last().and_then(Value::as_object) { if let Some(last_file) = files.last().and_then(Value::as_object) {
let has_name = last_file.get("name").and_then(Value::as_str).is_some();
let has_file_type = last_file.get("file_type").and_then(Value::as_str).is_some();
let has_yml_content = last_file.get("yml_content").is_some();
if has_name && has_file_type && has_yml_content {
let name = last_file.get("name").and_then(Value::as_str).unwrap_or(""); let name = last_file.get("name").and_then(Value::as_str).unwrap_or("");
let file_type = last_file let file_type = last_file.get("file_type").and_then(Value::as_str).unwrap_or("");
.get("file_type") let yml_content = last_file.get("yml_content").and_then(Value::as_str).unwrap_or("");
.and_then(Value::as_str)
.unwrap_or("");
let yml_content = last_file
.get("yml_content")
.and_then(Value::as_str)
.unwrap_or("");
let mut current_lines = Vec::new(); let mut current_lines = Vec::new();
for (i, line) in yml_content.lines().enumerate() { for (i, line) in yml_content.lines().enumerate() {
@ -170,7 +224,7 @@ impl StreamingParser {
}); });
} }
return Ok(Some(BusterChatContainer::File(BusterReasoningFile { return Ok(Some(BusterReasoningMessage::File(BusterReasoningFile {
id, id,
message_type: "file".to_string(), message_type: "file".to_string(),
file_type: file_type.to_string(), file_type: file_type.to_string(),
@ -184,6 +238,7 @@ impl StreamingParser {
}))); })));
} }
} }
}
Ok(None) Ok(None)
} }
} }

View File

@ -100,6 +100,12 @@ pub enum MessageProgress {
Complete, Complete,
} }
impl Default for MessageProgress {
fn default() -> Self {
Self::Complete
}
}
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "role")] #[serde(tag = "role")]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
@ -129,7 +135,7 @@ pub enum AgentMessage {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
tool_calls: Option<Vec<ToolCall>>, tool_calls: Option<Vec<ToolCall>>,
#[serde(skip)] #[serde(skip)]
progress: Option<MessageProgress>, progress: MessageProgress,
#[serde(skip)] #[serde(skip)]
initial: bool, initial: bool,
}, },
@ -141,7 +147,7 @@ pub enum AgentMessage {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>, name: Option<String>,
#[serde(skip)] #[serde(skip)]
progress: Option<MessageProgress>, progress: MessageProgress,
}, },
} }
@ -168,7 +174,7 @@ impl AgentMessage {
id: Option<String>, id: Option<String>,
content: Option<String>, content: Option<String>,
tool_calls: Option<Vec<ToolCall>>, tool_calls: Option<Vec<ToolCall>>,
progress: Option<MessageProgress>, progress: MessageProgress,
initial: Option<bool>, initial: Option<bool>,
name: Option<String>, name: Option<String>,
) -> Self { ) -> Self {
@ -189,7 +195,7 @@ impl AgentMessage {
content: impl Into<String>, content: impl Into<String>,
tool_call_id: impl Into<String>, tool_call_id: impl Into<String>,
name: Option<String>, name: Option<String>,
progress: Option<MessageProgress>, progress: MessageProgress,
) -> Self { ) -> Self {
Self::Tool { Self::Tool {
id, id,
@ -501,7 +507,7 @@ mod tests {
Some("\n\nHello there, how may I assist you today?".to_string()), Some("\n\nHello there, how may I assist you today?".to_string()),
None, None,
None, None,
None, MessageProgress::Complete,
None, None,
None, None,
), ),
@ -645,7 +651,14 @@ mod tests {
choices: vec![Choice { choices: vec![Choice {
finish_reason: Some("length".to_string()), finish_reason: Some("length".to_string()),
index: 0, index: 0,
message: AgentMessage::assistant(Some("".to_string()), None, None, None, None, None), message: AgentMessage::assistant(
Some("".to_string()),
None,
None,
MessageProgress::Complete,
None,
None,
),
delta: None, delta: None,
logprobs: None, logprobs: None,
}], }],
@ -934,7 +947,7 @@ mod tests {
code_interpreter: None, code_interpreter: None,
retrieval: None, retrieval: None,
}]), }]),
None, MessageProgress::Complete,
None, None,
None, None,
), ),