stubbed out the cancel endpoint and added the trigger run col

This commit is contained in:
dal 2025-07-10 16:26:30 -06:00
parent de959ea090
commit 87f3853ce8
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
10 changed files with 6310 additions and 42 deletions

View File

@ -9,7 +9,8 @@
"scripts": {
"prebuild": "bun run scripts/validate-env.js && pnpm run typecheck",
"build": "tsup",
"dev": "tsup --watch",
"dev": "bun --watch src/index.ts",
"dev:build": "tsup --watch",
"lint": "biome check",
"start": "bun dist/index.js",
"test": "vitest run",

View File

@ -0,0 +1,40 @@
import type { ChatWithMessages } from '@buster/server-shared/chats';
import type { User } from '@buster/database';
import { eq, and, isNull, isNotNull } from '@buster/database';
import { db, messages } from '@buster/database';
import { HTTPException } from 'hono/http-exception';
export async function cancelChatHandler(
chatId: string,
user: User
): Promise<ChatWithMessages> {
// Query for messages with the given chat_id where is_completed: false and trigger_run_id is not null
const incompleteTriggerMessages = await db
.select()
.from(messages)
.where(
and(
eq(messages.chatId, chatId),
eq(messages.isCompleted, false),
isNotNull(messages.triggerRunId)
)
);
// TODO: Implement trigger cancellation logic here
// For each message with a trigger_run_id, cancel the corresponding trigger run
// Example (to be implemented):
// for (const message of incompleteTriggerMessages) {
// if (message.triggerRunId) {
// await cancelTriggerRun(message.triggerRunId);
// }
// }
// After cancellation, return the chat object with messages
// This should match the format returned by get chat and post chat endpoints
// TODO: Fetch the full chat object with all messages in the same format as get_chat_handler
// For now, this is a stub that needs to be implemented
throw new HTTPException(501, {
message: 'Cancel chat endpoint not fully implemented - needs to return ChatWithMessages format',
});
}

View File

@ -88,6 +88,12 @@ export async function createChatHandler(
throw new Error('Trigger service returned invalid handle');
}
// Update the message with the trigger run ID
const { updateMessage } = await import('@buster/database');
await updateMessage(messageId, {
triggerRunId: taskHandle.id,
});
// Task was successfully queued - background analysis will proceed
} catch (triggerError) {
console.error('Failed to trigger analyst agent task:', triggerError);

View File

@ -3,6 +3,7 @@ import {
ChatError,
type ChatWithMessages,
ChatWithMessagesSchema,
CancelChatParamsSchema,
} from '@buster/server-shared/chats';
import { zValidator } from '@hono/zod-validator';
import { Hono } from 'hono';
@ -11,6 +12,7 @@ import '../../../types/hono.types'; //I added this to fix intermitent type error
import { HTTPException } from 'hono/http-exception';
import { z } from 'zod';
import { createChatHandler } from './handler';
import { cancelChatHandler } from './cancel-chat';
const app = new Hono()
// Apply authentication middleware
@ -48,6 +50,14 @@ const app = new Hono()
});
}
)
// DELETE /chats/:chat_id/cancel - Cancel a chat and its running triggers
.delete('/:chat_id/cancel', zValidator('param', CancelChatParamsSchema), async (c) => {
const params = c.req.valid('param');
const user = c.get('busterUser');
const response = await cancelChatHandler(params.chat_id, user);
return c.json(response);
})
.onError((e, c) => {
if (e instanceof ChatError) {
// we need to use this syntax instead of HTTPException because hono bubbles up 500 errors

View File

@ -523,7 +523,7 @@ describe('SnowflakeAdapter Integration', () => {
'should handle connection drops gracefully',
async () => {
await adapter.initialize(credentials);
// Simulate network interruption by destroying connection
// @ts-expect-error - Testing private property
const conn = adapter.connection;
@ -534,10 +534,10 @@ describe('SnowflakeAdapter Integration', () => {
});
});
}
// Next query should fail
await expect(adapter.query('SELECT 1')).rejects.toThrow();
// But adapter should be able to reinitialize
await adapter.initialize(credentials);
const result = await adapter.query('SELECT 1 as test');
@ -550,7 +550,7 @@ describe('SnowflakeAdapter Integration', () => {
'should handle query cancellation',
async () => {
await adapter.initialize(credentials);
// Start a long-running query and cancel it
const longQuery = adapter.query(
`SELECT COUNT(*) FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM
@ -559,9 +559,9 @@ describe('SnowflakeAdapter Integration', () => {
undefined,
100 // Very short timeout to force cancellation
);
await expect(longQuery).rejects.toThrow(/timeout/i);
// Should be able to run another query immediately
const result = await adapter.query('SELECT 1 as test');
expect(result.rows[0].TEST).toBe(1);
@ -573,10 +573,10 @@ describe('SnowflakeAdapter Integration', () => {
'should handle very long strings in queries',
async () => {
await adapter.initialize(credentials);
// Test with a very long string (1MB)
const veryLongString = 'x'.repeat(1000000);
// This should work but might be slow
const result = await adapter.query(
`SELECT LENGTH('${veryLongString}') as str_length`,
@ -584,7 +584,7 @@ describe('SnowflakeAdapter Integration', () => {
undefined,
60000 // 60 second timeout for large string
);
expect(result.rows[0].STR_LENGTH).toBe(1000000);
},
120000 // 2 minute timeout for this test
@ -594,7 +594,7 @@ describe('SnowflakeAdapter Integration', () => {
'should handle Unicode and special characters',
async () => {
await adapter.initialize(credentials);
const result = await adapter.query(`
SELECT
'🎉emoji🎊' as emoji_text,
@ -603,7 +603,7 @@ describe('SnowflakeAdapter Integration', () => {
'Special: <>&"\\/' as special_chars,
'Line' || CHR(10) || 'Break' as line_break
`);
expect(result.rows[0].EMOJI_TEXT).toBe('🎉emoji🎊');
expect(result.rows[0].CHINESE_TEXT).toBe('Chinese: 你好');
expect(result.rows[0].ARABIC_TEXT).toBe('Arabic: مرحبا');
@ -617,14 +617,14 @@ describe('SnowflakeAdapter Integration', () => {
'should handle extremely large result sets with maxRows',
async () => {
await adapter.initialize(credentials);
// Query that would return millions of rows
const result = await adapter.query(
'SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM',
undefined,
1000 // Limit to 1000 rows
);
expect(result.rows.length).toBe(1000);
expect(result.hasMoreRows).toBe(true);
expect(result.rowCount).toBe(1000);
@ -636,19 +636,19 @@ describe('SnowflakeAdapter Integration', () => {
'should handle rapid connection cycling',
async () => {
const results = [];
// Rapidly create, use, and close connections
for (let i = 0; i < 5; i++) {
const tempAdapter = new SnowflakeAdapter();
await tempAdapter.initialize(credentials);
const result = await tempAdapter.query(`SELECT ${i} as cycle_num`);
results.push(result.rows[0].CYCLE_NUM);
await tempAdapter.close();
// No delay - test rapid cycling
}
expect(results).toEqual([0, 1, 2, 3, 4]);
},
60000 // 1 minute for rapid cycling
@ -658,14 +658,14 @@ describe('SnowflakeAdapter Integration', () => {
'should handle warehouse suspension gracefully',
async () => {
await adapter.initialize(credentials);
// First query to ensure warehouse is running
await adapter.query('SELECT 1');
// Note: In production, warehouse might auto-suspend
// This test simulates querying after potential suspension
await new Promise(resolve => setTimeout(resolve, 5000)); // 5 second delay
await new Promise((resolve) => setTimeout(resolve, 5000)); // 5 second delay
// Should still work (Snowflake should auto-resume)
const result = await adapter.query('SELECT 2 as test');
expect(result.rows[0].TEST).toBe(2);
@ -677,7 +677,7 @@ describe('SnowflakeAdapter Integration', () => {
'should handle malformed SQL gracefully',
async () => {
await adapter.initialize(credentials);
const malformedQueries = [
'SELECT * FROM', // Incomplete
'SELCT * FROM table', // Typo
@ -685,11 +685,11 @@ describe('SnowflakeAdapter Integration', () => {
'SELECT * FROM "non.existent.schema"."table"', // Invalid schema
'SELECT 1; DROP TABLE test;', // Multiple statements
];
for (const query of malformedQueries) {
await expect(adapter.query(query)).rejects.toThrow();
}
// Should still be able to run valid queries
const result = await adapter.query('SELECT 1 as test');
expect(result.rows[0].TEST).toBe(1);
@ -702,24 +702,25 @@ describe('SnowflakeAdapter Integration', () => {
async () => {
const adapters: SnowflakeAdapter[] = [];
const promises: Promise<any>[] = [];
// Create many adapters without closing them (simulating pool exhaustion)
for (let i = 0; i < 20; i++) {
const tempAdapter = new SnowflakeAdapter();
adapters.push(tempAdapter);
const promise = tempAdapter.initialize(credentials)
const promise = tempAdapter
.initialize(credentials)
.then(() => tempAdapter.query(`SELECT ${i} as num`));
promises.push(promise);
}
// All should complete successfully
const results = await Promise.all(promises);
expect(results).toHaveLength(20);
// Cleanup
await Promise.all(adapters.map(a => a.close()));
await Promise.all(adapters.map((a) => a.close()));
},
120000 // 2 minutes for many connections
);
@ -728,17 +729,15 @@ describe('SnowflakeAdapter Integration', () => {
'should maintain connection integrity under load',
async () => {
await adapter.initialize(credentials);
// Run many queries in parallel on same adapter
const queryPromises = [];
for (let i = 0; i < 50; i++) {
queryPromises.push(
adapter.query(`SELECT ${i} as num, CURRENT_TIMESTAMP() as ts`)
);
queryPromises.push(adapter.query(`SELECT ${i} as num, CURRENT_TIMESTAMP() as ts`));
}
const results = await Promise.all(queryPromises);
// Verify all queries succeeded and returned correct data
expect(results).toHaveLength(50);
results.forEach((result, index) => {
@ -753,14 +752,14 @@ describe('SnowflakeAdapter Integration', () => {
'should handle binary data correctly',
async () => {
await adapter.initialize(credentials);
const result = await adapter.query(`
SELECT
TO_BINARY('48656C6C6F', 'HEX') as hex_binary,
TO_BINARY('SGVsbG8=', 'BASE64') as base64_binary,
BASE64_ENCODE(TO_BINARY('48656C6C6F', 'HEX')) as encoded_text
`);
expect(result.rows[0].HEX_BINARY).toBeTruthy();
expect(result.rows[0].BASE64_BINARY).toBeTruthy();
expect(result.rows[0].ENCODED_TEXT).toBe('SGVsbG8=');
@ -772,7 +771,7 @@ describe('SnowflakeAdapter Integration', () => {
'should handle timezone-aware timestamps',
async () => {
await adapter.initialize(credentials);
const result = await adapter.query(`
SELECT
CONVERT_TIMEZONE('UTC', 'America/New_York', '2024-01-01 12:00:00'::TIMESTAMP_NTZ) as ny_time,
@ -780,7 +779,7 @@ describe('SnowflakeAdapter Integration', () => {
CURRENT_TIMESTAMP() as current_ts,
SYSDATE() as sys_date
`);
expect(result.rows[0].NY_TIME).toBeTruthy();
expect(result.rows[0].TOKYO_TIME).toBeTruthy();
expect(result.rows[0].CURRENT_TS).toBeTruthy();

View File

@ -0,0 +1,2 @@
-- Add trigger_run_id column to messages table
ALTER TABLE messages ADD COLUMN IF NOT EXISTS trigger_run_id text;

File diff suppressed because it is too large Load Diff

View File

@ -547,6 +547,13 @@
"when": 1752150366202,
"tag": "0077_short_marvex",
"breakpoints": true
},
{
"idx": 78,
"version": "7",
"when": 1752184722835,
"tag": "0078_adorable_layla_miller",
"breakpoints": true
}
]
}

View File

@ -828,6 +828,7 @@ export const messages = pgTable(
feedback: text(),
isCompleted: boolean('is_completed').default(false).notNull(),
postProcessingMessage: jsonb('post_processing_message'),
triggerRunId: text('trigger_run_id'),
},
(table) => [
index('messages_chat_id_idx').using('btree', table.chatId.asc().nullsLast().op('uuid_ops')),

View File

@ -60,9 +60,15 @@ export const ChatCreateHandlerRequestSchema = z.object({
asset_type: AssetType.optional(),
});
// Cancel chat params schema
export const CancelChatParamsSchema = z.object({
chat_id: z.string().uuid(),
});
// Infer types from schemas
export type AssetPermissionRole = z.infer<typeof AssetPermissionRoleSchema>;
export type BusterShareIndividual = z.infer<typeof BusterShareIndividualSchema>;
export type ChatWithMessages = z.infer<typeof ChatWithMessagesSchema>;
export type ChatCreateRequest = z.infer<typeof ChatCreateRequestSchema>;
export type ChatCreateHandlerRequest = z.infer<typeof ChatCreateHandlerRequestSchema>;
export type CancelChatParams = z.infer<typeof CancelChatParamsSchema>;