mirror of https://github.com/buster-so/buster.git
timeout and file message
This commit is contained in:
parent
8f3fb8732d
commit
8899fb8549
|
@ -34,8 +34,9 @@ use uuid::Uuid;
|
||||||
use crate::chats::{
|
use crate::chats::{
|
||||||
asset_messages::{create_message_file_association, generate_asset_messages},
|
asset_messages::{create_message_file_association, generate_asset_messages},
|
||||||
context_loaders::{
|
context_loaders::{
|
||||||
chat_context::ChatContextLoader, create_asset_context_loader, dashboard_context::DashboardContextLoader,
|
chat_context::ChatContextLoader, create_asset_context_loader,
|
||||||
fetch_asset_details, metric_context::MetricContextLoader, validate_context_request, ContextLoader,
|
dashboard_context::DashboardContextLoader, fetch_asset_details,
|
||||||
|
metric_context::MetricContextLoader, validate_context_request, ContextLoader,
|
||||||
},
|
},
|
||||||
get_chat_handler,
|
get_chat_handler,
|
||||||
streaming_parser::StreamingParser,
|
streaming_parser::StreamingParser,
|
||||||
|
@ -156,12 +157,18 @@ pub async fn post_chat_handler(
|
||||||
// Create a request-local chunk tracker instance instead of using global static
|
// Create a request-local chunk tracker instance instead of using global static
|
||||||
let chunk_tracker = ChunkTracker::new();
|
let chunk_tracker = ChunkTracker::new();
|
||||||
let reasoning_duration = Instant::now();
|
let reasoning_duration = Instant::now();
|
||||||
|
|
||||||
// Normalize request to use asset_id/asset_type if legacy fields are provided
|
// Normalize request to use asset_id/asset_type if legacy fields are provided
|
||||||
let (asset_id, asset_type) = normalize_asset_fields(&request);
|
let (asset_id, asset_type) = normalize_asset_fields(&request);
|
||||||
|
|
||||||
// Validate that only one context type is provided
|
// Validate that only one context type is provided
|
||||||
validate_context_request(request.chat_id, asset_id, asset_type, request.metric_id, request.dashboard_id)?;
|
validate_context_request(
|
||||||
|
request.chat_id,
|
||||||
|
asset_id,
|
||||||
|
asset_type,
|
||||||
|
request.metric_id,
|
||||||
|
request.dashboard_id,
|
||||||
|
)?;
|
||||||
|
|
||||||
let user_org_id = match user.attributes.get("organization_id") {
|
let user_org_id = match user.attributes.get("organization_id") {
|
||||||
Some(Value::String(org_id)) => Uuid::parse_str(org_id).unwrap_or_default(),
|
Some(Value::String(org_id)) => Uuid::parse_str(org_id).unwrap_or_default(),
|
||||||
|
@ -193,18 +200,14 @@ pub async fn post_chat_handler(
|
||||||
if request.prompt.is_none() && asset_id.is_some() && asset_type.is_some() {
|
if request.prompt.is_none() && asset_id.is_some() && asset_type.is_some() {
|
||||||
let asset_id_value = asset_id.unwrap();
|
let asset_id_value = asset_id.unwrap();
|
||||||
let asset_type_value = asset_type.unwrap();
|
let asset_type_value = asset_type.unwrap();
|
||||||
|
|
||||||
let messages = generate_asset_messages(
|
let messages = generate_asset_messages(asset_id_value, asset_type_value, &user).await?;
|
||||||
asset_id_value,
|
|
||||||
asset_type_value,
|
|
||||||
&user,
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
// Add messages to chat and associate with chat_id
|
// Add messages to chat and associate with chat_id
|
||||||
let mut updated_messages = Vec::new();
|
let mut updated_messages = Vec::new();
|
||||||
for mut message in messages {
|
for mut message in messages {
|
||||||
message.chat_id = chat_id;
|
message.chat_id = chat_id;
|
||||||
|
|
||||||
// If this is a file message, create file association
|
// If this is a file message, create file association
|
||||||
if message.response_messages.is_array() {
|
if message.response_messages.is_array() {
|
||||||
let response_arr = message.response_messages.as_array().unwrap();
|
let response_arr = message.response_messages.as_array().unwrap();
|
||||||
|
@ -216,23 +219,24 @@ pub async fn post_chat_handler(
|
||||||
message.id,
|
message.id,
|
||||||
asset_id_value,
|
asset_id_value,
|
||||||
asset_type_value,
|
asset_type_value,
|
||||||
).await;
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert message into database
|
// Insert message into database
|
||||||
let mut conn = get_pg_pool().get().await?;
|
let mut conn = get_pg_pool().get().await?;
|
||||||
insert_into(database::schema::messages::table)
|
insert_into(database::schema::messages::table)
|
||||||
.values(&message)
|
.values(&message)
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Add to updated messages for the response
|
// Add to updated messages for the response
|
||||||
updated_messages.push(message);
|
updated_messages.push(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transform DB messages to ChatMessage format for response
|
// Transform DB messages to ChatMessage format for response
|
||||||
for message in updated_messages {
|
for message in updated_messages {
|
||||||
let chat_message = ChatMessage::new_with_messages(
|
let chat_message = ChatMessage::new_with_messages(
|
||||||
|
@ -249,10 +253,10 @@ pub async fn post_chat_handler(
|
||||||
None,
|
None,
|
||||||
message.created_at,
|
message.created_at,
|
||||||
);
|
);
|
||||||
|
|
||||||
chat_with_messages.add_message(chat_message);
|
chat_with_messages.add_message(chat_message);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return early with auto-generated messages - no need for agent processing
|
// Return early with auto-generated messages - no need for agent processing
|
||||||
return Ok(chat_with_messages);
|
return Ok(chat_with_messages);
|
||||||
}
|
}
|
||||||
|
@ -296,7 +300,9 @@ pub async fn post_chat_handler(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the new user message (now with unwrap_or_default for optional prompt)
|
// Add the new user message (now with unwrap_or_default for optional prompt)
|
||||||
initial_messages.push(AgentMessage::user(request.prompt.clone().unwrap_or_default()));
|
initial_messages.push(AgentMessage::user(
|
||||||
|
request.prompt.clone().unwrap_or_default(),
|
||||||
|
));
|
||||||
|
|
||||||
// Initialize raw_llm_messages with initial_messages
|
// Initialize raw_llm_messages with initial_messages
|
||||||
let mut raw_llm_messages = initial_messages.clone();
|
let mut raw_llm_messages = initial_messages.clone();
|
||||||
|
@ -945,8 +951,8 @@ pub async fn transform_message(
|
||||||
metadata: Some(vec![BusterChatResponseFileMetadata {
|
metadata: Some(vec![BusterChatResponseFileMetadata {
|
||||||
status: "completed".to_string(),
|
status: "completed".to_string(),
|
||||||
message: format!(
|
message: format!(
|
||||||
"File {} completed",
|
"Created new {}",
|
||||||
file_content.file_name
|
file_content.file_type
|
||||||
),
|
),
|
||||||
timestamp: Some(Utc::now().timestamp()),
|
timestamp: Some(Utc::now().timestamp()),
|
||||||
}]),
|
}]),
|
||||||
|
@ -1031,8 +1037,8 @@ pub async fn transform_message(
|
||||||
metadata: Some(vec![BusterChatResponseFileMetadata {
|
metadata: Some(vec![BusterChatResponseFileMetadata {
|
||||||
status: "completed".to_string(),
|
status: "completed".to_string(),
|
||||||
message: format!(
|
message: format!(
|
||||||
"File {} completed",
|
"Created new {}",
|
||||||
file_content.file_name
|
file_content.file_type
|
||||||
),
|
),
|
||||||
timestamp: Some(Utc::now().timestamp()),
|
timestamp: Some(Utc::now().timestamp()),
|
||||||
}]),
|
}]),
|
||||||
|
@ -1836,11 +1842,11 @@ pub struct BusterGeneratingTitle {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper function to normalize legacy and new asset fields
|
/// Helper function to normalize legacy and new asset fields
|
||||||
///
|
///
|
||||||
/// This function converts legacy asset fields (metric_id, dashboard_id) to the new
|
/// This function converts legacy asset fields (metric_id, dashboard_id) to the new
|
||||||
/// generic asset_id/asset_type format. It ensures backward compatibility while
|
/// generic asset_id/asset_type format. It ensures backward compatibility while
|
||||||
/// using a single code path for processing assets.
|
/// using a single code path for processing assets.
|
||||||
///
|
///
|
||||||
/// Returns a tuple of (Option<Uuid>, Option<AssetType>) representing the normalized
|
/// Returns a tuple of (Option<Uuid>, Option<AssetType>) representing the normalized
|
||||||
/// asset reference.
|
/// asset reference.
|
||||||
pub fn normalize_asset_fields(request: &ChatCreateNewChat) -> (Option<Uuid>, Option<AssetType>) {
|
pub fn normalize_asset_fields(request: &ChatCreateNewChat) -> (Option<Uuid>, Option<AssetType>) {
|
||||||
|
@ -1848,16 +1854,16 @@ pub fn normalize_asset_fields(request: &ChatCreateNewChat) -> (Option<Uuid>, Opt
|
||||||
if request.asset_id.is_some() && request.asset_type.is_some() {
|
if request.asset_id.is_some() && request.asset_type.is_some() {
|
||||||
return (request.asset_id, request.asset_type);
|
return (request.asset_id, request.asset_type);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If legacy fields are provided, convert them to the new format
|
// If legacy fields are provided, convert them to the new format
|
||||||
if let Some(metric_id) = request.metric_id {
|
if let Some(metric_id) = request.metric_id {
|
||||||
return (Some(metric_id), Some(AssetType::MetricFile));
|
return (Some(metric_id), Some(AssetType::MetricFile));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(dashboard_id) = request.dashboard_id {
|
if let Some(dashboard_id) = request.dashboard_id {
|
||||||
return (Some(dashboard_id), Some(AssetType::DashboardFile));
|
return (Some(dashboard_id), Some(AssetType::DashboardFile));
|
||||||
}
|
}
|
||||||
|
|
||||||
// No asset references
|
// No asset references
|
||||||
(None, None)
|
(None, None)
|
||||||
}
|
}
|
||||||
|
@ -1978,7 +1984,7 @@ async fn initialize_chat(
|
||||||
user_org_id: Uuid,
|
user_org_id: Uuid,
|
||||||
) -> Result<(Uuid, Uuid, ChatWithMessages)> {
|
) -> Result<(Uuid, Uuid, ChatWithMessages)> {
|
||||||
let message_id = request.message_id.unwrap_or_else(Uuid::new_v4);
|
let message_id = request.message_id.unwrap_or_else(Uuid::new_v4);
|
||||||
|
|
||||||
// Get a default title for chats with optional prompt
|
// Get a default title for chats with optional prompt
|
||||||
let default_title = match request.prompt {
|
let default_title = match request.prompt {
|
||||||
Some(ref prompt) => prompt.clone(),
|
Some(ref prompt) => prompt.clone(),
|
||||||
|
|
|
@ -36,7 +36,7 @@ use super::{
|
||||||
collections::collections_router::CollectionEvent, dashboards::dashboards_router::DashboardEvent, data_sources::data_sources_router::DataSourceEvent, datasets::datasets_router::DatasetEvent, metrics::MetricEvent, organizations::organization_router::OrganizationEvent, permissions::permissions_router::PermissionEvent, search::search_router::SearchEvent, sql::sql_router::SqlEvent, teams::teams_routes::TeamEvent, terms::terms_router::TermEvent, threads_and_messages::threads_router::ThreadEvent, users::users_router::UserEvent, ws_router::{ws_router, WsRoutes}, ws_utils::{subscribe_to_stream, unsubscribe_from_stream}
|
collections::collections_router::CollectionEvent, dashboards::dashboards_router::DashboardEvent, data_sources::data_sources_router::DataSourceEvent, datasets::datasets_router::DatasetEvent, metrics::MetricEvent, organizations::organization_router::OrganizationEvent, permissions::permissions_router::PermissionEvent, search::search_router::SearchEvent, sql::sql_router::SqlEvent, teams::teams_routes::TeamEvent, terms::terms_router::TermEvent, threads_and_messages::threads_router::ThreadEvent, users::users_router::UserEvent, ws_router::{ws_router, WsRoutes}, ws_utils::{subscribe_to_stream, unsubscribe_from_stream}
|
||||||
};
|
};
|
||||||
|
|
||||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(300);
|
const CLIENT_TIMEOUT: Duration = Duration::from_secs(900);
|
||||||
const PING_INTERVAL: Duration = Duration::from_secs(15);
|
const PING_INTERVAL: Duration = Duration::from_secs(15);
|
||||||
const PING_TIMEOUT: Duration = Duration::from_secs(5);
|
const PING_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue