---
description: This rule is helpful for understanding how to build our websocket functions. Structure, common patterns, where to look for types, etc.
globs: src/routes/ws/**/*.rs
alwaysApply: false
---
# WebSocket API Formatting Rules
- Commonly used types and functions [ws_utils.rs](mdc:src/routes/ws/ws_utils.rs), [ws_router.rs](mdc:src/routes/ws/ws_router.rs)
## Directory Structure
- All WebSocket routes should be located under `src/routes/ws/`
- Each resource should have its own directory (e.g., `sql`, `datasets`, `threads_and_messages`)
- Resource directories should contain individual files for each operation
- Each resource directory should have a `mod.rs` that exports the routes and types
Example folder structure:
```
src/routes/ws/
├── sql/
│   ├── mod.rs
│   ├── sql_router.rs       # Contains SqlRoute enum and router function
│   └── run_sql.rs          # Contains handler implementation
├── datasets/
│   ├── mod.rs
│   ├── datasets_router.rs  # Contains DatasetRoute enum and router function
│   └── list_datasets.rs    # Contains handler implementation
└── threads_and_messages/
    ├── mod.rs
    ├── threads_router.rs
    └── post_thread/
        ├── mod.rs
        ├── post_thread.rs
        └── agent_thread.rs
```
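As a rough illustration of what a resource's `mod.rs` re-exports, the sketch below uses inline modules as stand-ins for the separate files so that it compiles on its own. The body of `run_sql` and its `&str` parameter are placeholders, not the real handler signature.

```rust
// Inline modules stand in for the files under src/routes/ws/sql/.
pub mod sql {
    // Stand-in for sql_router.rs (only the enum is shown).
    pub mod sql_router {
        #[derive(Debug, PartialEq)]
        pub enum SqlRoute {
            Run,
        }
    }

    // Stand-in for run_sql.rs; the body is a placeholder.
    pub mod run_sql {
        pub fn run_sql(query: &str) -> String {
            format!("ran: {}", query)
        }
    }

    // What mod.rs itself contributes: re-exports, so callers can write
    // `sql::SqlRoute` and `sql::run_sql` without knowing the file layout.
    pub use self::run_sql::run_sql;
    pub use self::sql_router::SqlRoute;
}
```

In the real layout, the inline `pub mod ... { }` bodies would be plain `pub mod sql_router;` and `pub mod run_sql;` declarations pointing at the files.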
## WebSocket Message Handling with Redis
- WebSocket messages are handled through Redis streams for reliable message delivery
- Key utilities in [ws_utils.rs](mdc:src/routes/ws/ws_utils.rs) handle message sending, subscriptions, and error handling
- Messages are compressed using GZip before being sent through Redis
### Message Sending Pattern
1. Messages are sent using `send_ws_message`, which:
   - Serializes the `WsResponseMessage` to JSON
   - Compresses the message using GZip
   - Adds the message to a Redis stream with a maxlen of 50
```rust
pub async fn send_ws_message(subscription: &String, message: &WsResponseMessage) -> Result<()> {
    // Serialize and compress message
    let message_string = serde_json::to_string(&message)?;
    let mut compressed = Vec::new();
    let mut encoder = GzipEncoder::new(&mut compressed);
    encoder.write_all(message_string.as_bytes()).await?;
    // Finish the gzip stream so the trailer is written before the bytes are used
    encoder.shutdown().await?;

    // Send to Redis stream (acquiring `redis_conn` is omitted from this excerpt)
    redis_conn.xadd_maxlen(
        &subscription,
        StreamMaxlen::Approx(50),
        "*",
        &[("data", &compressed)]
    ).await?;
    Ok(())
}
```
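The maxlen of 50 means a subscriber that falls more than about 50 messages behind loses the oldest entries. The sketch below models that trimming in memory. `CappedStream` is a hypothetical stand-in, not part of `ws_utils.rs`, and it trims exactly, whereas `StreamMaxlen::Approx` lets Redis keep somewhat more than 50 entries.

```rust
use std::collections::VecDeque;

/// In-memory stand-in for a capped Redis stream (hypothetical type).
pub struct CappedStream {
    maxlen: usize,
    entries: VecDeque<(u64, Vec<u8>)>,
    next_id: u64,
}

impl CappedStream {
    pub fn new(maxlen: usize) -> Self {
        Self { maxlen, entries: VecDeque::new(), next_id: 0 }
    }

    /// Appends a payload, evicts the oldest entries beyond `maxlen`,
    /// and returns the id assigned to the new entry.
    pub fn add(&mut self, payload: Vec<u8>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.entries.push_back((id, payload));
        while self.entries.len() > self.maxlen {
            self.entries.pop_front();
        }
        id
    }

    /// Number of entries currently retained.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Id of the oldest entry still retained, if any.
    pub fn oldest_id(&self) -> Option<u64> {
        self.entries.front().map(|(id, _)| *id)
    }
}
```

After 60 calls to `add` on a stream created with `CappedStream::new(50)`, only the 50 most recent entries remain.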
### Subscription Management
1. Subscribe to streams using `subscribe_to_stream`:
   - Creates a new Redis stream group if it doesn't exist
   - Adds the subscription to the tracked subscriptions
   - Notifies the user's stream of the new subscription
```rust
subscribe_to_stream(
    subscriptions: &SubscriptionRwLock,
    new_subscription: &String,
    user_group: &String,
    user_id: &Uuid,
)
```
2. Unsubscribe from streams using `unsubscribe_from_stream`:
   - Removes subscription from tracked subscriptions
   - Destroys the Redis stream group
   - Cleans up Redis keys if no groups remain
   - Handles draft subscriptions cleanup
```rust
unsubscribe_from_stream(
    subscriptions: &Arc<SubscriptionRwLock>,
    subscription: &String,
    user_group: &String,
    user_id: &Uuid,
)
```
```
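The "tracked subscriptions" part of both functions can be pictured as a set behind a read-write lock. The sketch below is an assumption about the shape only: `Subscriptions` is a hypothetical stand-in for `SubscriptionRwLock`, it uses `std::sync::RwLock` where the real code may use an async lock, and the Redis group creation and cleanup steps are left out.

```rust
use std::collections::HashSet;
use std::sync::RwLock;

/// Hypothetical stand-in for `SubscriptionRwLock`.
pub struct Subscriptions {
    inner: RwLock<HashSet<String>>,
}

impl Subscriptions {
    pub fn new() -> Self {
        Self { inner: RwLock::new(HashSet::new()) }
    }

    /// Tracks a stream; returns `true` only if it was not tracked before,
    /// so repeated subscribes are harmless.
    pub fn subscribe(&self, stream: &str) -> bool {
        self.inner.write().unwrap().insert(stream.to_string())
    }

    /// Stops tracking a stream; returns `true` if it was tracked.
    pub fn unsubscribe(&self, stream: &str) -> bool {
        self.inner.write().unwrap().remove(stream)
    }

    /// Reports whether a stream is currently tracked.
    pub fn is_subscribed(&self, stream: &str) -> bool {
        self.inner.read().unwrap().contains(stream)
    }
}
```

Returning a boolean from `subscribe` and `unsubscribe` is one way to decide whether the Redis-side work (creating or destroying the group) needs to run at all.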
### Key-Value Operations
- Temporary data can be stored using Redis key-value operations:
  - `set_key_value`: Sets a key with 1-hour expiration
  - `get_key_value`: Retrieves a stored value
  - `delete_key_value`: Removes a stored value
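To make the 1-hour expiration concrete, here is a minimal in-memory model of the three operations. `ExpiringStore` and its method signatures are hypothetical and do not mirror the real helpers, which talk to Redis. The current time is passed in explicitly so the expiry behaviour is easy to check.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Expiration applied by `set`, matching the 1-hour TTL described above.
pub const TTL: Duration = Duration::from_secs(60 * 60);

/// Hypothetical in-memory model of the key-value helpers.
pub struct ExpiringStore {
    entries: HashMap<String, (String, Instant)>,
}

impl ExpiringStore {
    pub fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Stores a value that expires `TTL` after `now`.
    pub fn set(&mut self, key: &str, value: &str, now: Instant) {
        self.entries
            .insert(key.to_string(), (value.to_string(), now + TTL));
    }

    /// Returns the value only if it exists and has not yet expired.
    pub fn get(&self, key: &str, now: Instant) -> Option<&str> {
        self.entries
            .get(key)
            .filter(|(_, expires_at)| now < *expires_at)
            .map(|(value, _)| value.as_str())
    }

    /// Removes a value; returns `true` if the key was present.
    pub fn delete(&mut self, key: &str) -> bool {
        self.entries.remove(key).is_some()
    }
}
```

Callers should treat a missing value as a normal outcome, since anything stored this way disappears after an hour.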
### Error Message Pattern
- Use `send_error_message` for consistent error handling:
```rust
send_error_message(
    subscription: &String,
    route: WsRoutes,
    event: WsEvent,
    code: WsErrorCode,
    message: String,
    user: &AuthenticatedUser,
)
```
## Route Handler Pattern
Each WebSocket endpoint should follow a two-function pattern:
1. Main route handler (e.g., `run_sql`) that:
   - Takes a request payload and user information
   - Calls the business logic handler
   - Handles sending WebSocket messages using `send_ws_message`
   - Returns `Result<()>` since WebSocket responses are sent asynchronously
2. Business logic handler (e.g., `run_sql_handler`) that:
   - Contains pure business logic
   - Returns `Result<T>` where T is your data type
   - Can be reused across different routes (REST/WebSocket)
   - Handles database operations and core functionality
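The split between the two functions can be sketched without any WebSocket or database machinery. Everything below is a simplified assumption: the request and result types are hypothetical, the code is synchronous, a `String` stands in for the error type, and a closure stands in for `send_ws_message` and `send_error_message`.

```rust
/// Hypothetical request payload.
pub struct RunSqlRequest {
    pub sql: String,
}

/// Hypothetical result type returned by the business logic.
#[derive(Debug, PartialEq)]
pub struct RunSqlResult {
    pub statement_count: usize,
}

/// Business logic handler: no messaging concerns, returns `Result<T>`.
/// Counting statements is a placeholder for the real database work.
pub fn run_sql_handler(req: &RunSqlRequest) -> Result<RunSqlResult, String> {
    if req.sql.trim().is_empty() {
        return Err("empty query".to_string());
    }
    let statement_count = req
        .sql
        .split(';')
        .filter(|statement| !statement.trim().is_empty())
        .count();
    Ok(RunSqlResult { statement_count })
}

/// Route handler: calls the business logic, forwards the outcome through
/// `send`, and returns `Result<()>` because the payload travels separately.
pub fn run_sql(req: RunSqlRequest, send: &mut dyn FnMut(String)) -> Result<(), String> {
    match run_sql_handler(&req) {
        Ok(result) => {
            send(format!("ok: {} statement(s)", result.statement_count));
            Ok(())
        }
        Err(e) => {
            send(format!("error: {}", e));
            Err(e)
        }
    }
}
```

Because `run_sql_handler` knows nothing about how results are delivered, a REST route could call it directly and serialize the `RunSqlResult` itself.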
## Route Enums and Router Configuration
- Each resource should have a router file (e.g., `sql_router.rs`) that defines:
  1. A Route enum for path matching (e.g., `SqlRoute`)
  2. An Event enum for event types (e.g., `SqlEvent`)
  3. A router function that matches routes to handlers
Example Route Enum:
```rust
#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum SqlRoute {
    #[serde(rename = "/sql/run")]
    Run,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum SqlEvent {
    RunSql,
}
```
Example Router Function:
```rust
pub async fn sql_router(route: SqlRoute, data: Value, user: &AuthenticatedUser) -> Result<()> {
    match route {
        SqlRoute::Run => {
            // The request type is inferred from the signature of `run_sql`
            let req = serde_json::from_value(data)?;
            run_sql(user, req).await?;
        }
    };
    Ok(())
}
```
## WebSocket Response Pattern
- All WebSocket responses use the `WsResponseMessage` type
- Messages are sent using the `send_ws_message` function
- Responses include:
  - The route that triggered the response
  - The event type
  - The response payload
  - Optional metadata
  - User information
  - Send method (e.g., `SenderOnly`, `Broadcast`)
Example Response:
```rust
let response = WsResponseMessage::new(
    WsRoutes::Sql(SqlRoute::Run),
    WsEvent::Sql(SqlEvent::RunSql),
    response_data,
    None,
    user,
    WsSendMethod::SenderOnly,
);
send_ws_message(&user.id.to_string(), &response).await?;
```
## Error Handling
- Business logic handlers should return `Result<T>`
- WebSocket handlers should convert errors to appropriate error messages
- Use `send_error_message` for consistent error formatting
- Include appropriate error logging using `tracing`
Example Error Handling:
```rust
if let Err(e) = result {
    tracing::error!("Error: {}", e);
    send_error_message(
        &user.id.to_string(),
        WsRoutes::Sql(SqlRoute::Run),
        WsEvent::Sql(SqlEvent::RunSql),
        WsErrorCode::InternalServerError,
        e.to_string(),
        user,
    ).await?;
    return Err(anyhow!(e));
}
```
## Main WebSocket Router
- The main router (`ws_router.rs`) contains the `WsRoutes` enum
- Each resource route enum is a variant in `WsRoutes`
- The router parses the incoming path and routes to the appropriate handler
- Follows this pattern:
  1. Parse route from path string
  2. Match on route type
  3. Call appropriate resource router
Example:
```rust
pub enum WsRoutes {
    Sql(SqlRoute),
    Datasets(DatasetRoute),
    Threads(ThreadRoute),
    // ... other routes
}

pub async fn ws_router(
    route: String,
    payload: Value,
    subscriptions: &Arc<SubscriptionRwLock>,
    user_group: &String,
    user: &AuthenticatedUser,
) -> Result<()> {
    let parsed_route = WsRoutes::from_str(&route)?;
    match parsed_route {
        WsRoutes::Sql(sql_route) => {
            sql_router(sql_route, payload, user).await
        },
        // ... handle other routes
    }
}
```
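The call to `WsRoutes::from_str` relies on a `FromStr` implementation that the example does not show. One possible shape is sketched below with plain string matching. This is an assumption, not the actual implementation (which may delegate to serde). The `DatasetRoute::List` variant and the `/datasets/list` path are hypothetical, and a `String` stands in for the real error type.

```rust
use std::str::FromStr;

#[derive(Debug, PartialEq)]
pub enum SqlRoute {
    Run,
}

#[derive(Debug, PartialEq)]
pub enum DatasetRoute {
    List, // hypothetical variant, for illustration only
}

#[derive(Debug, PartialEq)]
pub enum WsRoutes {
    Sql(SqlRoute),
    Datasets(DatasetRoute),
}

impl FromStr for WsRoutes {
    type Err = String;

    /// Maps an incoming path string to the nested route enum.
    fn from_str(path: &str) -> Result<Self, Self::Err> {
        match path {
            "/sql/run" => Ok(WsRoutes::Sql(SqlRoute::Run)),
            // Hypothetical path, not confirmed by the source.
            "/datasets/list" => Ok(WsRoutes::Datasets(DatasetRoute::List)),
            other => Err(format!("unknown route: {}", other)),
        }
    }
}
```

Rejecting unknown paths at parse time means the `match` in `ws_router` only ever sees routes that have a handler.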
## Example Implementation
See @src/routes/ws/sql/run_sql.rs for a reference implementation that demonstrates:
- Separation of WebSocket and business logic
- Error handling pattern
- Type usage and database operations
- WebSocket message sending pattern
- Clean abstraction of business logic for potential reuse