11 KiB
title | author | date | status | parent_prd |
---|---|---|---|---|
WebSocket Post Chat Endpoint Implementation | Dallin | 2025-03-21 | Completed | optional_prompt_asset_chat.md |
WebSocket Post Chat Endpoint Implementation
Parent Project
This is a sub-PRD of the Optional Prompt Asset Chat System project. Please refer to the parent PRD for the overall project context, goals, and implementation plan.
Problem Statement
The current WebSocket endpoint for creating chats (post_thread
) expects a prompt as a required field and handles specific asset types through separate parameters (metric_id
and dashboard_id
). To support the parent project's goals, this endpoint needs to be updated to accept an optional prompt when an asset is provided and to use a more generic asset reference model.
This component will update the WebSocket endpoint to:
- Accept requests with optional prompts when an asset is provided
- Handle generic asset references (asset_id and asset_type)
- Maintain backward compatibility where possible
- Continue to properly validate all inputs
- Correctly stream the results, including the auto-generated messages for prompt-less requests
Goals
- Update the WebSocket API endpoint to support optional prompts with asset context
- Modify the endpoint to accept asset_id and asset_type instead of specific asset parameters
- Update validation to ensure valid combinations of parameters
- Maintain backward compatibility with existing clients
- Ensure streaming works correctly for both prompt and prompt-less flows
- Handle error cases consistently
Non-Goals
- Changing the endpoint routing structure
- Modifying the core WebSocket message format
- Adding new authentication/authorization mechanisms
- Implementing client-side changes to adapt to the updated API
Technical Design
Component Overview
The WebSocket endpoint for creating chats is implemented in post_thread
in the file src/routes/ws/threads_and_messages/post_thread.rs
. It:
- Receives requests from clients via WebSocket
- Extracts the authenticated user from the WebSocket context
- Deserializes the payload into a
ChatCreateNewChat
struct - Sets up a channel for streaming results
- Calls the
post_chat_handler
with the request and channel - Streams results back to the client as they're available
- Handles completion and error cases
The updated endpoint will maintain this flow but update the request schema, validation, and ensure proper streaming for prompt-less flows.
graph TD
A[Client] -->|WebSocket| B[post_thread]
B -->|Deserialize| C[ChatCreateNewChat]
C --> D{Validate Request}
D -->|Valid| E[Create Channel]
D -->|Invalid| F[Error Message]
E --> G[post_chat_handler]
G -->|Stream| H[Process Channel]
H -->|Send Messages| I[Client]
F -->|Send Error| I
Interfaces
Exposed Interfaces
// WebSocket handler
pub async fn post_thread(
user: AuthenticatedUser,
request: ChatCreateNewChat,
) -> Result<()> {
// Implementation
}
// Updated request structure (same as handler)
pub struct ChatCreateNewChat {
pub prompt: Option<String>, // Now optional
pub chat_id: Option<Uuid>,
pub message_id: Option<Uuid>,
pub asset_id: Option<Uuid>,
pub asset_type: Option<AssetType>,
}
Consumed Interfaces
// Handler signature (from handler component)
pub async fn post_chat_handler(
request: ChatCreateNewChat,
user: AuthenticatedUser,
tx: Option<mpsc::Sender<Result<(BusterContainer, ThreadEvent)>>>,
) -> Result<ChatWithMessages> {
// Implementation in handler component
}
// WebSocket utilities
pub async fn send_ws_message(
subscription: &String,
message: &WsResponseMessage,
) -> Result<()> {
// Implementation in ws_utils
}
pub async fn send_error_message(
subscription: &String,
route: WsRoutes,
event: WsEvent,
code: WsErrorCode,
message: String,
user: &AuthenticatedUser,
) -> Result<()> {
// Implementation in ws_utils
}
Implementation Details
Updated WebSocket Handler
pub async fn post_thread(
user: AuthenticatedUser,
request: ChatCreateNewChat,
) -> Result<()> {
// Validate parameters
if request.asset_id.is_some() && request.asset_type.is_none() {
return send_error_message(
&user.id.to_string(),
WsRoutes::Thread(ThreadRoute::Post),
WsEvent::Thread(ThreadEvent::PostThread),
WsErrorCode::BadRequest,
"asset_type must be provided when asset_id is specified".to_string(),
&user,
).await;
}
// Create channel for streaming results
let (tx, mut rx) = mpsc::channel(1000);
// Spawn task to process results
tokio::spawn(async move {
while let Some(result) = rx.recv().await {
match result {
Ok((container, event)) => {
// Map container and event to WebSocket response
let response = WsResponseMessage::new(
WsRoutes::Thread(ThreadRoute::Post),
WsEvent::Thread(event),
container,
None,
&user,
WsSendMethod::All,
);
if let Err(e) = send_ws_message(&user.id.to_string(), &response).await {
tracing::error!("Error sending WebSocket message: {}", e);
break;
}
}
Err(e) => {
// Send error to client
if let Err(e) = send_error_message(
&user.id.to_string(),
WsRoutes::Thread(ThreadRoute::Post),
WsEvent::Thread(ThreadEvent::PostThread),
WsErrorCode::InternalServerError,
format!("Error processing thread: {}", e),
&user,
).await {
tracing::error!("Error sending error message: {}", e);
}
break;
}
}
}
});
// Call handler with channel
match post_chat_handler(request, user.clone(), Some(tx)).await {
Ok(chat_with_messages) => {
// For prompt-less flows, the handler might complete without sending messages through the channel
// Send a final completion message if needed
let response = WsResponseMessage::new(
WsRoutes::Thread(ThreadRoute::Post),
WsEvent::Thread(ThreadEvent::Completed),
BusterContainer::Chat(chat_with_messages),
None,
&user,
WsSendMethod::All,
);
send_ws_message(&user.id.to_string(), &response).await?;
Ok(())
}
Err(e) => {
send_error_message(
&user.id.to_string(),
WsRoutes::Thread(ThreadRoute::Post),
WsEvent::Thread(ThreadEvent::PostThread),
WsErrorCode::InternalServerError,
format!("Error creating thread: {}", e),
&user,
).await
}
}
}
File Changes
Modified Files
- ✅
src/routes/ws/threads_and_messages/post_thread.rs
- Changes:
- Updated request validation to support optional prompt
- Added handling for asset_id and asset_type fields
- Implemented proper streaming for prompt-less flows
- Enhanced error handling with detailed error messages
- Added comprehensive documentation
- Purpose: WebSocket API endpoint implementation
- Changes:
Added Files
- ✅
tests/integration/threads_and_messages/post_thread_test.rs
- Changes:
- Created integration tests for the WebSocket endpoint
- Tests include validation, prompt-less flows, legacy support, and error handling
- Purpose: Testing WebSocket API endpoint
- Changes:
Updated Files
- ✅
tests/integration/threads_and_messages/mod.rs
- Changes:
- Added export for new test module
- Purpose: Module organization
- Changes:
Testing Strategy ✅
Unit Tests ✅
-
✅ Test request validation
- Input: Various combinations of prompt, chat_id, asset_id, and asset_type
- Expected output: Success or error result
- Edge cases:
- Asset_id without asset_type
- Invalid asset_type values
- No prompt but also no asset
-
✅ Test event mapping
- Input: Different ThreadEvent types
- Expected output: Correct WsEvent mapping
- Edge cases:
- New event types
- Error events
Integration Tests ✅
-
✅ Test scenario: Create chat with asset but no prompt
- Components involved: post_thread, post_chat_handler, websocket
- Test steps:
- Create request with asset_id, asset_type, but no prompt
- Call post_thread
- Verify correct messages are streamed to client
- Expected outcome: Chat created with file and text messages, properly streamed
-
✅ Test scenario: Create chat with asset and prompt
- Components involved: post_thread, post_chat_handler, websocket
- Test steps:
- Create request with asset_id, asset_type, and prompt
- Call post_thread
- Verify all agent messages are streamed correctly
- Expected outcome: Normal streaming flow with all messages
-
✅ Test scenario: Error handling
- Components involved: post_thread, error handling
- Test steps:
- Create invalid request (e.g., asset_id without asset_type)
- Call post_thread
- Verify proper error response is sent
- Expected outcome: Error message sent through WebSocket
Security Considerations ✅
- ✅ Validate asset_type to prevent injection attacks
- ✅ Maintain user authentication and authorization checks
- ✅ Ensure proper error messages that don't leak sensitive information
- ✅ Apply rate limiting to prevent abuse
- ✅ Handle dropped connections gracefully to prevent resource leaks
Dependencies on Other Components ✅
Required Components ✅
- ✅ Updated Chat Handler: Requires the handler to support optional prompts and generic assets
- ✅ WebSocket Utils: Requires utilities for sending messages and errors
- ✅ Asset Type Definitions: Requires valid asset types to be defined
Concurrent Development ✅
- ✅ REST endpoint: Can be updated concurrently
- Potential conflicts: Request structure and validation logic
- Mitigation strategy: Use shared validation functions where possible
Implementation Timeline
- ✅ Update request handling: 0.5 days
- ✅ Update validation: 0.5 days
- ✅ Implement streaming for prompt-less flows: 1 day
- ✅ Testing: 1 day
Total estimated time: 3 days (Completed)