Bugfix: stored values

Merge pull request #90 from buster-so/dal/stored_values_enum_push_to_…
This commit is contained in:
dal 2025-02-04 15:11:29 -08:00 committed by GitHub
commit 543877395d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 41 additions and 2 deletions

View File

@ -7,7 +7,8 @@ use chrono::Utc;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use uuid::Uuid;
use crate::database::lib::get_pg_pool;
use crate::database::enums::StoredValuesStatus;
use crate::database::{lib::get_pg_pool, schema::dataset_columns};
use crate::utils::clients::ai::embedding_router::embedding_router;
use diesel::sql_types::{Text, Uuid as SqlUuid, Array, Float4, Timestamptz, Integer};
@ -88,6 +89,9 @@ pub async fn store_column_values(
// Query distinct values in batches
let mut offset = 0;
let mut first_batch = true;
let schema_name = organization_id.to_string().replace("-", "_");
loop {
let query = format!(
"SELECT DISTINCT \"{}\" as value
@ -104,6 +108,9 @@ pub async fn store_column_values(
Ok(results) => results,
Err(e) => {
tracing::error!("Error querying stored values: {:?}", e);
if first_batch {
return Err(e);
}
vec![]
}
};
@ -128,10 +135,41 @@ pub async fn store_column_values(
break;
}
// If this is the first batch and we have 15 or fewer values, handle as enum
if first_batch && values.len() <= 15 {
// Get current description
let current_description = diesel::sql_query("SELECT description FROM dataset_columns WHERE id = $1")
.bind::<SqlUuid, _>(column_id)
.get_result::<StoredValueRow>(&mut conn)
.await
.ok()
.and_then(|row| Some(row.value));
// Format new description
let enum_list = format!("Values for this column are: {}", values.join(", "));
let new_description = match current_description {
Some(desc) if !desc.is_empty() => format!("{}. {}", desc, enum_list),
_ => enum_list,
};
// Update column description
diesel::update(dataset_columns::table)
.filter(dataset_columns::id.eq(column_id))
.set((
dataset_columns::description.eq(new_description),
dataset_columns::stored_values_status.eq(StoredValuesStatus::Success),
dataset_columns::stored_values_count.eq(values.len() as i64),
dataset_columns::stored_values_last_synced.eq(Utc::now()),
))
.execute(&mut conn)
.await?;
return Ok(());
}
// Create embeddings for the batch
let embeddings = create_embeddings_batch(&values).await?;
let schema_name = organization_id.to_string().replace("-", "_");
// Insert values and embeddings
for (value, embedding) in values.iter().zip(embeddings.iter()) {
let insert_sql = format!(
@ -154,6 +192,7 @@ pub async fn store_column_values(
.await?;
}
first_batch = false;
offset += BATCH_SIZE;
}