Merge branch 'staging' into big-nate-bus-1830-ability-to-apply-color-theme-by-a-category

This commit is contained in:
Nate Kelley 2025-09-25 21:07:23 -06:00
commit 1ac26de837
No known key found for this signature in database
GPG Key ID: FD90372AB8D98B4F
14 changed files with 644 additions and 125 deletions

View File

@ -38,17 +38,6 @@ export async function getMetricDataHandler(
versionNumber?: number, versionNumber?: number,
reportFileId?: string reportFileId?: string
): Promise<MetricDataResponse> { ): Promise<MetricDataResponse> {
// Get user's organization
const userOrg = await getUserOrganizationId(user.id);
if (!userOrg) {
throw new HTTPException(403, {
message: 'You must be part of an organization to access metric data',
});
}
const { organizationId } = userOrg;
// Retrieve metric definition from database with data source info // Retrieve metric definition from database with data source info
const metric = await getMetricWithDataSource({ metricId, versionNumber }); const metric = await getMetricWithDataSource({ metricId, versionNumber });
@ -58,13 +47,6 @@ export async function getMetricDataHandler(
}); });
} }
// Verify metric belongs to user's organization
if (metric.organizationId !== organizationId) {
throw new HTTPException(403, {
message: 'You do not have permission to view this metric',
});
}
// Check if user has permission to view this metric file // Check if user has permission to view this metric file
// hasAssetPermission internally handles: // hasAssetPermission internally handles:
// 1. Direct permissions // 1. Direct permissions
@ -76,7 +58,7 @@ export async function getMetricDataHandler(
assetId: metricId, assetId: metricId,
assetType: 'metric_file', assetType: 'metric_file',
requiredRole: 'can_view', requiredRole: 'can_view',
organizationId, organizationId: metric.organizationId,
workspaceSharing: metric.workspaceSharing ?? 'none', workspaceSharing: metric.workspaceSharing ?? 'none',
publiclyAccessible: metric.publiclyAccessible, publiclyAccessible: metric.publiclyAccessible,
publicExpiryDate: metric.publicExpiryDate ?? undefined, publicExpiryDate: metric.publicExpiryDate ?? undefined,
@ -98,13 +80,13 @@ export async function getMetricDataHandler(
console.info('Checking R2 cache for metric data', { console.info('Checking R2 cache for metric data', {
metricId, metricId,
reportFileId, reportFileId,
organizationId, organizationId: metric.organizationId,
version: resolvedVersion, version: resolvedVersion,
}); });
try { try {
const cachedData = await getCachedMetricData( const cachedData = await getCachedMetricData(
organizationId, metric.organizationId,
metricId, metricId,
reportFileId, reportFileId,
resolvedVersion resolvedVersion
@ -184,22 +166,26 @@ export async function getMetricDataHandler(
console.info('Writing metric data to cache', { console.info('Writing metric data to cache', {
metricId, metricId,
reportFileId, reportFileId,
organizationId, organizationId: metric.organizationId,
version: resolvedVersion, version: resolvedVersion,
rowCount: trimmedData.length, rowCount: trimmedData.length,
}); });
// Fire and forget - don't wait for cache write // Fire and forget - don't wait for cache write
setCachedMetricData(organizationId, metricId, reportFileId, response, resolvedVersion).catch( setCachedMetricData(
(error) => { metric.organizationId,
console.error('Failed to cache metric data', { metricId,
metricId, reportFileId,
reportFileId, response,
version: resolvedVersion, resolvedVersion
error: error instanceof Error ? error.message : 'Unknown error', ).catch((error) => {
}); console.error('Failed to cache metric data', {
} metricId,
); reportFileId,
version: resolvedVersion,
error: error instanceof Error ? error.message : 'Unknown error',
});
});
} }
return response; return response;

View File

@ -1,6 +1,6 @@
import { db } from '@buster/database/connection'; import { db } from '@buster/database/connection';
import { beforeEach, describe, expect, it, vi } from 'vitest'; import { beforeEach, describe, expect, it, vi } from 'vitest';
import { findOrCreateSlackChat } from './events'; import { eventsHandler, findOrCreateSlackChat } from './events';
vi.mock('@buster/database/connection', () => ({ vi.mock('@buster/database/connection', () => ({
db: { db: {
@ -14,6 +14,31 @@ vi.mock('@buster/database/schema', () => ({
slackIntegrations: {}, slackIntegrations: {},
})); }));
// Replace the vault query with a bare mock; tests that reach the token lookup
// set its resolved value themselves via vi.mocked(getSecretByName).
vi.mock('@buster/database/queries', () => ({
getSecretByName: vi.fn(),
}));
// Replace the Slack package. The event type guards and addReaction become bare
// mocks, and constructing SlackMessagingService yields an object whose
// sendMessage is a mock; tests override that constructor with mockImplementation
// when they need to observe or fail the send.
vi.mock('@buster/slack', () => ({
SlackMessagingService: vi.fn(() => ({
sendMessage: vi.fn(),
})),
isEventCallback: vi.fn(),
isAppMentionEvent: vi.fn(),
isMessageImEvent: vi.fn(),
addReaction: vi.fn(),
}));
// Replace tasks.trigger with a mock.
// NOTE(review): presumably this keeps the handler from enqueuing a real
// background task; no test visible here asserts on it — confirm.
vi.mock('@trigger.dev/sdk', () => ({
tasks: {
trigger: vi.fn(),
},
}));
// Replace the Slack authentication helpers so each test can choose the
// authentication result returned for the incoming Slack user.
vi.mock('./services/slack-authentication', () => ({
authenticateSlackUser: vi.fn(),
getUserIdFromAuthResult: vi.fn(),
}));
describe('findOrCreateSlackChat', () => { describe('findOrCreateSlackChat', () => {
beforeEach(() => { beforeEach(() => {
vi.clearAllMocks(); vi.clearAllMocks();
@ -221,3 +246,310 @@ describe('findOrCreateSlackChat', () => {
}); });
}); });
}); });
/**
 * Covers how eventsHandler treats Slack users that fail authentication:
 * bot accounts are acknowledged silently, every other unauthorized user gets a
 * best-effort "unauthorized" reply before the handler rejects.
 *
 * The payload builders and mock helpers below replace setup that used to be
 * copy-pasted into each test; the scenarios and assertions are unchanged.
 */
describe('eventsHandler - Unauthorized Users', () => {
  const UNAUTHORIZED_ERROR = 'Unauthorized: Slack user authentication failed';
  const UNAUTHORIZED_TEXT =
    'Sorry, you are unauthorized to chat with Buster. Please contact your Workspace Administrator for access.';
  const VAULT_ROW = { tokenVaultKey: 'vault-key-123' };

  /** Envelope fields shared by every webhook payload used in this suite. */
  const webhookEnvelope = {
    type: 'event_callback' as const,
    token: 'xoxb-test-token',
    team_id: 'T123456',
    api_app_id: 'A123456',
    event_id: 'E123456',
    event_time: 1234567890,
  };

  /** Payload for an app mention posted in channel C123456 (not in a thread). */
  const buildAppMentionPayload = () => ({
    ...webhookEnvelope,
    event: {
      type: 'app_mention' as const,
      user: 'U123456',
      channel: 'C123456',
      text: 'Hello Buster',
      ts: '1234567890.123456',
      event_ts: '1234567890.123456',
    },
  });

  /** Payload for a direct message that is a reply inside an existing thread. */
  const buildThreadedDmPayload = () => ({
    ...webhookEnvelope,
    event: {
      type: 'message' as const,
      channel_type: 'im' as const,
      user: 'U123456',
      channel: 'D123456',
      text: 'Hello Buster',
      ts: '1234567890.123456',
      event_ts: '1234567890.123456',
      thread_ts: '1234567890.111111', // This is a threaded message
    },
  });

  /** Makes the mocked Slack type guards classify the event as the given kind. */
  async function mockEventKind(kind: 'app_mention' | 'im') {
    const { isEventCallback, isAppMentionEvent, isMessageImEvent } = await import('@buster/slack');
    vi.mocked(isEventCallback).mockReturnValue(true);
    vi.mocked(isAppMentionEvent).mockReturnValue(kind === 'app_mention');
    vi.mocked(isMessageImEvent).mockReturnValue(kind === 'im');
  }

  /** Makes authentication resolve to `unauthorized` with the given reason; returns the mock. */
  async function mockUnauthorized(reason: string) {
    const { authenticateSlackUser } = await import('./services/slack-authentication');
    vi.mocked(authenticateSlackUser).mockResolvedValue({
      type: 'unauthorized',
      reason,
    } as any);
    return authenticateSlackUser;
  }

  /** Makes the select().from().where().limit() chain resolve to `rows`. */
  function mockIntegrationLookup(rows: Array<{ tokenVaultKey: string }>) {
    vi.mocked(db.select).mockReturnValue({
      from: vi.fn().mockReturnThis(),
      where: vi.fn().mockReturnThis(),
      limit: vi.fn().mockResolvedValue(rows),
    } as any);
  }

  /** Makes the vault return the test access token; returns the mock. */
  async function mockVaultSecret() {
    const { getSecretByName } = await import('@buster/database/queries');
    vi.mocked(getSecretByName).mockResolvedValue({
      secret: 'xoxb-test-token',
    } as any);
    return getSecretByName;
  }

  /** Makes every new SlackMessagingService expose the supplied sendMessage mock. */
  async function mockMessagingService(sendMessage: ReturnType<typeof vi.fn>) {
    const { SlackMessagingService } = await import('@buster/slack');
    vi.mocked(SlackMessagingService).mockImplementation(
      () =>
        ({
          sendMessage,
        }) as any
    );
    return sendMessage;
  }

  beforeEach(() => {
    vi.clearAllMocks();
  });

  it('should return success without sending message for bot unauthorized users', async () => {
    await mockEventKind('app_mention');
    const authenticateSlackUser = await mockUnauthorized('User is a bot account');

    const result = await eventsHandler(buildAppMentionPayload());

    expect(result).toEqual({ success: true });
    expect(authenticateSlackUser).toHaveBeenCalledWith('U123456', 'T123456');
    // Should not attempt to send message or get access token
    expect(db.select).not.toHaveBeenCalled();
  });

  it('should send unauthorized message for regular unauthorized users', async () => {
    await mockEventKind('app_mention');
    const authenticateSlackUser = await mockUnauthorized(
      'User email not found in organization domain'
    );
    mockIntegrationLookup([VAULT_ROW]);
    const getSecretByName = await mockVaultSecret();
    const sendMessage = await mockMessagingService(
      vi.fn().mockResolvedValue({ ok: true, ts: '1234567890.123456' })
    );

    await expect(eventsHandler(buildAppMentionPayload())).rejects.toThrow(UNAUTHORIZED_ERROR);

    expect(authenticateSlackUser).toHaveBeenCalledWith('U123456', 'T123456');
    expect(getSecretByName).toHaveBeenCalledWith('vault-key-123');
    // No thread_ts on the event, so the reply threads under the message's own ts
    expect(sendMessage).toHaveBeenCalledWith('xoxb-test-token', 'C123456', {
      text: UNAUTHORIZED_TEXT,
      thread_ts: '1234567890.123456',
    });
  });

  it('should send unauthorized message in thread for DM unauthorized users', async () => {
    await mockEventKind('im');
    const authenticateSlackUser = await mockUnauthorized('User not found in organization');
    mockIntegrationLookup([VAULT_ROW]);
    const getSecretByName = await mockVaultSecret();
    const sendMessage = await mockMessagingService(
      vi.fn().mockResolvedValue({ ok: true, ts: '1234567890.123456' })
    );

    await expect(eventsHandler(buildThreadedDmPayload())).rejects.toThrow(UNAUTHORIZED_ERROR);

    expect(authenticateSlackUser).toHaveBeenCalledWith('U123456', 'T123456');
    expect(getSecretByName).toHaveBeenCalledWith('vault-key-123');
    expect(sendMessage).toHaveBeenCalledWith('xoxb-test-token', 'D123456', {
      text: UNAUTHORIZED_TEXT,
      thread_ts: '1234567890.111111', // Should use the existing thread_ts
    });
  });

  it('should handle failure to send unauthorized message gracefully', async () => {
    await mockEventKind('app_mention');
    const authenticateSlackUser = await mockUnauthorized('User not authorized');
    mockIntegrationLookup([VAULT_ROW]);
    await mockVaultSecret();
    const sendMessage = await mockMessagingService(
      vi.fn().mockRejectedValue(new Error('Slack API error'))
    );

    // Should still throw the unauthorized error even if message sending fails
    await expect(eventsHandler(buildAppMentionPayload())).rejects.toThrow(UNAUTHORIZED_ERROR);

    expect(authenticateSlackUser).toHaveBeenCalledWith('U123456', 'T123456');
    expect(sendMessage).toHaveBeenCalled();
  });

  it('should handle case when no access token is available for unauthorized message', async () => {
    const { getSecretByName } = await import('@buster/database/queries');
    await mockEventKind('app_mention');
    const authenticateSlackUser = await mockUnauthorized('User not authorized');
    // Empty result: no Slack integration row, hence no vault key to resolve
    mockIntegrationLookup([]);
    // Messaging service is wired up only to prove it is never used
    const sendMessage = await mockMessagingService(vi.fn());

    // Should still throw the unauthorized error even when no token is available
    await expect(eventsHandler(buildAppMentionPayload())).rejects.toThrow(UNAUTHORIZED_ERROR);

    expect(authenticateSlackUser).toHaveBeenCalledWith('U123456', 'T123456');
    expect(getSecretByName).not.toHaveBeenCalled();
    expect(sendMessage).not.toHaveBeenCalled();
  });
});

View File

@ -3,6 +3,7 @@ import { getSecretByName } from '@buster/database/queries';
import { chats, slackIntegrations } from '@buster/database/schema'; import { chats, slackIntegrations } from '@buster/database/schema';
import type { SlackEventsResponse } from '@buster/server-shared/slack'; import type { SlackEventsResponse } from '@buster/server-shared/slack';
import { import {
SlackMessagingService,
type SlackWebhookPayload, type SlackWebhookPayload,
addReaction, addReaction,
isAppMentionEvent, isAppMentionEvent,
@ -18,6 +19,41 @@ import {
getUserIdFromAuthResult, getUserIdFromAuthResult,
} from './services/slack-authentication'; } from './services/slack-authentication';
/**
 * Resolves the Slack access token for a workspace by reading the integration's
 * vault key from the database and then fetching the secret stored under it.
 *
 * Only integrations with status `active` for `teamId` are considered. When
 * `organizationId` is provided the lookup is additionally restricted to that
 * organization; otherwise the first matching row for the team is used.
 *
 * @param teamId - Slack team (workspace) id to look up.
 * @param organizationId - Optional organization id used to narrow the lookup.
 * @returns The access token, or `null` when no integration row or vault key is
 *   found, the secret is missing or empty, or any step of the lookup throws.
 */
async function getSlackAccessToken(
  teamId: string,
  organizationId?: string
): Promise<string | null> {
  const conditions = [
    eq(slackIntegrations.teamId, teamId),
    eq(slackIntegrations.status, 'active'),
  ];
  if (organizationId) {
    conditions.push(eq(slackIntegrations.organizationId, organizationId));
  }

  try {
    // Only the vault key is needed from the integration row
    const rows = await db
      .select({ tokenVaultKey: slackIntegrations.tokenVaultKey })
      .from(slackIntegrations)
      .where(and(...conditions))
      .limit(1);

    const vaultKey = rows[0]?.tokenVaultKey;
    if (!vaultKey) {
      return null;
    }

    // Exchange the vault key for the actual token
    const vaultSecret = await getSecretByName(vaultKey);
    return vaultSecret?.secret || null;
  } catch (error) {
    // Callers treat a failed lookup the same as "no token", so report and return null
    console.error('Failed to get Slack access token from vault:', error);
    return null;
  }
}
/** /**
* Map authentication result type to database enum value * Map authentication result type to database enum value
*/ */
@ -176,9 +212,9 @@ export async function handleSlackEventsEndpoint(c: Context) {
return c.json(response); return c.json(response);
} catch (error) { } catch (error) {
// Handle authentication errors // Handle authentication errors with 200 status code to prevent Slack from retrying the webhook
if (error instanceof Error && error.message.includes('Unauthorized')) { if (error instanceof Error && error.message.includes('Unauthorized')) {
return c.json({ error: 'Unauthorized' }, 401); return c.json({ error: 'Unauthorized' }, 200);
} }
// Re-throw other errors // Re-throw other errors
throw error; throw error;
@ -212,19 +248,41 @@ export async function eventsHandler(payload: SlackWebhookPayload): Promise<Slack
// Authenticate the Slack user // Authenticate the Slack user
const authResult = await authenticateSlackUser(event.user, payload.team_id); const authResult = await authenticateSlackUser(event.user, payload.team_id);
// Check if authentication was successful if (authResult.type === 'unauthorized') {
const userId = getUserIdFromAuthResult(authResult); if (authResult.reason.toLowerCase().includes('bot')) {
if (!userId) { return { success: true };
console.warn('Slack user authentication failed:', { }
slackUserId: event.user,
teamId: payload.team_id, try {
reason: authResult.type === 'unauthorized' ? authResult.reason : 'Unknown', const accessToken = await getSlackAccessToken(payload.team_id);
}); if (accessToken) {
// Throw unauthorized error const messagingService = new SlackMessagingService();
const threadTs = event.thread_ts || event.ts;
await messagingService.sendMessage(accessToken, event.channel, {
text: 'Sorry, you are unauthorized to chat with Buster. Please contact your Workspace Administrator for access.',
thread_ts: threadTs,
});
console.info('Sent unauthorized message to Slack user', {
channel: event.channel,
user: event.user,
threadTs,
});
}
} catch (error) {
console.warn('Failed to send unauthorized message to Slack user', {
error: error instanceof Error ? error.message : 'Unknown error',
channel: event.channel,
user: event.user,
threadTs: event.thread_ts || event.ts,
});
}
throw new Error('Unauthorized: Slack user authentication failed'); throw new Error('Unauthorized: Slack user authentication failed');
} }
const organizationId = authResult.type === 'unauthorized' ? '' : authResult.organization.id; const userId = authResult.user.id;
const organizationId = authResult.organization.id;
// Extract thread timestamp - if no thread_ts, this is a new thread so use ts // Extract thread timestamp - if no thread_ts, this is a new thread so use ts
const threadTs = event.thread_ts || event.ts; const threadTs = event.thread_ts || event.ts;
@ -232,39 +290,21 @@ export async function eventsHandler(payload: SlackWebhookPayload): Promise<Slack
// Add hourglass reaction immediately after authentication // Add hourglass reaction immediately after authentication
if (organizationId) { if (organizationId) {
try { try {
// Fetch Slack integration to get token vault key const accessToken = await getSlackAccessToken(payload.team_id, organizationId);
const slackIntegration = await db
.select({
tokenVaultKey: slackIntegrations.tokenVaultKey,
})
.from(slackIntegrations)
.where(
and(
eq(slackIntegrations.organizationId, organizationId),
eq(slackIntegrations.teamId, payload.team_id),
eq(slackIntegrations.status, 'active')
)
)
.limit(1);
if (slackIntegration.length > 0 && slackIntegration[0]?.tokenVaultKey) { if (accessToken) {
// Get the access token from vault // Add the hourglass reaction
const vaultSecret = await getSecretByName(slackIntegration[0].tokenVaultKey); await addReaction({
accessToken,
channelId: event.channel,
messageTs: event.ts,
emoji: 'hourglass_flowing_sand',
});
if (vaultSecret?.secret) { console.info('Added hourglass reaction to app mention', {
// Add the hourglass reaction channel: event.channel,
await addReaction({ messageTs: event.ts,
accessToken: vaultSecret.secret, });
channelId: event.channel,
messageTs: event.ts,
emoji: 'hourglass_flowing_sand',
});
console.info('Added hourglass reaction to app mention', {
channel: event.channel,
messageTs: event.ts,
});
}
} }
} catch (error) { } catch (error) {
// Log but don't fail the entire process if reaction fails // Log but don't fail the entire process if reaction fails

View File

@ -48,11 +48,9 @@ export async function checkPermission(check: AssetPermissionCheck): Promise<Asse
} = check; } = check;
// Check cache first (only for single role checks) // Check cache first (only for single role checks)
if (!Array.isArray(requiredRole)) { const cached = getCachedPermission(userId, assetId, assetType, requiredRole);
const cached = getCachedPermission(userId, assetId, assetType, requiredRole); if (cached !== undefined) {
if (cached !== undefined) { return cached;
return cached;
}
} }
// Get user's organization memberships // Get user's organization memberships
@ -112,6 +110,8 @@ export async function checkPermission(check: AssetPermissionCheck): Promise<Asse
} }
} }
console.info('publiclyAccessible', publiclyAccessible);
if (publiclyAccessible) { if (publiclyAccessible) {
const hasPublicAccessCheck = hasPublicAccess( const hasPublicAccessCheck = hasPublicAccess(
publiclyAccessible, publiclyAccessible,

View File

@ -344,12 +344,12 @@ You operate in a loop to complete tasks:
- Strict JOINs: Only join tables where relationships are explicitly defined via `relationships` or `entities` keys in the provided data context/metadata. Do not join tables without a pre-defined relationship. - Strict JOINs: Only join tables where relationships are explicitly defined via `relationships` or `entities` keys in the provided data context/metadata. Do not join tables without a pre-defined relationship.
- SQL Requirements: - SQL Requirements:
- Use database-qualified schema-qualified table names (`<DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME>`). - Use database-qualified schema-qualified table names (`<DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME>`).
- Use fully qualified column names with table aliases (e.g., `<table_alias>.<column>`). - Use column names qualified with table aliases (e.g., `<table_alias>.<column>`).
- MANDATORY SQL NAMING CONVENTIONS: - MANDATORY SQL NAMING CONVENTIONS:
- All Table References: MUST be fully qualified: `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`. - All Table References: MUST be fully qualified: `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`.
- All Column References: MUST be qualified with their table alias (e.g., `alias.column_name`) or CTE name (e.g., `cte_alias.column_name_from_cte`). - All Column References: MUST be qualified with their table alias (e.g., `c.customerid`) or CTE name (e.g., `cte_alias.column_name_from_cte`).
- Inside CTE Definitions: When defining a CTE (e.g., `WITH my_cte AS (SELECT t.column1 FROM DATABASE.SCHEMA.TABLE1 t ...)`), all columns selected from underlying database tables MUST use their table alias (e.g., `t.column1`, not just `column1`). This applies even if the CTE is simple and selects from only one table. - Inside CTE Definitions: When defining a CTE (e.g., `WITH my_cte AS (SELECT c.customerid FROM DATABASE.SCHEMA.TABLE1 c ...)`), all columns selected from underlying database tables MUST use their table alias (e.g., `c.customerid`, not just `customerid`). This applies even if the CTE is simple and selects from only one table.
- Selecting From CTEs: When selecting from a defined CTE, use the CTE's alias for its columns (e.g., `SELECT mc.column1 FROM my_cte mc ...`). - Selecting From CTEs: When selecting from a defined CTE, use the CTE's alias for its columns (e.g., `SELECT mc.column_name FROM my_cte mc ...`).
- Universal Application: These naming conventions are strict requirements and apply universally to all parts of the SQL query, including every CTE definition and every subsequent SELECT statement. Non-compliance will lead to errors. - Universal Application: These naming conventions are strict requirements and apply universally to all parts of the SQL query, including every CTE definition and every subsequent SELECT statement. Non-compliance will lead to errors.
- Context Adherence: Strictly use only columns that are present in the data context provided by search results. Never invent or assume columns. - Context Adherence: Strictly use only columns that are present in the data context provided by search results. Never invent or assume columns.
- Select specific columns (avoid `SELECT *` or `COUNT(*)`). - Select specific columns (avoid `SELECT *` or `COUNT(*)`).

View File

@ -98,10 +98,10 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
const docsSystemMessage = docsContent const docsSystemMessage = docsContent
? ({ ? ({
role: 'system', role: 'system',
content: `<data_catalog_docs>\n${docsContent}\n</data_catalog_docs>`, content: `<data_catalog_docs>\n${docsContent}\n</data_catalog_docs>`,
providerOptions: DEFAULT_ANTHROPIC_OPTIONS, providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
} as ModelMessage) } as ModelMessage)
: null; : null;
async function stream({ messages }: AnalystStreamOptions) { async function stream({ messages }: AnalystStreamOptions) {
@ -134,19 +134,19 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
// Create analyst instructions system message with proper escaping // Create analyst instructions system message with proper escaping
const analystInstructionsMessage = analystInstructions const analystInstructionsMessage = analystInstructions
? ({ ? ({
role: 'system', role: 'system',
content: `<organization_instructions>\n${analystInstructions}\n</organization_instructions>`, content: `<organization_instructions>\n${analystInstructions}\n</organization_instructions>`,
providerOptions: DEFAULT_ANTHROPIC_OPTIONS, providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
} as ModelMessage) } as ModelMessage)
: null; : null;
// Create user personalization system message // Create user personalization system message
const userPersonalizationSystemMessage = userPersonalizationMessageContent const userPersonalizationSystemMessage = userPersonalizationMessageContent
? ({ ? ({
role: 'system', role: 'system',
content: userPersonalizationMessageContent, content: userPersonalizationMessageContent,
providerOptions: DEFAULT_ANTHROPIC_OPTIONS, providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
} as ModelMessage) } as ModelMessage)
: null; : null;
return wrapTraced( return wrapTraced(

View File

@ -75,4 +75,35 @@ describe('Analyst Agent Instructions', () => {
getAnalystAgentSystemPrompt(' '); // whitespace only getAnalystAgentSystemPrompt(' '); // whitespace only
}).toThrow('SQL dialect guidance is required'); }).toThrow('SQL dialect guidance is required');
}); });
it('should contain mandatory SQL naming conventions', () => {
  const prompt = getAnalystAgentSystemPrompt('Test guidance');

  // Text the prompt must carry: the section heading, the full-qualification rule
  // for tables, the alias-qualification rule for columns, and the alias-based
  // examples (including the one inside the CTE guidance).
  const requiredFragments = [
    'MANDATORY SQL NAMING CONVENTIONS',
    'All Table References: MUST be fully qualified: `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`',
    'All Column References: MUST be qualified with their table alias (e.g., `c.customerid`)',
    'c.customerid',
    'SELECT c.customerid FROM DATABASE.SCHEMA.TABLE1 c',
    'c.customerid`, not just `customerid`',
  ];
  for (const fragment of requiredFragments) {
    expect(prompt).toContain(fragment);
  }

  // A column spelled with the full database.schema.table prefix must not appear
  expect(prompt).not.toContain('postgres.ont_ont.customer.customerid');
});
it('should use column names qualified with table aliases', () => {
  // Wording the prompt is expected to use now, and the wording it replaced
  const currentWording = 'Use column names qualified with table aliases';
  const retiredWording = 'Use fully qualified column names with table aliases';

  const prompt = getAnalystAgentSystemPrompt('Test guidance');

  expect(prompt).toContain(currentWording);
  expect(prompt).not.toContain(retiredWording);
});
}); });

View File

@ -145,4 +145,40 @@ describe('Think and Prep Agent Instructions', () => {
getThinkAndPrepAgentSystemPrompt(' '); // whitespace only getThinkAndPrepAgentSystemPrompt(' '); // whitespace only
}).toThrow('SQL dialect guidance is required'); }).toThrow('SQL dialect guidance is required');
}); });
// Runs the SQL naming-convention checks once per prompt mode. The cases are a
// flat `as const` list, so `mode` is already typed as the literal union and
// needs no type assertion when passed to getThinkAndPrepAgentSystemPrompt.
describe.each(['standard', 'investigation'] as const)(
  'SQL naming conventions in %s mode',
  (mode) => {
    it(`should contain mandatory SQL naming conventions in ${mode} mode`, () => {
      const result = getThinkAndPrepAgentSystemPrompt('Test guidance', mode);

      // Check for MANDATORY SQL NAMING CONVENTIONS section
      expect(result).toContain('MANDATORY SQL NAMING CONVENTIONS');

      // Ensure table references require full qualification
      expect(result).toContain(
        'All Table References: MUST be fully qualified: `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`'
      );

      // Ensure column references use table aliases (not full qualifiers)
      expect(result).toContain(
        'All Column References: MUST be qualified with their table alias (e.g., `c.customerid`)'
      );

      // Ensure examples show table alias usage without full qualification
      expect(result).toContain('c.customerid');
      expect(result).not.toContain('postgres.ont_ont.customer.customerid');

      // Ensure CTE examples use table aliases correctly
      expect(result).toContain('SELECT c.customerid FROM DATABASE.SCHEMA.TABLE1 c');
      expect(result).toContain('c.customerid`, not just `customerid`');
    });

    it(`should use column names qualified with table aliases in ${mode} mode`, () => {
      const result = getThinkAndPrepAgentSystemPrompt('Test guidance', mode);

      // Check for the updated description
      expect(result).toContain('Use column names qualified with table aliases');

      // Ensure the old verbose description is not present
      expect(result).not.toContain('Use fully qualified column names with table aliases');
    });
  }
);
}); });

View File

@ -588,12 +588,12 @@ If all true → proceed to submit prep for Asset Creation with `submitThoughts`.
- Strict JOINs: Only join tables where relationships are explicitly defined via `relationships` or `entities` keys in the provided data context/metadata. Do not join tables without a pre-defined relationship. - Strict JOINs: Only join tables where relationships are explicitly defined via `relationships` or `entities` keys in the provided data context/metadata. Do not join tables without a pre-defined relationship.
- SQL Requirements: - SQL Requirements:
- Use database-qualified schema-qualified table names (`<DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME>`). - Use database-qualified schema-qualified table names (`<DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME>`).
- Use fully qualified column names with table aliases (e.g., `<table_alias>.<column>`). - Use column names qualified with table aliases (e.g., `<table_alias>.<column>`).
- MANDATORY SQL NAMING CONVENTIONS: - MANDATORY SQL NAMING CONVENTIONS:
- All Table References: MUST be fully qualified: `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`. - All Table References: MUST be fully qualified: `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`.
- All Column References: MUST be qualified with their table alias (e.g., `alias.column_name`) or CTE name (e.g., `cte_alias.column_name_from_cte`). - All Column References: MUST be qualified with their table alias (e.g., `c.customerid`) or CTE name (e.g., `cte_alias.column_name_from_cte`).
- Inside CTE Definitions: When defining a CTE (e.g., `WITH my_cte AS (SELECT t.column1 FROM DATABASE.SCHEMA.TABLE1 t ...)`), all columns selected from underlying database tables MUST use their table alias (e.g., `t.column1`, not just `column1`). This applies even if the CTE is simple and selects from only one table. - Inside CTE Definitions: When defining a CTE (e.g., `WITH my_cte AS (SELECT c.customerid FROM DATABASE.SCHEMA.TABLE1 c ...)`), all columns selected from underlying database tables MUST use their table alias (e.g., `c.customerid`, not just `customerid`). This applies even if the CTE is simple and selects from only one table.
- Selecting From CTEs: When selecting from a defined CTE, use the CTE's alias for its columns (e.g., `SELECT mc.column1 FROM my_cte mc ...`). - Selecting From CTEs: When selecting from a defined CTE, use the CTE's alias for its columns (e.g., `SELECT mc.column_name FROM my_cte mc ...`).
- Universal Application: These naming conventions are strict requirements and apply universally to all parts of the SQL query, including every CTE definition and every subsequent SELECT statement. Non-compliance will lead to errors. - Universal Application: These naming conventions are strict requirements and apply universally to all parts of the SQL query, including every CTE definition and every subsequent SELECT statement. Non-compliance will lead to errors.
- Context Adherence: Strictly use only columns that are present in the data context provided by search results. Never invent or assume columns. - Context Adherence: Strictly use only columns that are present in the data context provided by search results. Never invent or assume columns.
- Select specific columns (avoid `SELECT *` or `COUNT(*)`). - Select specific columns (avoid `SELECT *` or `COUNT(*)`).

View File

@ -465,12 +465,12 @@ When in doubt, be more thorough rather than less. Reports are the default becaus
- Strict JOINs: Only join tables where relationships are explicitly defined via `relationships` or `entities` keys in the provided data context/metadata. Do not join tables without a pre-defined relationship. - Strict JOINs: Only join tables where relationships are explicitly defined via `relationships` or `entities` keys in the provided data context/metadata. Do not join tables without a pre-defined relationship.
- SQL Requirements: - SQL Requirements:
- Use database-qualified schema-qualified table names (`<DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME>`). - Use database-qualified schema-qualified table names (`<DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME>`).
- Use fully qualified column names with table aliases (e.g., `<table_alias>.<column>`). - Use column names qualified with table aliases (e.g., `<table_alias>.<column>`).
- MANDATORY SQL NAMING CONVENTIONS: - MANDATORY SQL NAMING CONVENTIONS:
- All Table References: MUST be fully qualified: `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`. - All Table References: MUST be fully qualified: `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`.
- All Column References: MUST be qualified with their table alias (e.g., `alias.column_name`) or CTE name (e.g., `cte_alias.column_name_from_cte`). - All Column References: MUST be qualified with their table alias (e.g., `c.customerid`) or CTE name (e.g., `cte_alias.column_name_from_cte`).
- Inside CTE Definitions: When defining a CTE (e.g., `WITH my_cte AS (SELECT t.column1 FROM DATABASE.SCHEMA.TABLE1 t ...)`), all columns selected from underlying database tables MUST use their table alias (e.g., `t.column1`, not just `column1`). This applies even if the CTE is simple and selects from only one table. - Inside CTE Definitions: When defining a CTE (e.g., `WITH my_cte AS (SELECT c.customerid FROM DATABASE.SCHEMA.TABLE1 c ...)`), all columns selected from underlying database tables MUST use their table alias (e.g., `c.customerid`, not just `customerid`). This applies even if the CTE is simple and selects from only one table.
- Selecting From CTEs: When selecting from a defined CTE, use the CTE's alias for its columns (e.g., `SELECT mc.column1 FROM my_cte mc ...`). - Selecting From CTEs: When selecting from a defined CTE, use the CTE's alias for its columns (e.g., `SELECT mc.column_name FROM my_cte mc ...`).
- Universal Application: These naming conventions are strict requirements and apply universally to all parts of the SQL query, including every CTE definition and every subsequent SELECT statement. Non-compliance will lead to errors. - Universal Application: These naming conventions are strict requirements and apply universally to all parts of the SQL query, including every CTE definition and every subsequent SELECT statement. Non-compliance will lead to errors.
- Context Adherence: Strictly use only columns that are present in the data context provided by search results. Never invent or assume columns. - Context Adherence: Strictly use only columns that are present in the data context provided by search results. Never invent or assume columns.
- Select specific columns (avoid `SELECT *` or `COUNT(*)`). - Select specific columns (avoid `SELECT *` or `COUNT(*)`).

View File

@ -6,8 +6,15 @@ export const DEFAULT_ANTHROPIC_OPTIONS = {
gateway: { gateway: {
order: ['bedrock', 'anthropic', 'vertex'], order: ['bedrock', 'anthropic', 'vertex'],
}, },
headers: {}, anthropic: {
anthropic: { cacheControl: { type: 'ephemeral' } }, cacheControl: { type: 'ephemeral' },
},
bedrock: {
cacheControl: { type: 'ephemeral' },
additionalModelRequestFields: {
anthropic_beta: ['fine-grained-tool-streaming-2025-05-14'],
},
}
}; };
export const DEFAULT_OPENAI_OPTIONS = { export const DEFAULT_OPENAI_OPTIONS = {
@ -15,7 +22,7 @@ export const DEFAULT_OPENAI_OPTIONS = {
order: ['openai'], order: ['openai'],
}, },
openai: { openai: {
parallelToolCalls: false, // parallelToolCalls: false,
reasoningEffort: 'minimal', reasoningEffort: 'minimal',
verbosity: 'low', verbosity: 'low',
}, },

View File

@ -3,8 +3,8 @@ import { generateObject } from 'ai';
import type { ModelMessage } from 'ai'; import type { ModelMessage } from 'ai';
import { wrapTraced } from 'braintrust'; import { wrapTraced } from 'braintrust';
import { z } from 'zod'; import { z } from 'zod';
import { GPT5Nano } from '../../../llm'; import { Haiku35 } from '../../../llm';
import { DEFAULT_OPENAI_OPTIONS } from '../../../llm/providers/gateway'; import { DEFAULT_ANTHROPIC_OPTIONS } from '../../../llm/providers/gateway';
// Zod-first: define input/output schemas and export inferred types // Zod-first: define input/output schemas and export inferred types
export const generateChatTitleParamsSchema = z.object({ export const generateChatTitleParamsSchema = z.object({
@ -56,10 +56,10 @@ async function generateTitleWithLLM(messages: ModelMessage[]): Promise<string> {
const tracedChatTitle = wrapTraced( const tracedChatTitle = wrapTraced(
async () => { async () => {
const { object } = await generateObject({ const { object } = await generateObject({
model: GPT5Nano, model: Haiku35,
schema: llmOutputSchema, schema: llmOutputSchema,
messages: titleMessages, messages: titleMessages,
providerOptions: DEFAULT_OPENAI_OPTIONS, providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
}); });
return object; return object;

View File

@ -21,16 +21,14 @@ const UpdateMessageEntriesSchema = z.object({
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>; export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
/**
 * Per-message update chains, keyed by messageId.
 * Each value is the promise of the most recently queued update for that
 * message; the next update for the same id is chained onto it.
 */
const updateQueues: Map<string, Promise<{ success: boolean }>> = new Map();
/** /**
* Updates message entries with cache-first approach for streaming. * Internal function that performs the actual update logic.
* Cache is the source of truth during streaming, DB is updated for persistence. * This is separated so it can be queued.
*
* Merge logic:
* - responseMessages: upsert by 'id' field, maintaining order
* - reasoningMessages: upsert by 'id' field, maintaining order
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
*/ */
export async function updateMessageEntries({ async function performUpdate({
messageId, messageId,
rawLlmMessages, rawLlmMessages,
responseMessages, responseMessages,
@ -95,3 +93,41 @@ export async function updateMessageEntries({
throw new Error(`Failed to update message entries for message ${messageId}`); throw new Error(`Failed to update message entries for message ${messageId}`);
} }
} }
/**
 * Updates message entries with cache-first approach for streaming.
 * Cache is the source of truth during streaming, DB is updated for persistence.
 *
 * Updates are queued per messageId to ensure they execute in order: each call
 * waits for the previously queued update of the same message to settle
 * (whether it succeeded or failed) and then runs its own update exactly once.
 *
 * Merge logic:
 * - responseMessages: upsert by 'id' field, maintaining order
 * - reasoningMessages: upsert by 'id' field, maintaining order
 * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
 *
 * @param params - Update payload; `messageId` selects the queue to append to.
 * @returns The result of this call's update.
 * @throws Rejects with whatever this call's update rejects with. A failure of
 *   an earlier queued update is not propagated to later callers.
 */
export async function updateMessageEntries(
  params: UpdateMessageEntriesParams
): Promise<{ success: boolean }> {
  const { messageId } = params;

  // Tail of the queue for this messageId, or an already-settled promise when idle.
  const previous: Promise<unknown> = updateQueues.get(messageId) ?? Promise.resolve();

  // Run after the previous update settles, regardless of its outcome. Passing
  // the same callback as both handlers (instead of `.then(run).catch(run)`)
  // means a failure of *this* update is not retried a second time.
  const run = (): Promise<{ success: boolean }> => performUpdate(params);
  const newQueue = previous.then(run, run);

  // Publish this update as the new tail of the queue.
  updateQueues.set(messageId, newQueue);

  // Drop the map entry once this update settles, but only if no later update
  // has replaced it. Handling both outcomes here keeps the derived promise
  // from surfacing as an unhandled rejection when the update fails.
  const cleanup = (): void => {
    if (updateQueues.get(messageId) === newQueue) {
      updateQueues.delete(messageId);
    }
  };
  void newQueue.then(cleanup, cleanup);

  return newQueue;
}

View File

@ -31,18 +31,26 @@ type VersionHistoryEntry = {
type VersionHistory = Record<string, VersionHistoryEntry>; type VersionHistory = Record<string, VersionHistoryEntry>;
/**
 * Per-report update chains, keyed by reportId.
 * Each value is the promise of the most recently queued update for that
 * report; the next update for the same id is chained onto it.
 */
const updateQueues = new Map<
  string,
  Promise<{ id: string; name: string; content: string; versionHistory: VersionHistory | null }>
>();
/** /**
* Updates a report with new content, optionally name, and version history in a single operation * Internal function that performs the actual update logic.
* This is more efficient than multiple individual updates * This is separated so it can be queued.
*/ */
export const batchUpdateReport = async ( async function performUpdate(
params: BatchUpdateReportInput params: BatchUpdateReportInput
): Promise<{ ): Promise<{
id: string; id: string;
name: string; name: string;
content: string; content: string;
versionHistory: VersionHistory | null; versionHistory: VersionHistory | null;
}> => { }> {
const { reportId, content, name, versionHistory } = BatchUpdateReportInputSchema.parse(params); const { reportId, content, name, versionHistory } = BatchUpdateReportInputSchema.parse(params);
try { try {
@ -93,4 +101,47 @@ export const batchUpdateReport = async (
throw new Error('Failed to batch update report'); throw new Error('Failed to batch update report');
} }
}
/**
 * Updates a report with new content, optionally name, and version history in a single operation.
 * This is more efficient than multiple individual updates.
 *
 * Updates are queued per reportId to ensure they execute in order: each call
 * waits for the previously queued update of the same report to settle
 * (whether it succeeded or failed) and then runs its own update exactly once.
 *
 * @param params - Update payload; `reportId` selects the queue to append to.
 * @returns The updated report fields produced by this call's update.
 * @throws Rejects with whatever this call's update rejects with. A failure of
 *   an earlier queued update is not propagated to later callers.
 */
export const batchUpdateReport = async (
  params: BatchUpdateReportInput
): Promise<{
  id: string;
  name: string;
  content: string;
  versionHistory: VersionHistory | null;
}> => {
  const { reportId } = params;

  // Tail of the queue for this reportId, or an already-settled promise when
  // idle. Only its settlement matters, so no placeholder result is needed.
  const previous: Promise<unknown> = updateQueues.get(reportId) ?? Promise.resolve();

  // Run after the previous update settles, regardless of its outcome. Passing
  // the same callback as both handlers (instead of `.then(run).catch(run)`)
  // means a failure of *this* update is not retried a second time.
  const run = () => performUpdate(params);
  const newQueue = previous.then(run, run);

  // Publish this update as the new tail of the queue.
  updateQueues.set(reportId, newQueue);

  // Drop the map entry once this update settles, but only if no later update
  // has replaced it. Handling both outcomes here keeps the derived promise
  // from surfacing as an unhandled rejection when the update fails.
  const cleanup = (): void => {
    if (updateQueues.get(reportId) === newQueue) {
      updateQueues.delete(reportId);
    }
  };
  void newQueue.then(cleanup, cleanup);

  return newQueue;
};