Merge branch 'staging' into big-nate/bus-939-create-new-structure-for-chats

This commit is contained in:
Nate Kelley 2025-02-06 09:05:06 -07:00
commit da76a0e19b
No known key found for this signature in database
GPG Key ID: FD90372AB8D98B4F
7 changed files with 117 additions and 14 deletions

View File

@ -0,0 +1,81 @@
use anyhow::{anyhow, Result};
use axum::{extract::Path, http::StatusCode, Extension};
use chrono::Utc;
use diesel::{update, ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use serde_json::Value;
use uuid::Uuid;
use crate::database::{lib::get_pg_pool, models::User, schema::datasets};
/// HTTP entry point for deleting a dataset.
///
/// Delegates the real work to `delete_dataset_handler`; on success replies
/// `204 No Content`, on any failure logs the error and maps it to a
/// `500 Internal Server Error` with the error text as the body.
pub async fn delete_dataset(
    Extension(user): Extension<User>,
    Path(dataset_id): Path<Uuid>,
) -> Result<StatusCode, (StatusCode, String)> {
    if let Err(e) = delete_dataset_handler(&user, dataset_id).await {
        tracing::error!("Error deleting dataset: {}", e);
        return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()));
    }
    Ok(StatusCode::NO_CONTENT)
}
/// Soft-deletes a dataset after verifying the caller's authorization.
///
/// Steps: fetch the dataset's owning organization, confirm the user belongs
/// to that organization, confirm the user holds an admin role, then stamp
/// `deleted_at` (soft delete — the row is not removed).
///
/// Errors if the dataset is missing/already deleted, the user's attributes
/// are absent or malformed, the user lacks permission, or any query fails.
async fn delete_dataset_handler(user: &User, dataset_id: Uuid) -> Result<()> {
    let mut conn = get_pg_pool()
        .get()
        .await
        .map_err(|e| anyhow!("Unable to get connection from pool: {}", e))?;

    // Look up the organization that owns the (not-yet-deleted) dataset.
    let dataset_org_id: Uuid = datasets::table
        .select(datasets::organization_id)
        .filter(datasets::id.eq(dataset_id))
        .filter(datasets::deleted_at.is_null())
        .first(&mut conn)
        .await
        .map_err(|e| match e {
            diesel::result::Error::NotFound => anyhow!("Dataset not found"),
            _ => anyhow!("Error getting dataset: {}", e),
        })?;

    // The caller's organization comes from the user attribute map; a missing
    // key or a non-string value are both treated as "not found".
    let org_id_attr = match user.attributes.get("organization_id") {
        Some(Value::String(s)) => s.as_str(),
        _ => return Err(anyhow!("User organization id not found")),
    };
    let caller_org_id =
        Uuid::parse_str(org_id_attr).map_err(|_| anyhow!("Invalid organization id format"))?;
    if caller_org_id != dataset_org_id {
        return Err(anyhow!("User does not belong to dataset's organization"));
    }

    // Only workspace/data admins may delete datasets.
    let role = match user.attributes.get("organization_role") {
        Some(Value::String(s)) => s.as_str(),
        _ => return Err(anyhow!("User role not found")),
    };
    if role != "workspace_admin" && role != "data_admin" {
        return Err(anyhow!("User does not have required permissions"));
    }

    // Soft delete: set the deletion timestamp instead of removing the row.
    update(datasets::table)
        .filter(datasets::id.eq(dataset_id))
        .set(datasets::deleted_at.eq(Some(Utc::now())))
        .execute(&mut conn)
        .await
        .map_err(|e| anyhow!("Error updating dataset: {}", e))?;

    Ok(())
}

View File

@ -1,4 +1,5 @@
mod assets;
mod delete_dataset;
mod deploy_datasets;
mod get_dataset;
mod get_dataset_data_sample;
@ -6,7 +7,7 @@ mod list_datasets;
mod post_dataset;
use axum::{
routing::{get, post},
routing::{get, post, delete},
Router,
};
@ -16,6 +17,7 @@ pub fn router() -> Router {
.route("/", post(post_dataset::post_dataset))
.route("/deploy", post(deploy_datasets::deploy_datasets))
.route("/:dataset_id", get(get_dataset::get_dataset))
.route("/:dataset_id", delete(delete_dataset::delete_dataset))
.route(
"/:dataset_id/data/sample",
get(get_dataset_data_sample::get_dataset_data_sample),

View File

@ -1227,6 +1227,7 @@ async fn search_for_relevant_terms(
terms_search
where
fts @@ websearch_to_tsquery('{prompt}')
and organization_id = '{organization_id}'
order by rank_ix
limit least(10, 30) * 2
),
@ -1236,6 +1237,7 @@ semantic as (
row_number() over (order by embedding <#> '{prompt_embedding}') as rank_ix
from
terms_search
where organization_id = '{organization_id}'
order by rank_ix
limit least(10, 30) * 2
)

View File

@ -8,8 +8,8 @@ use crate::utils::{
prompt_node::{prompt_node, PromptNodeMessage, PromptNodeSettings},
},
prompts::analyst_chat_prompts::failed_to_fix_sql_prompts::{
failed_to_fix_sql_system_prompt, failed_to_fix_sql_user_prompt,
},
failed_to_fix_sql_system_prompt, failed_to_fix_sql_user_prompt,
},
};
pub enum FailedToFixSqlAgent {
@ -82,6 +82,7 @@ pub async fn failed_to_fix_sql_agent(
stream: Some(options.output_sender),
stream_name: Some("failed_to_fix_sql".to_string()),
prompt_name: "failed_to_fix_sql".to_string(),
json_mode: true,
..Default::default()
};
@ -93,19 +94,24 @@ pub async fn failed_to_fix_sql_agent(
}
};
// Combine master response with first part of response
let sql = match response.get("sql") {
Some(sql) => sql.as_str().unwrap_or_default().to_string(),
None => "".to_string(),
};
// Combine SQL with first part of response
let combined_response = match (
response.as_str(),
sql.as_str(),
options.outputs.get("first_part_of_response"),
) {
(Some(master_str), Some(first_part)) => {
(sql_str, Some(first_part)) if !sql_str.is_empty() => {
if let Some(first_part_str) = first_part.as_str() {
Value::String(format!("{}\n\n{}", first_part_str, master_str))
Value::String(format!("{}\n\n{}", first_part_str, sql_str))
} else {
response
Value::String(sql)
}
}
_ => response,
_ => Value::String(sql),
};
Ok(combined_response)

View File

@ -8,11 +8,20 @@ You should talk about how you tried to fix the SQL query three times, and that y
At the end make sure to apologize to the user
PLEASE OUTPUT THE SQL QUERY IN THE FOLLOWING JSON FORMAT:
```json
{
\"sql\": \"SELECT...\"
}
```
### GENERAL GUIDELINES
- Keep your response under 50 words
- Do not output the failed SQL query in your response
- Keep this response fairly non-technical
- escape columns, datasets, tables, errors, etc. with backticks.".to_string()
- escape columns, datasets, tables, errors, etc. with backticks.
- You must output the sql query in the format specified above.
".to_string()
}
pub fn failed_to_fix_sql_user_prompt(
@ -44,5 +53,7 @@ pub fn failed_to_fix_sql_user_prompt(
message.push_str(error);
}
message.push_str("\n\nPlease output the SQL query in the format specified above. (```sql ... ```)");
message
}

View File

@ -13,6 +13,7 @@ Your task is to pick all the datasets required to answer the user question/reque
- Feel free to select multiple datasets that can be joined together to provide more complete answers
- If the user requests advanced analysis like predictions, forecasts, correlation, impact analysis, etc., identify all datasets that could be combined for the analysis
- Consider relationships between datasets and how they can be joined to provide comprehensive answers
- Multiple dataset can be selected even while one completely answers the user request.
"#,
datasets
)

View File

@ -41,12 +41,12 @@ pub async fn ensure_stored_values_schema(organization_id: &Uuid) -> Result<()> {
// Create schema and table using raw SQL
let schema_name = organization_id.to_string().replace("-", "_");
let create_schema_sql = format!(
"CREATE SCHEMA IF NOT EXISTS {}_values",
"CREATE SCHEMA IF NOT EXISTS values_{}",
schema_name
);
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS {}_values.values_v1 (
"CREATE TABLE IF NOT EXISTS values_{}.values_v1 (
value text NOT NULL,
dataset_id uuid NOT NULL,
column_name text NOT NULL,
@ -60,7 +60,7 @@ pub async fn ensure_stored_values_schema(organization_id: &Uuid) -> Result<()> {
let create_index_sql = format!(
"CREATE INDEX IF NOT EXISTS values_v1_embedding_idx
ON {}_values.values_v1
ON values_{}.values_v1
USING ivfflat (embedding vector_cosine_ops)",
schema_name
);
@ -218,7 +218,7 @@ pub async fn search_stored_values(
let schema_name = organization_id.to_string().replace("-", "_");
let query = format!(
"SELECT value, column_name, column_id
FROM {}_values.values_v1
FROM values_{}.values_v1
WHERE dataset_id = $2::uuid
ORDER BY embedding <=> $1::vector
LIMIT $3::integer",