diff --git a/api/src/utils/stored_values/mod.rs b/api/src/utils/stored_values/mod.rs index 19c776ac5..da2044e76 100644 --- a/api/src/utils/stored_values/mod.rs +++ b/api/src/utils/stored_values/mod.rs @@ -7,7 +7,8 @@ use chrono::Utc; use diesel::prelude::*; use diesel_async::RunQueryDsl; use uuid::Uuid; -use crate::database::lib::get_pg_pool; +use crate::database::enums::StoredValuesStatus; +use crate::database::{lib::get_pg_pool, schema::dataset_columns}; use crate::utils::clients::ai::embedding_router::embedding_router; use diesel::sql_types::{Text, Uuid as SqlUuid, Array, Float4, Timestamptz, Integer}; @@ -88,6 +89,9 @@ pub async fn store_column_values( // Query distinct values in batches let mut offset = 0; + let mut first_batch = true; + let schema_name = organization_id.to_string().replace("-", "_"); + loop { let query = format!( "SELECT DISTINCT \"{}\" as value @@ -104,6 +108,9 @@ pub async fn store_column_values( Ok(results) => results, Err(e) => { tracing::error!("Error querying stored values: {:?}", e); + if first_batch { + return Err(e); + } vec![] } }; @@ -128,10 +135,41 @@ pub async fn store_column_values( break; } + // If this is the first batch and we have 15 or fewer values, handle as enum + if first_batch && values.len() <= 15 { + // Get current description + let current_description = diesel::sql_query("SELECT description FROM dataset_columns WHERE id = $1") + .bind::(column_id) + .get_result::(&mut conn) + .await + .ok() + .and_then(|row| Some(row.value)); + + // Format new description + let enum_list = format!("Values for this column are: {}", values.join(", ")); + let new_description = match current_description { + Some(desc) if !desc.is_empty() => format!("{}. {}", desc, enum_list), + _ => enum_list, + }; + + // Update column description + diesel::update(dataset_columns::table) + .filter(dataset_columns::id.eq(column_id)) + .set(( + dataset_columns::description.eq(new_description), + dataset_columns::stored_values_status.eq(StoredValuesStatus::Success), + dataset_columns::stored_values_count.eq(values.len() as i64), + dataset_columns::stored_values_last_synced.eq(Utc::now()), + )) + .execute(&mut conn) + .await?; + + return Ok(()); + } + // Create embeddings for the batch let embeddings = create_embeddings_batch(&values).await?; - let schema_name = organization_id.to_string().replace("-", "_"); // Insert values and embeddings for (value, embedding) in values.iter().zip(embeddings.iter()) { let insert_sql = format!( @@ -154,6 +192,7 @@ pub async fn store_column_values( .await?; } + first_batch = false; offset += BATCH_SIZE; }