2025-02-14 04:31:53 +08:00
---
description: This rule is helpful for understanding how to build our websocket functions. Structure, common patterns, where to look for types, etc.
globs: src/routes/ws/**/*.rs
alwaysApply: false
---

# WebSocket API Formatting Rules

- Commonly used types and functions live in [ws_utils.rs](mdc:src/routes/ws/ws_utils.rs) and [ws_router.rs](mdc:src/routes/ws/ws_router.rs)

## Directory Structure
- All WebSocket routes should be located under `src/routes/ws/`
- Each resource should have its own directory (e.g., `sql`, `datasets`, `threads_and_messages`)
- Resource directories should contain individual files for each operation
- Each resource directory should have a `mod.rs` that exports the routes and types

Example folder structure:
```
src/routes/ws/
├── sql/
│   ├── mod.rs
│   ├── sql_router.rs       # Contains SqlRoute enum and router function
│   └── run_sql.rs          # Contains handler implementation
├── datasets/
│   ├── mod.rs
│   ├── datasets_router.rs  # Contains DatasetRoute enum and router function
│   └── list_datasets.rs    # Contains handler implementation
└── threads_and_messages/
    ├── mod.rs
    ├── threads_router.rs
    └── post_thread/
        ├── mod.rs
        ├── post_thread.rs
        └── agent_thread.rs
```

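
As a rough illustration of how a resource directory's `mod.rs` ties the router and handler files together, the sketch below collapses the `sql/` directory into inline modules so it fits in one block. The synchronous signatures, the `String` error type, and the handler body are assumptions made for the example, not the project's actual code.

```rust
// Inline modules stand in for the files under src/routes/ws/sql/.
// In the real layout each `mod` body would live in its own file.
pub mod sql {
    // Stands in for sql/sql_router.rs: route enum and router function.
    pub mod sql_router {
        use super::run_sql::run_sql;

        #[derive(Debug, Clone, PartialEq)]
        pub enum SqlRoute {
            Run,
        }

        // Hypothetical synchronous signature, used only for this sketch.
        pub fn sql_router(route: SqlRoute, query: &str) -> Result<String, String> {
            match route {
                SqlRoute::Run => run_sql(query),
            }
        }
    }

    // Stands in for sql/run_sql.rs: handler implementation.
    pub mod run_sql {
        pub fn run_sql(query: &str) -> Result<String, String> {
            if query.trim().is_empty() {
                return Err("empty query".to_string());
            }
            // Placeholder for real work: echo the query back.
            Ok(format!("ran: {}", query))
        }
    }

    // Stands in for sql/mod.rs: re-export the routes and types.
    pub use sql_router::{sql_router, SqlRoute};
}
```

With the re-export in place, a parent router can refer to `sql::SqlRoute` and `sql::sql_router` without knowing which file defines them.
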
## WebSocket Message Handling with Redis
- WebSocket messages are handled through Redis streams for reliable message delivery
- Key utilities in [ws_utils.rs](mdc:src/routes/ws/ws_utils.rs) handle message sending, subscriptions, and error handling
- Messages are compressed using GZip before being sent through Redis

### Message Sending Pattern
1. Messages are sent using `send_ws_message`:
   - Serializes the `WsResponseMessage` to JSON
   - Compresses the message using GZip
   - Adds the message to a Redis stream with an approximate maximum length of 50 entries
   ```rust
   // Abridged: `redis_conn` is a Redis connection obtained elsewhere.
   // Assumes `GzipEncoder` from async-compression (tokio) and `StreamMaxlen` from the redis crate.
   pub async fn send_ws_message(subscription: &String, message: &WsResponseMessage) -> Result<()> {
       // Serialize and compress message
       let message_string = serde_json::to_string(&message)?;
       let mut compressed = Vec::new();
       let mut encoder = GzipEncoder::new(&mut compressed);
       encoder.write_all(message_string.as_bytes()).await?;
       // Finalize the GZip stream so the trailer is written before sending
       encoder.shutdown().await?;

       // Send to Redis stream
       redis_conn.xadd_maxlen(
           &subscription,
           StreamMaxlen::Approx(50),
           "*",
           &[("data", &compressed)]
       ).await?;

       Ok(())
   }
   ```

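
To show what capping a stream at 50 entries means for consumers, here is a minimal in-memory stand-in for a single stream. `CappedStream` is a hypothetical type written for this sketch and is not part of `ws_utils.rs`. It trims exactly, whereas Redis approximate trimming (`MAXLEN ~`) may retain somewhat more than the requested length.

```rust
use std::collections::VecDeque;

/// In-memory stand-in for one Redis stream (hypothetical, for illustration only).
pub struct CappedStream {
    max_len: usize,
    next_id: u64,
    entries: VecDeque<(u64, Vec<u8>)>,
}

impl CappedStream {
    pub fn new(max_len: usize) -> Self {
        Self { max_len, next_id: 0, entries: VecDeque::new() }
    }

    /// Appends a payload, drops the oldest entries beyond `max_len`,
    /// and returns the ID assigned to the new entry.
    pub fn add(&mut self, payload: Vec<u8>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.entries.push_back((id, payload));
        while self.entries.len() > self.max_len {
            self.entries.pop_front();
        }
        id
    }

    /// Number of entries currently retained.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// ID of the oldest entry still retained, if any.
    pub fn oldest_id(&self) -> Option<u64> {
        self.entries.front().map(|(id, _)| *id)
    }
}
```

The practical consequence is that a client which falls more than roughly 50 messages behind cannot replay the entries that were trimmed.
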
### Subscription Management
1. Subscribe to streams using `subscribe_to_stream`:
   - Creates a new Redis stream group if it doesn't exist
   - Adds the subscription to the tracked subscriptions
   - Notifies the user's stream of the new subscription
   ```rust
   subscribe_to_stream(
       subscriptions: &SubscriptionRwLock,
       new_subscription: &String,
       user_group: &String,
       user_id: &Uuid,
   )
   ```

2. Unsubscribe from streams using `unsubscribe_from_stream`:
   - Removes the subscription from the tracked subscriptions
   - Destroys the Redis stream group
   - Cleans up Redis keys if no groups remain
   - Cleans up draft subscriptions
   ```rust
   unsubscribe_from_stream(
       subscriptions: &Arc<SubscriptionRwLock>,
       subscription: &String,
       user_group: &String,
       user_id: &Uuid,
   )
   ```

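
The "tracked subscriptions" part of the two functions above can be pictured as a shared set behind a read-write lock. The sketch below covers only that bookkeeping, with no Redis calls. The `Subscriptions` alias and the helper names are assumptions made for illustration, and the real `SubscriptionRwLock` may be shaped differently.

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `SubscriptionRwLock`.
pub type Subscriptions = Arc<RwLock<HashSet<String>>>;

/// Creates an empty, shareable subscription set.
pub fn new_subscriptions() -> Subscriptions {
    Arc::new(RwLock::new(HashSet::new()))
}

/// Starts tracking a subscription. Returns true if it was newly added.
pub fn track(subscriptions: &Subscriptions, name: &str) -> bool {
    let mut guard = subscriptions.write().expect("subscription lock poisoned");
    guard.insert(name.to_string())
}

/// Stops tracking a subscription. Returns true if it was present.
pub fn untrack(subscriptions: &Subscriptions, name: &str) -> bool {
    let mut guard = subscriptions.write().expect("subscription lock poisoned");
    guard.remove(name)
}

/// Reports whether a subscription is currently tracked.
pub fn is_tracked(subscriptions: &Subscriptions, name: &str) -> bool {
    let guard = subscriptions.read().expect("subscription lock poisoned");
    guard.contains(name)
}
```

Returning a boolean from `track` and `untrack` lets a caller skip creating or destroying the stream group when nothing actually changed.
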
### Key-Value Operations
- Temporary data can be stored using Redis key-value operations:
  - `set_key_value`: Sets a key with a 1-hour expiration
  - `get_key_value`: Retrieves a stored value
  - `delete_key_value`: Removes a stored value

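
The semantics of these three operations can be sketched without Redis as a map whose entries carry an expiry time. `ExpiringStore` and its method signatures are hypothetical. The current time is passed in explicitly so the expiry behaviour is easy to reason about and test.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Matches the 1-hour expiration described above.
const ONE_HOUR: Duration = Duration::from_secs(60 * 60);

/// In-memory stand-in for the Redis key-value helpers (hypothetical).
pub struct ExpiringStore {
    entries: HashMap<String, (String, Instant)>,
}

impl ExpiringStore {
    pub fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Stores `value` under `key`, expiring one hour after `now`.
    pub fn set(&mut self, key: &str, value: &str, now: Instant) {
        self.entries
            .insert(key.to_string(), (value.to_string(), now + ONE_HOUR));
    }

    /// Returns the value if it exists and has not expired at `now`.
    pub fn get(&self, key: &str, now: Instant) -> Option<&str> {
        match self.entries.get(key) {
            Some((value, expires_at)) if now < *expires_at => Some(value.as_str()),
            _ => None,
        }
    }

    /// Removes the key. Returns true if an entry was removed.
    pub fn delete(&mut self, key: &str) -> bool {
        self.entries.remove(key).is_some()
    }
}
```

Callers should therefore treat a missing value as a normal outcome, since the data may simply have expired.
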
### Error Message Pattern
- Use `send_error_message` for consistent error handling:
  ```rust
  send_error_message(
      subscription: &String,
      route: WsRoutes,
      event: WsEvent,
      code: WsErrorCode,
      message: String,
      user: &AuthenticatedUser,
  )
  ```

## Route Handler Pattern
Each WebSocket endpoint should follow a two-function pattern:

1. Main route handler (e.g., `run_sql`) that:
   - Takes a request payload and user information
   - Calls the business logic handler
   - Handles sending WebSocket messages using `send_ws_message`
   - Returns `Result<()>` since WebSocket responses are sent asynchronously

2. Business logic handler (e.g., `run_sql_handler`) that:
   - Contains pure business logic
   - Returns `Result<T>` where T is your data type
   - Can be reused across different routes (REST/WebSocket)
   - Handles database operations and core functionality

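
The split described above might look roughly like the following dependency-free sketch. The request and response types, the `String` error type, and the `send` callback (standing in for `send_ws_message` and `send_error_message`) are all assumptions for illustration. The real handlers are async and take an `AuthenticatedUser`.

```rust
/// Hypothetical request type for this sketch.
pub struct RunSqlRequest {
    pub sql: String,
}

/// Hypothetical response type for this sketch.
#[derive(Debug, PartialEq)]
pub struct RunSqlResponse {
    pub row_count: usize,
}

/// Business logic handler: no WebSocket concerns, returns `Result<T>`.
pub fn run_sql_handler(req: &RunSqlRequest) -> Result<RunSqlResponse, String> {
    if req.sql.trim().is_empty() {
        return Err("query is empty".to_string());
    }
    // Placeholder for the real database call: count the lines of the query.
    Ok(RunSqlResponse { row_count: req.sql.lines().count() })
}

/// Main route handler: calls the business logic, then sends the outcome.
/// It returns `Result<()>` because the payload travels through `send`.
pub fn run_sql(req: RunSqlRequest, send: &mut dyn FnMut(String)) -> Result<(), String> {
    match run_sql_handler(&req) {
        Ok(res) => {
            send(format!("ok rows={}", res.row_count));
            Ok(())
        }
        Err(e) => {
            send(format!("error {}", e));
            Err(e)
        }
    }
}
```

Because `run_sql_handler` knows nothing about the transport, a REST route could call it directly and serialize the `Result` itself.
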
## Route Enums and Router Configuration
- Each resource should have a router file (e.g., `sql_router.rs`) that defines:
  1. A Route enum for path matching (e.g., `SqlRoute`)
  2. An Event enum for event types (e.g., `SqlEvent`)
  3. A router function that matches routes to handlers

Example Route Enum:
```rust
#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum SqlRoute {
    #[serde(rename = "/sql/run")]
    Run,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum SqlEvent {
    RunSql,
}
```

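
To make the effect of the serde attributes concrete, the sketch below spells out the strings they produce on the wire: `rename = "/sql/run"` maps `SqlRoute::Run` to `/sql/run`, and `rename_all = "camelCase"` maps `SqlEvent::RunSql` to `runSql`. The `wire_name` and `from_wire` helpers are hypothetical and exist only so the example runs without serde.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum SqlRoute {
    Run,
}

#[derive(Debug, Clone, PartialEq)]
pub enum SqlEvent {
    RunSql,
}

impl SqlRoute {
    /// The string that `#[serde(rename = "/sql/run")]` assigns to this variant.
    pub fn wire_name(&self) -> &'static str {
        match self {
            SqlRoute::Run => "/sql/run",
        }
    }

    /// Inverse of `wire_name`; unknown paths yield `None`.
    pub fn from_wire(name: &str) -> Option<Self> {
        match name {
            "/sql/run" => Some(SqlRoute::Run),
            _ => None,
        }
    }
}

impl SqlEvent {
    /// The string that `#[serde(rename_all = "camelCase")]` assigns to this variant.
    pub fn wire_name(&self) -> &'static str {
        match self {
            SqlEvent::RunSql => "runSql",
        }
    }
}
```
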
Example Router Function:
```rust
pub async fn sql_router(route: SqlRoute, data: Value, user: &AuthenticatedUser) -> Result<()> {
    match route {
        SqlRoute::Run => {
            // Deserialize the raw JSON payload into the handler's request type
            let req = serde_json::from_value(data)?;
            run_sql(user, req).await?;
        }
    };
    Ok(())
}
```

## WebSocket Response Pattern
- All WebSocket responses use the `WsResponseMessage` type
- Messages are sent using the `send_ws_message` function
- Responses include:
  - The route that triggered the response
  - The event type
  - The response payload
  - Optional metadata
  - User information
  - Send method (e.g., SenderOnly, Broadcast)

Example Response:
```rust
let response = WsResponseMessage::new(
    WsRoutes::Sql(SqlRoute::Run),
    WsEvent::Sql(SqlEvent::RunSql),
    response_data,
    None,
    user,
    WsSendMethod::SenderOnly,
);

send_ws_message(&user.id.to_string(), &response).await?;
```

## Error Handling
- Business logic handlers should return `Result<T>`
- WebSocket handlers should convert errors to appropriate error messages
- Use `send_error_message` for consistent error formatting
- Include appropriate error logging using `tracing`

Example Error Handling:
```rust
if let Err(e) = result {
    tracing::error!("Error: {}", e);
    send_error_message(
        &user.id.to_string(),
        WsRoutes::Sql(SqlRoute::Run),
        WsEvent::Sql(SqlEvent::RunSql),
        WsErrorCode::InternalServerError,
        e.to_string(),
        user,
    ).await?;
    return Err(anyhow!(e));
}
```

## Main WebSocket Router
- The main router (`ws_router.rs`) contains the `WsRoutes` enum
- Each resource route enum is a variant in `WsRoutes`
- The router parses the incoming path and routes to the appropriate handler
- The router follows this pattern:
  1. Parse route from path string
  2. Match on route type
  3. Call appropriate resource router

Example:
```rust
pub enum WsRoutes {
    Sql(SqlRoute),
    Datasets(DatasetRoute),
    Threads(ThreadRoute),
    // ... other routes
}

pub async fn ws_router(
    route: String,
    payload: Value,
    subscriptions: &Arc<SubscriptionRwLock>,
    user_group: &String,
    user: &AuthenticatedUser,
) -> Result<()> {
    let parsed_route = WsRoutes::from_str(&route)?;
    match parsed_route {
        WsRoutes::Sql(sql_route) => {
            sql_router(sql_route, payload, user).await
        },
        // ... handle other routes
    }
}
```

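
The example above calls `WsRoutes::from_str` without showing how the path is parsed. One way this could work, sketched here under assumptions, is a hand-written `FromStr` implementation that maps each path string to the matching nested variant. The `/datasets/list` path, the `String` error type, and the `dispatch` helper are hypothetical, and the real implementation may delegate to serde instead.

```rust
use std::str::FromStr;

#[derive(Debug, PartialEq)]
pub enum SqlRoute {
    Run,
}

#[derive(Debug, PartialEq)]
pub enum DatasetRoute {
    List,
}

#[derive(Debug, PartialEq)]
pub enum WsRoutes {
    Sql(SqlRoute),
    Datasets(DatasetRoute),
}

impl FromStr for WsRoutes {
    type Err = String;

    // Step 1: parse the route from the path string.
    fn from_str(path: &str) -> Result<Self, Self::Err> {
        match path {
            "/sql/run" => Ok(WsRoutes::Sql(SqlRoute::Run)),
            // Assumed path, chosen only for illustration.
            "/datasets/list" => Ok(WsRoutes::Datasets(DatasetRoute::List)),
            other => Err(format!("unknown route: {}", other)),
        }
    }
}

/// Steps 2 and 3: match on the route type and pick the resource router.
/// Returns the router's name instead of calling it, to keep the sketch small.
pub fn dispatch(path: &str) -> Result<&'static str, String> {
    match WsRoutes::from_str(path)? {
        WsRoutes::Sql(SqlRoute::Run) => Ok("sql_router"),
        WsRoutes::Datasets(DatasetRoute::List) => Ok("datasets_router"),
    }
}
```

Keeping the match exhaustive means that adding a variant to `WsRoutes` fails compilation until the main router handles it.
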
## Example Implementation
See @src/routes/ws/sql/run_sql.rs for a reference implementation that demonstrates:
- Separation of WebSocket and business logic
- Error handling pattern
- Type usage and database operations
- WebSocket message sending pattern
- Clean abstraction of business logic for potential reuse