mirror of https://github.com/buster-so/buster.git
added in some testing work and changed some rules
This commit is contained in:
parent 51b6d00159
commit 743c256dbc
@@ -38,13 +38,6 @@ let mut conn = get_pg_pool().get().await?;
### Database Operations
- Use Diesel for database migrations and query building
- Migrations are stored in the `migrations/` directory
- Always use transactions for operations that modify multiple tables:
```rust
conn.transaction(|conn| {
    // Your database operations here
    Ok(())
})
```
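The pool shown above is async (`get_pg_pool().get().await?`), so in practice the transaction closure is async as well. A minimal sketch of the same rule with `diesel_async` — not from this commit; the column names (`deleted_at`, `thread_id`) are placeholders to adjust to the real schema:
```rust
// Sketch only: assumes diesel_async; column names below are placeholders.
use diesel::prelude::*;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, RunQueryDsl};

use crate::database::schema::{messages, threads};

async fn archive_thread(thread_id: uuid::Uuid) -> anyhow::Result<()> {
    let mut conn = get_pg_pool().get().await?;

    conn.transaction::<_, diesel::result::Error, _>(|conn| {
        async move {
            // Both updates commit together or roll back together.
            diesel::update(threads::table.find(thread_id))
                .set(threads::deleted_at.eq(Some(chrono::Utc::now())))
                .execute(conn)
                .await?;

            diesel::update(messages::table.filter(messages::thread_id.eq(thread_id)))
                .set(messages::deleted_at.eq(Some(chrono::Utc::now())))
                .execute(conn)
                .await?;

            Ok(())
        }
        .scope_boxed()
    })
    .await?;

    Ok(())
}
```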

### Concurrency Guidelines
- Prioritize concurrent operations, especially for:
@@ -1,11 +1,11 @@
---
-description: This is helpful for building and designing prds for our application and how to write them
+description: This is helpful for building and designing prds for our application and how to write them. Refer
globs: prds/*
---
# PRD (Product Requirements Document) Guidelines

## Overview
-This document provides guidelines for creating and managing Product Requirements Documents (PRDs) in our codebase. All PRDs should follow the standardized template located at @template.md.
+This document provides guidelines for creating and managing Product Requirements Documents (PRDs) in our codebase. All PRDs should follow the standardized template located at [template.md](mdc:prds/template.md)

## PRD Structure
@@ -66,7 +66,6 @@ The template [template.md](mdc:prds/template.md) provides comprehensive sections
4. **Testing Strategy**
   - Unit test requirements
   - Integration test scenarios
   - Performance test criteria

## Best Practices
@@ -27,6 +27,52 @@ globs: src/*
- Use real dependencies when possible, mock only what's necessary
- Include end-to-end workflow tests

### Integration Test Setup Requirements
- All integration tests must import and utilize the application's schema from [schema.rs](mdc:src/database/schema.rs)
- Database models from [models.rs](mdc:src/database/models.rs) should be used for test data setup and verification
- Environment setup must use `dotenv` for configuration:
```rust
use dotenv::dotenv;

#[tokio::test]
async fn setup_test_environment() {
    dotenv().ok(); // Load environment variables
    // Test environment setup
}
```
- Service configurations should be derived from environment variables:
```rust
// Example of service configuration using env vars
let database_url = std::env::var("DATABASE_URL")
    .expect("DATABASE_URL must be set for integration tests");
let test_api_key = std::env::var("TEST_API_KEY")
    .expect("TEST_API_KEY must be set for integration tests");
```
- Test database setup should include:
```rust
use crate::database::{schema, models};

async fn setup_test_db() -> anyhow::Result<PgPool> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(&std::env::var("TEST_DATABASE_URL")?)
        .await?;

    // Run migrations or setup test data
    // Use schema and models for consistency
    Ok(pool)
}
```
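The "models for setup and verification" requirement above can be closed out by reading rows back through the same schema definitions. A minimal sketch, assuming a synchronous Diesel `PgConnection` is at hand in the test (adapt to the async connection type if that is what the pool returns); `assert_user_persisted` is not an existing helper in this codebase:
```rust
// Sketch only: verifies inserted data via the application's own schema and models.
use diesel::prelude::*;
use crate::database::{models::User, schema::users};

fn assert_user_persisted(conn: &mut PgConnection, expected: &User) {
    // Read the row back through the same schema the application uses.
    let found: User = users::table
        .find(expected.id)
        .first(conn)
        .expect("inserted user should be readable");

    assert_eq!(found.email, expected.email);
    assert_eq!(found.name, expected.name);
}
```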

### Required Environment Variables
Create a `.env.test` file with necessary test configurations:
```env
TEST_DATABASE_URL=postgres://user:pass@localhost/test_db
TEST_API_KEY=test-key
TEST_ENV=test
# Add other required test environment variables
```
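Note that `dotenv().ok()` only loads `.env`; if the variables above live in a dedicated `.env.test`, tests can load that file explicitly. A small sketch using the same `dotenv` crate shown in these examples:
```rust
use dotenv::from_filename;

/// Loads `.env.test`; missing-file errors are ignored so CI can inject
/// the variables directly into the environment instead.
pub fn load_test_env() {
    from_filename(".env.test").ok();
}
```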

## Test Structure
```rust
#[cfg(test)]

@@ -119,4 +165,168 @@ mod tests {
        mock.assert();
    }
}
```

## Example Integration Test
```rust
use crate::database::{models, schema};
use chrono::Utc;
use dotenv::dotenv;
use uuid::Uuid;

#[tokio::test]
async fn test_user_creation_flow() -> anyhow::Result<()> {
    // Load test environment
    dotenv().ok();

    // Setup test database connection
    let pool = setup_test_db().await.expect("Failed to setup test database");

    // Create test user using models
    let test_user = models::User {
        id: Uuid::new_v4(),
        email: "test@example.com".to_string(),
        name: Some("Test User".to_string()),
        config: serde_json::Value::Null,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        attributes: serde_json::Value::Null,
    };

    // Use schema for database operations
    diesel::insert_into(schema::users::table)
        .values(&test_user)
        .execute(&mut pool.get().await?)
        .expect("Failed to insert test user");

    // Test application logic
    let response = create_test_client()
        .get("/api/users")
        .send()
        .await?;

    assert_eq!(response.status(), 200);
    // Additional assertions...

    Ok(())
}
```

## Common Test Utilities
- All shared test utilities should be placed in `tests/common/mod.rs`
- Common database setup and teardown functions should be in `tests/common/db.rs`
- Environment setup utilities should be in `tests/common/env.rs`
- Shared test fixtures should be in `tests/common/fixtures/`

### Common Test Module Structure
```
tests/
├── common/
│   ├── mod.rs        # Main module file that re-exports all common utilities
│   ├── db.rs         # Database setup/teardown utilities
│   ├── env.rs        # Environment configuration utilities
│   ├── fixtures/     # Test data fixtures
│   │   ├── mod.rs    # Exports all fixtures
│   │   ├── users.rs  # User-related test data
│   │   └── threads.rs # Thread-related test data
│   └── helpers.rs    # General test helper functions
└── integration/      # Integration test files
```

### Common Database Setup
```rust
// tests/common/db.rs
use diesel::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool};
use crate::database::{models, schema};
use dotenv::dotenv;

pub struct TestDb {
    pub pool: Pool<ConnectionManager<PgConnection>>,
}

impl TestDb {
    pub async fn new() -> anyhow::Result<Self> {
        dotenv().ok();

        let database_url = std::env::var("TEST_DATABASE_URL")
            .expect("TEST_DATABASE_URL must be set");

        let manager = ConnectionManager::<PgConnection>::new(database_url);
        let pool = Pool::builder()
            .max_size(5)
            .build(manager)?;

        Ok(Self { pool })
    }

    pub async fn setup_test_data(&self) -> anyhow::Result<()> {
        // Add common test data setup here
        Ok(())
    }

    pub async fn cleanup(&self) -> anyhow::Result<()> {
        // Cleanup test data
        Ok(())
    }
}
```

### Common Environment Setup
```rust
// tests/common/env.rs
use std::sync::Once;
use dotenv::dotenv;

static ENV_SETUP: Once = Once::new();

pub fn setup_test_env() {
    ENV_SETUP.call_once(|| {
        dotenv().ok();
        // Set any default environment variables for tests
        std::env::set_var("TEST_ENV", "test");
    });
}
```

### Example Test Fixtures
```rust
// tests/common/fixtures/users.rs
use crate::database::models::User;
use chrono::Utc;
use uuid::Uuid;

pub fn create_test_user() -> User {
    User {
        id: Uuid::new_v4(),
        email: "test@example.com".to_string(),
        name: Some("Test User".to_string()),
        config: serde_json::Value::Null,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        attributes: serde_json::Value::Null,
    }
}
```

### Using Common Test Utilities
```rust
// Example integration test using common utilities
use crate::tests::common::{db::TestDb, env::setup_test_env, fixtures};

#[tokio::test]
async fn test_user_creation() -> anyhow::Result<()> {
    // Setup test environment
    setup_test_env();

    // Initialize test database
    let test_db = TestDb::new().await.expect("Failed to setup test database");

    // Get test user fixture
    let test_user = fixtures::users::create_test_user();

    // Run test
    let result = create_user(&test_db.pool, &test_user).await;

    // Cleanup
    test_db.cleanup().await?;

    assert!(result.is_ok());

    Ok(())
}
```
@@ -608,7 +608,7 @@ diesel::joinable!(messages_deprecated -> datasets (dataset_id));
diesel::joinable!(messages_deprecated -> threads_deprecated (thread_id));
diesel::joinable!(messages_deprecated -> users (sent_by));
diesel::joinable!(messages_to_files -> dashboard_files (file_id));
-diesel::joinable!(messages_to_files -> messages_deprecated (message_id));
+diesel::joinable!(messages_to_files -> messages (message_id));
diesel::joinable!(messages_to_files -> metric_files (file_id));
diesel::joinable!(permission_groups -> organizations (organization_id));
diesel::joinable!(permission_groups_to_users -> permission_groups (permission_group_id));
@@ -240,6 +240,118 @@ impl AgentThreadHandler {
        self.agent.stream_process_thread(&thread).await
    }

    async fn store_final_message_state(
        message: &Message,
        all_transformed_messages: Vec<BusterContainer>,
        organization_id: &Uuid,
        user_id: &Uuid,
    ) -> Result<(), Error> {
        let mut conn = get_pg_pool().get().await?;

        // Update final message state
        diesel::update(messages::table)
            .filter(messages::id.eq(message.id))
            .set((
                messages::response.eq(&message.response),
                messages::updated_at.eq(message.updated_at),
            ))
            .execute(&mut conn)
            .await?;

        // Process any completed metric or dashboard files
        for container in all_transformed_messages {
            match container {
                BusterContainer::ReasoningMessage(msg) => match msg.reasoning {
                    ReasoningMessage::File(file) if file.file_type == "metric" => {
                        let metric_file = MetricFile {
                            id: Uuid::new_v4(),
                            name: file.file_name.clone(),
                            file_name: format!(
                                "{}.yml",
                                file.file_name.to_lowercase().replace(' ', "_")
                            ),
                            content: serde_json::to_value(&file.file.unwrap_or_default())
                                .unwrap_or_default(),
                            verification: Verification::NotRequested,
                            evaluation_obj: None,
                            evaluation_summary: None,
                            evaluation_score: None,
                            organization_id: organization_id.clone(),
                            created_by: user_id.clone(),
                            created_at: Utc::now(),
                            updated_at: Utc::now(),
                            deleted_at: None,
                        };

                        // Insert metric file
                        diesel::insert_into(metric_files::table)
                            .values(&metric_file)
                            .execute(&mut conn)
                            .await?;

                        // Create message to file link
                        let message_to_file = MessageToFile {
                            id: Uuid::new_v4(),
                            message_id: message.id,
                            file_id: metric_file.id,
                            created_at: Utc::now(),
                            updated_at: Utc::now(),
                            deleted_at: None,
                        };

                        diesel::insert_into(messages_to_files::table)
                            .values(&message_to_file)
                            .execute(&mut conn)
                            .await?;
                    }
                    ReasoningMessage::File(file) if file.file_type == "dashboard" => {
                        let dashboard_file = DashboardFile {
                            id: Uuid::new_v4(),
                            name: file.file_name.clone(),
                            file_name: format!(
                                "{}.yml",
                                file.file_name.to_lowercase().replace(' ', "_")
                            ),
                            content: serde_json::to_value(&file.file.unwrap_or_default())
                                .unwrap_or_default(),
                            filter: None,
                            organization_id: organization_id.clone(),
                            created_by: user_id.clone(),
                            created_at: Utc::now(),
                            updated_at: Utc::now(),
                            deleted_at: None,
                        };

                        // Insert dashboard file
                        diesel::insert_into(dashboard_files::table)
                            .values(&dashboard_file)
                            .execute(&mut conn)
                            .await?;

                        // Create message to file link
                        let message_to_file = MessageToFile {
                            id: Uuid::new_v4(),
                            message_id: message.id,
                            file_id: dashboard_file.id,
                            created_at: Utc::now(),
                            updated_at: Utc::now(),
                            deleted_at: None,
                        };

                        diesel::insert_into(messages_to_files::table)
                            .values(&message_to_file)
                            .execute(&mut conn)
                            .await?;
                    }
                    _ => (), // Skip non-file messages or other file types
                },
                _ => (), // Skip non-reasoning messages
            }
        }

        Ok(())
    }

    async fn process_stream(
        mut rx: Receiver<Result<AgentMessage, Error>>,
        user_id: &Uuid,
@@ -270,200 +382,107 @@ impl AgentThreadHandler {
}

while let Some(msg_result) = rx.recv().await {
if let Ok(msg) = msg_result {
match transform_message(chat_id, message_id, msg) {
Ok((transformed_messages, event)) => {
// Skip empty messages
let non_empty_messages: Vec<_> = transformed_messages
.into_iter()
.filter(|msg| match msg {
BusterContainer::ChatMessage(chat) => {
chat.response_message.message.is_some()
|| chat.response_message.message_chunk.is_some()
}
BusterContainer::ReasoningMessage(reasoning) => {
match &reasoning.reasoning {
ReasoningMessage::Thought(thought) => {
thought.thoughts.is_some()
}
ReasoningMessage::File(file) => file.file.is_some(),
match msg_result {
Ok(msg) => {
match transform_message(chat_id, message_id, msg) {
Ok((transformed_messages, event)) => {
// Skip empty messages
let non_empty_messages: Vec<_> = transformed_messages
.into_iter()
.filter(|msg| match msg {
BusterContainer::ChatMessage(chat) => {
chat.response_message.message.is_some()
|| chat.response_message.message_chunk.is_some()
}
}
})
.collect();

if non_empty_messages.is_empty() {
continue;
}

// Filter messages for database storage with stricter rules
let storage_messages: Vec<_> = non_empty_messages
.iter()
.filter(|msg| match msg {
BusterContainer::ChatMessage(chat) => {
chat.response_message.message.is_some()
&& chat.response_message.message_chunk.is_none()
}
BusterContainer::ReasoningMessage(reasoning) => {
match &reasoning.reasoning {
ReasoningMessage::Thought(thought) => {
thought.status == "completed" && thought.thoughts.is_some()
}
ReasoningMessage::File(file) => {
file.status == "completed" && file.file.is_some()
BusterContainer::ReasoningMessage(reasoning) => {
match &reasoning.reasoning {
ReasoningMessage::Thought(thought) => {
thought.thoughts.is_some()
}
ReasoningMessage::File(file) => file.file.is_some(),
}
}
})
.collect();

if non_empty_messages.is_empty() {
continue;
}

// Filter messages for storage with stricter rules
let storage_messages: Vec<_> = non_empty_messages
.iter()
.filter(|msg| match msg {
BusterContainer::ChatMessage(chat) => {
chat.response_message.message.is_some()
&& chat.response_message.message_chunk.is_none()
}
BusterContainer::ReasoningMessage(reasoning) => {
match &reasoning.reasoning {
ReasoningMessage::Thought(thought) => {
thought.status == "completed" && thought.thoughts.is_some()
}
ReasoningMessage::File(file) => {
file.status == "completed" && file.file.is_some()
}
}
}
})
.cloned()
.collect();

// Store transformed messages that meet storage criteria
all_transformed_messages.extend(storage_messages);

// Update message in memory with latest messages
message.response = serde_json::to_value(&all_transformed_messages).unwrap_or_default();
message.updated_at = Utc::now();

// Send websocket messages for real-time updates
for transformed in non_empty_messages {
let response = WsResponseMessage::new_no_user(
WsRoutes::Threads(ThreadRoute::Post),
WsEvent::Threads(event.clone()),
transformed,
None,
WsSendMethod::All,
);

if let Err(e) = send_ws_message(&subscription, &response).await {
tracing::error!("Failed to send websocket message: {}", e);
break;
}
})
.cloned()
.collect();

// Store transformed messages that meet storage criteria
all_transformed_messages.extend(storage_messages);

// Update message in database with latest messages
message.response = serde_json::to_value(&all_transformed_messages).unwrap_or_default();
message.updated_at = Utc::now();

if let Err(e) = Self::insert_or_update_message(&message).await {
tracing::error!("Failed to update message: {}", e);
continue;
}

// Send websocket messages as before
for transformed in non_empty_messages {
let response = WsResponseMessage::new_no_user(
WsRoutes::Threads(ThreadRoute::Post),
WsEvent::Threads(event.clone()),
transformed,
None,
WsSendMethod::All,
);

if let Err(e) = send_ws_message(&subscription, &response).await {
tracing::error!("Failed to send websocket message: {}", e);
break;
}
}
Err(e) => {
tracing::error!("Failed to transform message: {}", e);
}
}
Err(e) => {
tracing::error!("Failed to transform message: {}", e);
}
Err(e) => {
tracing::error!("Error processing message: {}", e);
// Store partial progress on error
if let Err(store_err) = Self::store_final_message_state(
&message,
all_transformed_messages.clone(),
organization_id,
user_id,
).await {
tracing::error!("Failed to store final message state: {}", store_err);
}
break;
}
}
}

let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Failed to get database connection: {}", e);
return;
}
};

// Process any completed metric or dashboard files
for container in all_transformed_messages {
match container {
BusterContainer::ReasoningMessage(msg) => match msg.reasoning {
ReasoningMessage::File(file) if file.file_type == "metric" => {
let metric_file = MetricFile {
id: Uuid::new_v4(),
name: file.file_name.clone(),
file_name: format!(
"{}.yml",
file.file_name.to_lowercase().replace(' ', "_")
),
content: serde_json::to_value(&file.file.unwrap_or_default())
.unwrap_or_default(),
verification: Verification::NotRequested,
evaluation_obj: None,
evaluation_summary: None,
evaluation_score: None,
organization_id: organization_id.clone(),
created_by: user_id.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};

// Insert metric file
if let Err(e) = insert_into(metric_files::table)
.values(&metric_file)
.execute(&mut conn)
.await
{
tracing::error!("Failed to insert metric file: {}", e);
continue;
}

// Create message to file link
let message_to_file = MessageToFile {
id: Uuid::new_v4(),
message_id: message.id,
file_id: metric_file.id,
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};

if let Err(e) = insert_into(messages_to_files::table)
.values(&message_to_file)
.execute(&mut conn)
.await
{
tracing::error!("Failed to insert message to file link: {}", e);
}
}
ReasoningMessage::File(file) if file.file_type == "dashboard" => {
let dashboard_file = DashboardFile {
id: Uuid::new_v4(),
name: file.file_name.clone(),
file_name: format!(
"{}.yml",
file.file_name.to_lowercase().replace(' ', "_")
),
content: serde_json::to_value(&file.file.unwrap_or_default())
.unwrap_or_default(),
filter: None,
organization_id: organization_id.clone(),
created_by: user_id.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};

// Insert dashboard file
if let Err(e) = insert_into(dashboard_files::table)
.values(&dashboard_file)
.execute(&mut conn)
.await
{
tracing::error!("Failed to insert dashboard file: {}", e);
continue;
}

// Create message to file link
let message_to_file = MessageToFile {
id: Uuid::new_v4(),
message_id: message.id,
file_id: dashboard_file.id,
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};

if let Err(e) = insert_into(messages_to_files::table)
.values(&message_to_file)
.execute(&mut conn)
.await
{
tracing::error!("Failed to insert message to file link: {}", e);
}
}
_ => (), // Skip non-file messages or other file types
},
_ => (), // Skip non-reasoning messages
}
// Store final message state after successful completion
if let Err(e) = Self::store_final_message_state(
&message,
all_transformed_messages,
organization_id,
user_id,
).await {
tracing::error!("Failed to store final message state: {}", e);
}
}
@@ -0,0 +1,59 @@
use anyhow::Result;
use diesel::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool};
use dotenv::dotenv;

/// Represents a test database instance with utility functions
pub struct TestDb {
    pub pool: Pool<ConnectionManager<PgConnection>>,
}

impl TestDb {
    /// Creates a new test database connection pool
    pub async fn new() -> Result<Self> {
        dotenv().ok();

        let database_url = std::env::var("TEST_DATABASE_URL")
            .expect("TEST_DATABASE_URL must be set");

        let manager = ConnectionManager::<PgConnection>::new(database_url);
        let pool = Pool::builder()
            .max_size(5)
            .build(manager)?;

        Ok(Self { pool })
    }

    /// Sets up common test data that might be needed across multiple tests
    pub async fn setup_test_data(&self) -> Result<()> {
        // Add common test data setup here
        // For example:
        // - Create default test users
        // - Set up required organization data
        // - Initialize any required configuration
        Ok(())
    }

    /// Cleans up test data after tests complete
    pub async fn cleanup(&self) -> Result<()> {
        // Implement cleanup logic
        // For example:
        // - Delete test users
        // - Remove test organizations
        // - Clean up any test data created during tests
        Ok(())
    }

    /// Gets a connection from the pool
    pub async fn get_conn(&self) -> Result<diesel::r2d2::PooledConnection<ConnectionManager<PgConnection>>> {
        Ok(self.pool.get()?)
    }
}

/// Implement Drop to ensure cleanup runs even if tests panic
impl Drop for TestDb {
    fn drop(&mut self) {
        // Implement synchronous cleanup if needed
        // Note: This runs on drop, so it should be quick and not fail
    }
}
@@ -0,0 +1,48 @@
use std::sync::Once;
use dotenv::dotenv;

static ENV_SETUP: Once = Once::new();

/// Sets up the test environment
/// This function is safe to call multiple times as it will only execute once
pub fn setup_test_env() {
    ENV_SETUP.call_once(|| {
        dotenv().ok();

        // Set default test environment variables
        if std::env::var("TEST_ENV").is_err() {
            std::env::set_var("TEST_ENV", "test");
        }

        // Ensure required test variables are set
        let required_vars = [
            "TEST_DATABASE_URL",
            "TEST_API_KEY",
        ];

        for var in required_vars {
            if std::env::var(var).is_err() {
                panic!("Required test environment variable {} is not set", var);
            }
        }
    });
}

/// Gets a test configuration value, with a default fallback
pub fn get_test_config(key: &str, default: &str) -> String {
    std::env::var(key).unwrap_or_else(|_| default.to_string())
}

/// Creates a temporary test directory and returns its path
pub fn setup_test_dir() -> std::path::PathBuf {
    let test_dir = std::env::temp_dir().join("test_workspace");
    std::fs::create_dir_all(&test_dir).expect("Failed to create test directory");
    test_dir
}

/// Cleans up the test directory
pub fn cleanup_test_dir(test_dir: &std::path::Path) {
    if test_dir.exists() {
        std::fs::remove_dir_all(test_dir).expect("Failed to clean up test directory");
    }
}
@@ -0,0 +1,6 @@
pub mod users;
pub mod threads;

// Re-export commonly used fixtures
pub use users::create_test_user;
pub use threads::create_test_thread;
@@ -0,0 +1,54 @@
use crate::database::models::Thread;
use chrono::Utc;
use uuid::Uuid;

/// Creates a test thread with default values
pub fn create_test_thread(organization_id: Uuid, created_by: Uuid) -> Thread {
    Thread {
        id: Uuid::new_v4(),
        title: "Test Thread".to_string(),
        organization_id,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        deleted_at: None,
        created_by,
    }
}

/// Creates a test thread with custom values
pub fn create_custom_test_thread(
    title: &str,
    organization_id: Uuid,
    created_by: Uuid,
) -> Thread {
    Thread {
        id: Uuid::new_v4(),
        title: title.to_string(),
        organization_id,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        deleted_at: None,
        created_by,
    }
}

/// Creates multiple test threads
pub fn create_test_threads(
    count: usize,
    organization_id: Uuid,
    created_by: Uuid,
) -> Vec<Thread> {
    (0..count)
        .map(|i| {
            Thread {
                id: Uuid::new_v4(),
                title: format!("Test Thread {}", i),
                organization_id,
                created_at: Utc::now(),
                updated_at: Utc::now(),
                deleted_at: None,
                created_by,
            }
        })
        .collect()
}
@@ -0,0 +1,51 @@
use crate::database::models::User;
use chrono::Utc;
use serde_json::json;
use uuid::Uuid;

/// Creates a test user with default values
pub fn create_test_user() -> User {
    User {
        id: Uuid::new_v4(),
        email: "test@example.com".to_string(),
        name: Some("Test User".to_string()),
        config: json!({}),
        created_at: Utc::now(),
        updated_at: Utc::now(),
        attributes: json!({}),
    }
}

/// Creates a test user with custom values
pub fn create_custom_test_user(
    email: &str,
    name: Option<&str>,
    attributes: serde_json::Value,
) -> User {
    User {
        id: Uuid::new_v4(),
        email: email.to_string(),
        name: name.map(String::from),
        config: json!({}),
        created_at: Utc::now(),
        updated_at: Utc::now(),
        attributes,
    }
}

/// Creates multiple test users
pub fn create_test_users(count: usize) -> Vec<User> {
    (0..count)
        .map(|i| {
            User {
                id: Uuid::new_v4(),
                email: format!("test{}@example.com", i),
                name: Some(format!("Test User {}", i)),
                config: json!({}),
                created_at: Utc::now(),
                updated_at: Utc::now(),
                attributes: json!({}),
            }
        })
        .collect()
}
@@ -0,0 +1,69 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use uuid::Uuid;

/// Generates a unique test identifier
pub fn generate_test_id() -> String {
    format!("test_{}", Uuid::new_v4())
}

/// Gets the current timestamp for test data
pub fn get_test_timestamp() -> DateTime<Utc> {
    Utc::now()
}

/// Creates a test error for error case testing
pub fn create_test_error(message: &str) -> anyhow::Error {
    anyhow::anyhow!("Test error: {}", message)
}

/// Waits for a condition with timeout
pub async fn wait_for_condition<F>(
    condition: F,
    timeout_ms: u64,
    check_interval_ms: u64,
) -> Result<bool>
where
    F: Fn() -> Result<bool>,
{
    let start = std::time::Instant::now();
    let timeout = std::time::Duration::from_millis(timeout_ms);
    let check_interval = std::time::Duration::from_millis(check_interval_ms);

    while start.elapsed() < timeout {
        if condition()? {
            return Ok(true);
        }
        tokio::time::sleep(check_interval).await;
    }

    Ok(false)
}

/// Runs a function with retry logic
pub async fn retry_with_backoff<F, T, E>(
    operation: F,
    max_retries: u32,
    initial_delay_ms: u64,
) -> Result<T>
where
    F: Fn() -> Result<T, E>,
    E: std::error::Error + Send + Sync + 'static,
{
    let mut current_retry = 0;
    let mut delay = initial_delay_ms;

    loop {
        match operation() {
            Ok(value) => return Ok(value),
            Err(e) => {
                if current_retry >= max_retries {
                    return Err(anyhow::Error::new(e));
                }
                tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
                current_retry += 1;
                delay *= 2; // Exponential backoff
            }
        }
    }
}
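Not part of the commit — a quick usage sketch for the two async helpers above, assuming they are in scope in a test module:
```rust
// Illustrative only: shows how wait_for_condition and retry_with_backoff compose in a test.
use std::sync::atomic::{AtomicU32, Ordering};

#[tokio::test]
async fn helpers_usage_sketch() -> anyhow::Result<()> {
    // wait_for_condition: poll every 10 ms, give up after 1 s.
    let polls = AtomicU32::new(0);
    let became_true = wait_for_condition(
        || Ok(polls.fetch_add(1, Ordering::SeqCst) >= 2),
        1_000,
        10,
    )
    .await?;
    assert!(became_true);

    // retry_with_backoff: the operation returns Result<T, E> where E: std::error::Error.
    let parsed = retry_with_backoff(|| "42".parse::<i32>(), 3, 50).await?;
    assert_eq!(parsed, 42);

    Ok(())
}
```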

@@ -0,0 +1,8 @@
pub mod db;
pub mod env;
pub mod fixtures;
pub mod helpers;

// Re-export commonly used utilities
pub use db::TestDb;
pub use env::setup_test_env;