mirror of https://github.com/buster-so/buster.git
We did it joe
This commit is contained in:
parent
43e2cf44f4
commit
d75931dcb0
|
@ -1,348 +1,186 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::Result;
|
||||
use regex;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::utils::clients::ai::litellm::{Message, MessageProgress, ToolCall};
|
||||
|
||||
use crate::utils::tools::file_tools::create_files::CreateFilesParams;
|
||||
use crate::utils::tools::file_tools::file_types::file::FileEnum;
|
||||
use crate::utils::tools::file_tools::modify_files::ModifyFilesParams;
|
||||
use crate::utils::tools::file_tools::open_files::OpenFilesOutput;
|
||||
use crate::utils::tools::file_tools::search_data_catalog::SearchDataCatalogOutput;
|
||||
use crate::utils::tools::file_tools::search_files::SearchFilesOutput;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct StreamingFileState {
|
||||
id: String,
|
||||
file_type: String,
|
||||
file_name: String,
|
||||
version_id: String,
|
||||
current_lines: Vec<FileContent>,
|
||||
line_buffer: String,
|
||||
next_line_number: usize,
|
||||
has_metadata: bool,
|
||||
status: String,
|
||||
}
|
||||
|
||||
enum ParsingState {
|
||||
WaitingForMetadata,
|
||||
StreamingFiles {
|
||||
files: Vec<StreamingFileState>,
|
||||
},
|
||||
Complete,
|
||||
}
|
||||
|
||||
struct StreamingParser {
|
||||
state: ParsingState,
|
||||
buffer: String,
|
||||
yml_content_regex: regex::Regex,
|
||||
}
|
||||
|
||||
impl StreamingParser {
|
||||
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
state: ParsingState::WaitingForMetadata,
|
||||
pub fn new() -> Self {
|
||||
StreamingParser {
|
||||
buffer: String::new(),
|
||||
yml_content_regex: regex::Regex::new(
|
||||
r#""yml_content":\s*"((?:[^"\\]|\\.|[\r\n])*?)(?:"|$)"#,
|
||||
)
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn complete_json(&self, partial: &str) -> String {
|
||||
let mut json = partial.to_string();
|
||||
let mut result = String::with_capacity(json.len() * 2);
|
||||
|
||||
let mut brace_count = 0;
|
||||
let mut bracket_count = 0;
|
||||
pub fn process_chunk(&mut self, chunk: &str) -> Result<Option<BusterThreadMessage>> {
|
||||
// Add new chunk to buffer
|
||||
self.buffer.push_str(chunk);
|
||||
|
||||
// Extract and replace yml_content with placeholders
|
||||
let mut yml_contents = Vec::new();
|
||||
let mut positions = Vec::new();
|
||||
let mut processed_json = self.buffer.clone();
|
||||
|
||||
// Find all yml_content matches and store them with their positions
|
||||
for captures in self.yml_content_regex.captures_iter(&self.buffer) {
|
||||
if let Some(content_match) = captures.get(1) {
|
||||
yml_contents.push(content_match.as_str().to_string());
|
||||
positions.push((captures.get(0).unwrap().start(), captures.get(0).unwrap().end()));
|
||||
}
|
||||
}
|
||||
|
||||
// Sort positions from last to first to maintain correct indices when replacing
|
||||
let mut position_indices: Vec<usize> = (0..positions.len()).collect();
|
||||
position_indices.sort_by_key(|&i| std::cmp::Reverse(positions[i].0));
|
||||
|
||||
// Replace matches with placeholders in reverse order
|
||||
for i in position_indices {
|
||||
let (start, end) = positions[i];
|
||||
let placeholder = format!(r#""yml_content":"YML_CONTENT_{i}""#);
|
||||
processed_json.replace_range(start..end, &placeholder);
|
||||
}
|
||||
|
||||
// Complete any incomplete JSON structure
|
||||
processed_json = self.complete_json_structure(processed_json);
|
||||
|
||||
// Try to parse the completed JSON
|
||||
if let Ok(mut value) = serde_json::from_str::<Value>(&processed_json) {
|
||||
// Put back the yml_content and process escapes first
|
||||
if let Some(obj) = value.as_object_mut() {
|
||||
if let Some(files) = obj.get_mut("files").and_then(|v| v.as_array_mut()) {
|
||||
for (i, file) in files.iter_mut().enumerate() {
|
||||
if let Some(file_obj) = file.as_object_mut() {
|
||||
if let Some(yml_content) = yml_contents.get(i) {
|
||||
// Process escaped characters
|
||||
let processed_content = serde_json::from_str::<String>(&format!("\"{}\"", yml_content))
|
||||
.unwrap_or_else(|_| yml_content.clone());
|
||||
|
||||
file_obj.insert(
|
||||
"yml_content".to_string(),
|
||||
Value::String(processed_content),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now check the structure after modifications
|
||||
if let Some(obj) = value.as_object() {
|
||||
if let Some(files) = obj.get("files").and_then(Value::as_array) {
|
||||
if let Some(last_file) = files.last().and_then(Value::as_object) {
|
||||
let has_name = last_file.get("name").and_then(Value::as_str).is_some();
|
||||
let has_file_type = last_file.get("file_type").and_then(Value::as_str).is_some();
|
||||
let has_yml_content = last_file.get("yml_content").is_some();
|
||||
|
||||
if has_name && has_file_type && has_yml_content {
|
||||
return self.convert_to_message(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn complete_json_structure(&self, json: String) -> String {
|
||||
let mut processed = String::with_capacity(json.len());
|
||||
let mut nesting_stack = Vec::new();
|
||||
let mut in_string = false;
|
||||
let mut escape_next = false;
|
||||
let mut in_yml_content = false;
|
||||
let mut yml_content_start = 0;
|
||||
|
||||
// First pass: track state and identify yml_content
|
||||
let chars: Vec<char> = json.chars().collect();
|
||||
let mut i = 0;
|
||||
while i < chars.len() {
|
||||
let c = chars[i];
|
||||
|
||||
// Process each character and track structure
|
||||
for c in json.chars() {
|
||||
processed.push(c);
|
||||
|
||||
if escape_next {
|
||||
result.push(c);
|
||||
escape_next = false;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
match c {
|
||||
'{' if !in_string => {
|
||||
brace_count += 1;
|
||||
result.push(c);
|
||||
}
|
||||
'\\' => escape_next = true,
|
||||
'"' if !escape_next => in_string = !in_string,
|
||||
'{' | '[' if !in_string => nesting_stack.push(c),
|
||||
'}' if !in_string => {
|
||||
brace_count -= 1;
|
||||
result.push(c);
|
||||
}
|
||||
'[' if !in_string => {
|
||||
bracket_count += 1;
|
||||
result.push(c);
|
||||
}
|
||||
if nesting_stack.last() == Some(&'{') {
|
||||
nesting_stack.pop();
|
||||
}
|
||||
},
|
||||
']' if !in_string => {
|
||||
bracket_count -= 1;
|
||||
result.push(c);
|
||||
}
|
||||
'"' => {
|
||||
if !escape_next {
|
||||
// Check if we're entering yml_content
|
||||
if !in_string && i >= 13 {
|
||||
let prev = &chars[i-13..i];
|
||||
let prev_str: String = prev.iter().collect();
|
||||
if prev_str == "\"yml_content\":" {
|
||||
in_yml_content = true;
|
||||
yml_content_start = result.len() + 1;
|
||||
}
|
||||
}
|
||||
// Check if we're exiting yml_content
|
||||
if in_string && in_yml_content {
|
||||
// Look ahead to see if this is really the end
|
||||
if i + 1 < chars.len() {
|
||||
match chars[i + 1] {
|
||||
',' | '}' => in_yml_content = false,
|
||||
_ => {} // Not the end, keep going
|
||||
}
|
||||
}
|
||||
}
|
||||
in_string = !in_string;
|
||||
if nesting_stack.last() == Some(&'[') {
|
||||
nesting_stack.pop();
|
||||
}
|
||||
result.push(c);
|
||||
}
|
||||
'\\' => {
|
||||
escape_next = true;
|
||||
result.push(c);
|
||||
}
|
||||
_ => {
|
||||
result.push(c);
|
||||
}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
|
||||
// Second pass: complete any unclosed structures
|
||||
|
||||
// Close any unclosed strings
|
||||
if in_string {
|
||||
// If we're in yml_content, we need to be careful about how we close it
|
||||
if in_yml_content {
|
||||
// Only add closing quote if we don't have an odd number of unescaped quotes
|
||||
let yml_part = &result[yml_content_start..];
|
||||
let mut quote_count = 0;
|
||||
let mut was_escape = false;
|
||||
for c in yml_part.chars() {
|
||||
match c {
|
||||
'"' if !was_escape => quote_count += 1,
|
||||
'\\' => was_escape = !was_escape,
|
||||
_ => was_escape = false
|
||||
}
|
||||
}
|
||||
if quote_count % 2 == 0 {
|
||||
result.push('"');
|
||||
}
|
||||
} else {
|
||||
result.push('"');
|
||||
processed.push('"');
|
||||
}
|
||||
|
||||
// Close structures in reverse order of opening
|
||||
while let Some(c) = nesting_stack.pop() {
|
||||
match c {
|
||||
'{' => processed.push('}'),
|
||||
'[' => processed.push(']'),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Close any unclosed arrays/objects
|
||||
while bracket_count > 0 {
|
||||
result.push(']');
|
||||
bracket_count -= 1;
|
||||
}
|
||||
while brace_count > 0 {
|
||||
result.push('}');
|
||||
brace_count -= 1;
|
||||
}
|
||||
|
||||
result
|
||||
|
||||
println!("complete_json_structure: {:?}", processed);
|
||||
processed
|
||||
}
|
||||
|
||||
fn process_chunk(&mut self, chunk: &str) -> Result<Option<BusterThreadMessage>> {
|
||||
self.buffer.push_str(chunk);
|
||||
let completed_json = self.complete_json(&self.buffer);
|
||||
|
||||
println!("completed_json: {:?}", completed_json);
|
||||
|
||||
match &mut self.state {
|
||||
ParsingState::WaitingForMetadata => {
|
||||
if let Ok(partial) = serde_json::from_str::<Value>(&completed_json) {
|
||||
if let Some(files_array) = partial.get("files").and_then(|f| f.as_array()) {
|
||||
let mut streaming_files = Vec::new();
|
||||
|
||||
for file in files_array {
|
||||
if let (Some(name), Some(file_type)) = (
|
||||
file.get("name").and_then(|n| n.as_str()),
|
||||
file.get("file_type").and_then(|t| t.as_str()),
|
||||
) {
|
||||
let mut file_state = StreamingFileState {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
file_type: file_type.to_string(),
|
||||
file_name: name.to_string(),
|
||||
version_id: Uuid::new_v4().to_string(),
|
||||
current_lines: Vec::new(),
|
||||
line_buffer: String::new(),
|
||||
next_line_number: 1,
|
||||
has_metadata: true,
|
||||
status: "loading".to_string(),
|
||||
};
|
||||
fn convert_to_message(&self, value: Value) -> Result<Option<BusterThreadMessage>> {
|
||||
if let Some(files) = value.get("files").and_then(Value::as_array) {
|
||||
if let Some(last_file) = files.last().and_then(Value::as_object) {
|
||||
let name = last_file.get("name").and_then(Value::as_str).unwrap_or("");
|
||||
let file_type = last_file.get("file_type").and_then(Value::as_str).unwrap_or("");
|
||||
let yml_content = last_file.get("yml_content").and_then(Value::as_str).unwrap_or("");
|
||||
|
||||
// Process any initial content that's available
|
||||
if let Some(yml_content) = file.get("yml_content").and_then(|c| c.as_str()) {
|
||||
if !yml_content.is_empty() {
|
||||
file_state.line_buffer.push_str(yml_content);
|
||||
|
||||
// Process complete lines
|
||||
let mut new_lines = Vec::new();
|
||||
for line in file_state.line_buffer.lines() {
|
||||
new_lines.push(FileContent {
|
||||
text: line.to_string(),
|
||||
line_number: file_state.next_line_number,
|
||||
modified: true,
|
||||
});
|
||||
file_state.next_line_number += 1;
|
||||
}
|
||||
|
||||
// Handle partial lines
|
||||
if !file_state.line_buffer.ends_with('\n') {
|
||||
if let Some(last_newline) = file_state.line_buffer.rfind('\n') {
|
||||
file_state.line_buffer = file_state.line_buffer[last_newline + 1..].to_string();
|
||||
}
|
||||
} else {
|
||||
file_state.line_buffer.clear();
|
||||
}
|
||||
|
||||
file_state.current_lines.extend(new_lines);
|
||||
}
|
||||
}
|
||||
|
||||
streaming_files.push(file_state);
|
||||
|
||||
// Transition to StreamingFiles state as soon as we have metadata
|
||||
if file == files_array.last().unwrap() {
|
||||
let last_file = streaming_files.last().unwrap().clone();
|
||||
self.state = ParsingState::StreamingFiles {
|
||||
files: streaming_files,
|
||||
};
|
||||
|
||||
return Ok(Some(BusterThreadMessage::File(BusterFileMessage {
|
||||
id: last_file.id,
|
||||
message_type: "file".to_string(),
|
||||
file_type: last_file.file_type,
|
||||
file_name: last_file.file_name,
|
||||
version_number: 1,
|
||||
version_id: last_file.version_id,
|
||||
status: "loading".to_string(),
|
||||
file: Some(last_file.current_lines),
|
||||
})));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut current_lines = Vec::new();
|
||||
for (i, line) in yml_content.lines().enumerate() {
|
||||
current_lines.push(BusterFileLine {
|
||||
line_number: i + 1,
|
||||
text: line.to_string(),
|
||||
});
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
ParsingState::StreamingFiles { files } => {
|
||||
if let Ok(partial) = serde_json::from_str::<Value>(&completed_json) {
|
||||
if let Some(files_array) = partial.get("files").and_then(|f| f.as_array()) {
|
||||
// Process content for current file
|
||||
if let Some(current_file) = files.last_mut() {
|
||||
if let Some(file_data) = files_array.last() {
|
||||
if let Some(yml_content) = file_data.get("yml_content").and_then(|c| c.as_str()) {
|
||||
if yml_content.len() > current_file.line_buffer.len() {
|
||||
let new_content = &yml_content[current_file.line_buffer.len()..];
|
||||
current_file.line_buffer.push_str(new_content);
|
||||
|
||||
// Process complete lines
|
||||
let mut new_lines = Vec::new();
|
||||
for line in current_file.line_buffer.lines() {
|
||||
new_lines.push(FileContent {
|
||||
text: line.to_string(),
|
||||
line_number: current_file.next_line_number,
|
||||
modified: true,
|
||||
});
|
||||
current_file.next_line_number += 1;
|
||||
}
|
||||
|
||||
// Handle partial lines
|
||||
if !current_file.line_buffer.ends_with('\n') {
|
||||
if let Some(last_newline) = current_file.line_buffer.rfind('\n') {
|
||||
current_file.line_buffer = current_file.line_buffer[last_newline + 1..].to_string();
|
||||
}
|
||||
} else {
|
||||
current_file.line_buffer.clear();
|
||||
}
|
||||
|
||||
current_file.current_lines.extend(new_lines);
|
||||
|
||||
return Ok(Some(BusterThreadMessage::File(BusterFileMessage {
|
||||
id: current_file.id.clone(),
|
||||
message_type: "file".to_string(),
|
||||
file_type: current_file.file_type.clone(),
|
||||
file_name: current_file.file_name.clone(),
|
||||
version_number: 1,
|
||||
version_id: current_file.version_id.clone(),
|
||||
status: "loading".to_string(),
|
||||
file: Some(current_file.current_lines.clone()),
|
||||
})));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for new files
|
||||
if files_array.len() > files.len() {
|
||||
// Complete the current file if it exists
|
||||
if let Some(current_file) = files.last_mut() {
|
||||
current_file.status = "completed".to_string();
|
||||
|
||||
// Emit completion message for current file
|
||||
let completion_message = BusterThreadMessage::File(BusterFileMessage {
|
||||
id: current_file.id.clone(),
|
||||
message_type: "file".to_string(),
|
||||
file_type: current_file.file_type.clone(),
|
||||
file_name: current_file.file_name.clone(),
|
||||
version_number: 1,
|
||||
version_id: current_file.version_id.clone(),
|
||||
status: "completed".to_string(),
|
||||
file: Some(current_file.current_lines.clone()),
|
||||
});
|
||||
|
||||
// Add new file to state if we have its metadata
|
||||
if let Some(new_file) = files_array.last() {
|
||||
if let (Some(name), Some(file_type)) = (
|
||||
new_file.get("name").and_then(|n| n.as_str()),
|
||||
new_file.get("file_type").and_then(|t| t.as_str()),
|
||||
) {
|
||||
files.push(StreamingFileState {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
file_type: file_type.to_string(),
|
||||
file_name: name.to_string(),
|
||||
version_id: Uuid::new_v4().to_string(),
|
||||
current_lines: Vec::new(),
|
||||
line_buffer: String::new(),
|
||||
next_line_number: 1,
|
||||
has_metadata: true,
|
||||
status: "loading".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(Some(completion_message));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
return Ok(Some(BusterThreadMessage::File(BusterFileMessage {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
message_type: "file".to_string(),
|
||||
file_type: file_type.to_string(),
|
||||
file_name: name.to_string(),
|
||||
version_number: 1,
|
||||
version_id: Uuid::new_v4().to_string(),
|
||||
status: "completed".to_string(),
|
||||
file: Some(current_lines),
|
||||
})));
|
||||
}
|
||||
|
||||
ParsingState::Complete => Ok(None),
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -398,14 +236,13 @@ pub struct BusterFileMessage {
|
|||
pub version_number: i32,
|
||||
pub version_id: String,
|
||||
pub status: String,
|
||||
pub file: Option<Vec<FileContent>>,
|
||||
pub file: Option<Vec<BusterFileLine>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct FileContent {
|
||||
pub text: String,
|
||||
pub struct BusterFileLine {
|
||||
pub line_number: usize,
|
||||
pub modified: bool,
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
pub fn transform_message(message: Message) -> Result<BusterThreadMessage> {
|
||||
|
@ -927,7 +764,7 @@ fn assistant_create_file(
|
|||
) -> Result<BusterThreadMessage> {
|
||||
if let Some(progress) = progress {
|
||||
match progress {
|
||||
MessageProgress::InProgress => {
|
||||
MessageProgress::InProgress | MessageProgress::Complete => {
|
||||
// Try to parse the tool call arguments to get file metadata
|
||||
if let Some(tool_call) = tool_calls.first() {
|
||||
return process_assistant_create_file(tool_call);
|
||||
|
@ -935,7 +772,7 @@ fn assistant_create_file(
|
|||
Err(anyhow::anyhow!("No tool call found"))
|
||||
}
|
||||
_ => Err(anyhow::anyhow!(
|
||||
"Assistant create file only supports in progress."
|
||||
"Assistant create file only supports in progress and complete."
|
||||
)),
|
||||
}
|
||||
} else {
|
||||
|
@ -951,7 +788,7 @@ fn process_assistant_create_file(tool_call: &ToolCall) -> Result<BusterThreadMes
|
|||
return Ok(message);
|
||||
}
|
||||
|
||||
// Return None by returning Ok(None) wrapped in a Result
|
||||
// If we couldn't parse a message, return an error
|
||||
Err(anyhow::anyhow!("Still waiting for file data"))
|
||||
}
|
||||
|
||||
|
@ -1069,37 +906,3 @@ fn tool_modify_file(
|
|||
Err(anyhow::anyhow!("Tool modify file requires progress."))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_message_transformation() {
|
||||
let message = Message::Assistant {
|
||||
id: None,
|
||||
content: Some("Test content".to_string()),
|
||||
name: None,
|
||||
tool_calls: None,
|
||||
progress: None,
|
||||
};
|
||||
|
||||
let result = transform_message(message);
|
||||
assert!(result.is_ok());
|
||||
let transformed = result.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsupported_message_type() {
|
||||
let message = Message::Tool {
|
||||
id: None,
|
||||
content: "content".to_string(),
|
||||
tool_call_id: "test".to_string(),
|
||||
name: None,
|
||||
progress: None,
|
||||
};
|
||||
|
||||
let result = transform_message(message);
|
||||
assert!(matches!(result, Err(_)));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue