---
description: This rule is helpful for understanding how to build our websocket functions. Structure, common patterns, where to look for types, etc.
globs: src/routes/ws/**/*.rs
alwaysApply: false
---

# WebSocket API Formatting Rules

- Commonly used types and functions: [ws_utils.rs](mdc:src/routes/ws/ws_utils.rs), [ws_router.rs](mdc:src/routes/ws/ws_router.rs)

## Directory Structure

- All WebSocket routes should be located under `src/routes/ws/`
- Each resource should have its own directory (e.g., `sql`, `datasets`, `threads_and_messages`)
- Resource directories should contain individual files for each operation
- Each resource directory should have a `mod.rs` that exports the routes and types

Example folder structure:

```
src/routes/ws/
├── sql/
│   ├── mod.rs
│   ├── sql_router.rs       # Contains SqlRoute enum and router function
│   └── run_sql.rs          # Contains handler implementation
├── datasets/
│   ├── mod.rs
│   ├── datasets_router.rs  # Contains DatasetRoute enum and router function
│   └── list_datasets.rs    # Contains handler implementation
└── threads_and_messages/
    ├── mod.rs
    ├── threads_router.rs
    └── post_thread/
        ├── mod.rs
        ├── post_thread.rs
        └── agent_thread.rs
```

## WebSocket Message Handling with Redis

- WebSocket messages are delivered through Redis streams for reliable message delivery
- Key utilities in [ws_utils.rs](mdc:src/routes/ws/ws_utils.rs) handle message sending, subscriptions, and error handling
- Messages are compressed with GZip before being written to Redis

### Message Sending Pattern

1. Messages are sent using `send_ws_message`, which:
   - Serializes the `WsResponseMessage` to JSON
   - Compresses the message using GZip
   - Adds the message to a Redis stream with an approximate maxlen of 50

```rust
pub async fn send_ws_message(subscription: &String, message: &WsResponseMessage) -> Result<()> {
    // Serialize and compress message
    let message_string = serde_json::to_string(&message)?;
    let mut compressed = Vec::new();
    let mut encoder = GzipEncoder::new(&mut compressed);
    encoder.write_all(message_string.as_bytes()).await?;
    // Finish the GZip stream so the trailer is flushed into `compressed`
    encoder.shutdown().await?;

    // Send to Redis stream (acquiring `redis_conn` is omitted in this excerpt)
    redis_conn.xadd_maxlen(
        &subscription,
        StreamMaxlen::Approx(50),
        "*",
        &[("data", &compressed)]
    ).await?;

    Ok(())
}
```

### Subscription Management

1. Subscribe to streams using `subscribe_to_stream`, which:
   - Creates a new Redis stream group if it doesn't exist
   - Adds the subscription to the tracked subscriptions
   - Notifies the user's stream of the new subscription

```rust
subscribe_to_stream(
    subscriptions: &SubscriptionRwLock,
    new_subscription: &String,
    user_group: &String,
    user_id: &Uuid,
)
```

2. Unsubscribe from streams using `unsubscribe_from_stream`, which:
   - Removes the subscription from the tracked subscriptions
   - Destroys the Redis stream group
   - Cleans up Redis keys if no groups remain
   - Handles cleanup of draft subscriptions

```rust
unsubscribe_from_stream(
    subscriptions: &Arc<SubscriptionRwLock>,
    subscription: &String,
    user_group: &String,
    user_id: &Uuid,
)
```

### Key-Value Operations

- Temporary data can be stored using Redis key-value operations:
  - `set_key_value`: Sets a key with a 1-hour expiration
  - `get_key_value`: Retrieves a stored value
  - `delete_key_value`: Removes a stored value

### Error Message Pattern

- Use `send_error_message` for consistent error handling:

```rust
send_error_message(
    subscription: &String,
    route: WsRoutes,
    event: WsEvent,
    code: WsErrorCode,
    message: String,
    user: &AuthenticatedUser,
)
```

## Route Handler Pattern

Each WebSocket endpoint should follow a two-function pattern:

1. Main route handler (e.g., `run_sql`) that:
   - Takes a request payload and user information
   - Calls the business logic handler
   - Handles sending WebSocket messages using `send_ws_message`
   - Returns `Result<()>` since WebSocket responses are sent asynchronously

2. Business logic handler (e.g., `run_sql_handler`) that:
   - Contains pure business logic
   - Returns `Result<T>` where `T` is your data type
   - Can be reused across different routes (REST/WebSocket)
   - Handles database operations and core functionality

## Route Enums and Router Configuration

- Each resource should have a router file (e.g., `sql_router.rs`) that defines:
  1. A Route enum for path matching (e.g., `SqlRoute`)
  2. An Event enum for event types (e.g., `SqlEvent`)
  3. A router function that matches routes to handlers

Example Route Enum:

```rust
#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum SqlRoute {
    #[serde(rename = "/sql/run")]
    Run,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum SqlEvent {
    RunSql,
}
```

Example Router Function:

```rust
pub async fn sql_router(route: SqlRoute, data: Value, user: &AuthenticatedUser) -> Result<()> {
    match route {
        SqlRoute::Run => {
            // Deserialize the raw payload into the handler's request type
            let req = serde_json::from_value(data)?;
            run_sql(user, req).await?;
        }
    };
    Ok(())
}
```

## WebSocket Response Pattern

- All WebSocket responses use the `WsResponseMessage` type
- Messages are sent using the `send_ws_message` function
- Responses include:
  - The route that triggered the response
  - The event type
  - The response payload
  - Optional metadata
  - User information
  - Send method (e.g., `SenderOnly`, `Broadcast`)

Example Response:

```rust
let response = WsResponseMessage::new(
    WsRoutes::Sql(SqlRoute::Run),
    WsEvent::Sql(SqlEvent::RunSql),
    response_data,
    None,
    user,
    WsSendMethod::SenderOnly,
);

send_ws_message(&user.id.to_string(), &response).await?;
```

## Error Handling

- Business logic handlers should return `Result<T>`
- WebSocket handlers should convert errors to appropriate error messages
- Use `send_error_message` for consistent error formatting
- Include appropriate error logging using `tracing`

Example Error Handling:

```rust
if let Err(e) = result {
    tracing::error!("Error: {}", e);
    // Notify the client before propagating the error to the caller
    send_error_message(
        &user.id.to_string(),
        WsRoutes::Sql(SqlRoute::Run),
        WsEvent::Sql(SqlEvent::RunSql),
        WsErrorCode::InternalServerError,
        e.to_string(),
        user,
    ).await?;
    return Err(anyhow!(e));
}
```

## Main WebSocket Router

- The main router (`ws_router.rs`) contains the `WsRoutes` enum
- Each resource route enum is a variant in `WsRoutes`
- The router parses the incoming path and routes to the appropriate handler
- It follows this pattern:
  1. Parse route from path string
  2. Match on route type
  3. Call appropriate resource router

Example:

```rust
pub enum WsRoutes {
    Sql(SqlRoute),
    Datasets(DatasetRoute),
    Threads(ThreadRoute),
    // ... other routes
}

pub async fn ws_router(
    route: String,
    payload: Value,
    subscriptions: &Arc<SubscriptionRwLock>,
    user_group: &String,
    user: &AuthenticatedUser,
) -> Result<()> {
    let parsed_route = WsRoutes::from_str(&route)?;
    match parsed_route {
        WsRoutes::Sql(sql_route) => {
            sql_router(sql_route, payload, user).await
        },
        // ... handle other routes
    }
}
```

## Example Implementation

See @src/routes/ws/sql/run_sql.rs for a reference implementation that demonstrates:

- Separation of WebSocket and business logic
- Error handling pattern
- Type usage and database operations
- WebSocket message sending pattern
- Clean abstraction of business logic for potential reuse
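
As a supplement to the Main WebSocket Router section, the parse-then-match steps can be sketched with the standard library alone. This is a minimal illustration, not the code in `ws_router.rs`: the real enums derive serde traits and the real routers are async. The `/datasets/list` path, the `DatasetRoute::List` variant, the `UnknownRoute` error type, and the `dispatch` helper are all hypothetical names introduced for the example.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, PartialEq)]
pub enum SqlRoute {
    Run,
}

#[derive(Debug, Clone, PartialEq)]
pub enum DatasetRoute {
    List, // hypothetical variant, used only for illustration
}

/// Top-level enum: one variant per resource route enum.
#[derive(Debug, Clone, PartialEq)]
pub enum WsRoutes {
    Sql(SqlRoute),
    Datasets(DatasetRoute),
}

/// Hypothetical error type for paths that match no known route.
#[derive(Debug, PartialEq)]
pub struct UnknownRoute(pub String);

impl fmt::Display for UnknownRoute {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "unknown WebSocket route: {}", self.0)
    }
}

impl std::error::Error for UnknownRoute {}

impl FromStr for WsRoutes {
    type Err = UnknownRoute;

    // Step 1: parse the route from the path string.
    fn from_str(path: &str) -> Result<Self, Self::Err> {
        match path {
            "/sql/run" => Ok(WsRoutes::Sql(SqlRoute::Run)),
            "/datasets/list" => Ok(WsRoutes::Datasets(DatasetRoute::List)), // assumed path
            other => Err(UnknownRoute(other.to_string())),
        }
    }
}

/// Steps 2 and 3: match on the route type and pick the handler.
/// Returns the handler name instead of calling it to keep the sketch synchronous.
pub fn dispatch(route: &WsRoutes) -> &'static str {
    match route {
        WsRoutes::Sql(SqlRoute::Run) => "run_sql",
        WsRoutes::Datasets(DatasetRoute::List) => "list_datasets",
    }
}

fn main() {
    for path in ["/sql/run", "/datasets/list", "/nope"] {
        match WsRoutes::from_str(path) {
            Ok(route) => println!("{} -> {}", path, dispatch(&route)),
            Err(e) => println!("{}", e),
        }
    }
}
```

Because both `match` expressions are exhaustive, adding a new resource variant to `WsRoutes` fails to compile until `dispatch` handles it, which is the main benefit of routing through enums rather than raw strings.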