mirror of https://github.com/buster-so/buster.git
sequential thinking and updates
This commit is contained in:
parent
1e5bc3977d
commit
4e6f13c181
|
@ -1,7 +1,5 @@
|
|||
import { updateMessageEntries } from '@buster/database';
|
||||
import type { ToolCallOptions } from 'ai';
|
||||
import { normalizeEscapedText } from '../../../utils/streaming/escape-normalizer';
|
||||
import { createSequentialThinkingReasoningMessage } from './helpers/sequential-thinking-tool-transform-helper';
|
||||
import type {
|
||||
SequentialThinkingContext,
|
||||
SequentialThinkingInput,
|
||||
|
@ -21,33 +19,13 @@ export function createSequentialThinkingFinish(
|
|||
sequentialThinkingState.nextThoughtNeeded = options.input.nextThoughtNeeded;
|
||||
sequentialThinkingState.thoughtNumber = options.input.thoughtNumber;
|
||||
|
||||
// Create final reasoning entry with completed status
|
||||
const reasoningEntry = createSequentialThinkingReasoningMessage(
|
||||
sequentialThinkingState,
|
||||
options.toolCallId,
|
||||
'completed' // Mark as completed when finish is called
|
||||
);
|
||||
|
||||
try {
|
||||
if (context.messageId) {
|
||||
const reasoningMessages = reasoningEntry ? [reasoningEntry] : [];
|
||||
|
||||
if (reasoningMessages.length > 0) {
|
||||
await updateMessageEntries({
|
||||
messageId: context.messageId,
|
||||
reasoningMessages,
|
||||
});
|
||||
|
||||
console.info('[sequential-thinking] Completed sequential thinking:', {
|
||||
messageId: context.messageId,
|
||||
toolCallId: options.toolCallId,
|
||||
thoughtNumber: options.input.thoughtNumber,
|
||||
nextThoughtNeeded: options.input.nextThoughtNeeded,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[sequential-thinking] Failed to update reasoning entry on finish:', error);
|
||||
}
|
||||
// No longer update reasoning message here - let execute handle the final update
|
||||
// This prevents race conditions between finish and execute
|
||||
console.info('[sequential-thinking] Finished streaming sequential thinking:', {
|
||||
messageId: context.messageId,
|
||||
toolCallId: options.toolCallId,
|
||||
thoughtNumber: options.input.thoughtNumber,
|
||||
nextThoughtNeeded: options.input.nextThoughtNeeded,
|
||||
});
|
||||
};
|
||||
}
|
||||
|
|
|
@ -262,7 +262,7 @@ describe('Sequential Thinking Tool Streaming Tests', () => {
|
|||
expect(state.toolCallId).toBe('tool-call-789');
|
||||
});
|
||||
|
||||
test('should update database with completed status on finish', async () => {
|
||||
test('should update state but not database on finish', async () => {
|
||||
const { updateMessageEntries } = await import('@buster/database');
|
||||
vi.mocked(updateMessageEntries).mockClear();
|
||||
|
||||
|
@ -288,11 +288,14 @@ describe('Sequential Thinking Tool Streaming Tests', () => {
|
|||
messages: [],
|
||||
});
|
||||
|
||||
expect(updateMessageEntries).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
messageId: mockContext.messageId,
|
||||
})
|
||||
);
|
||||
// Should update state
|
||||
expect(state.toolCallId).toBe('final-call');
|
||||
expect(state.thought).toBe('Final thought');
|
||||
expect(state.nextThoughtNeeded).toBe(false);
|
||||
expect(state.thoughtNumber).toBe(5);
|
||||
|
||||
// Should NOT update database (execute handles the final update)
|
||||
expect(updateMessageEntries).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -44,17 +44,23 @@ export async function updateMessageEntries({
|
|||
throw new Error(`Message not found: ${messageId}`);
|
||||
}
|
||||
|
||||
// Merge with new entries
|
||||
// Merge all entries concurrently
|
||||
const [mergedResponseMessages, mergedReasoning, mergedRawLlmMessages] = await Promise.all([
|
||||
responseMessages
|
||||
? Promise.resolve(mergeResponseMessages(existingEntries.responseMessages, responseMessages))
|
||||
: Promise.resolve(existingEntries.responseMessages),
|
||||
reasoningMessages
|
||||
? Promise.resolve(mergeReasoningMessages(existingEntries.reasoning, reasoningMessages))
|
||||
: Promise.resolve(existingEntries.reasoning),
|
||||
rawLlmMessages
|
||||
? Promise.resolve(mergeRawLlmMessages(existingEntries.rawLlmMessages, rawLlmMessages))
|
||||
: Promise.resolve(existingEntries.rawLlmMessages),
|
||||
]);
|
||||
|
||||
const mergedEntries = {
|
||||
responseMessages: responseMessages
|
||||
? mergeResponseMessages(existingEntries.responseMessages, responseMessages)
|
||||
: existingEntries.responseMessages,
|
||||
reasoning: reasoningMessages
|
||||
? mergeReasoningMessages(existingEntries.reasoning, reasoningMessages)
|
||||
: existingEntries.reasoning,
|
||||
rawLlmMessages: rawLlmMessages
|
||||
? mergeRawLlmMessages(existingEntries.rawLlmMessages, rawLlmMessages)
|
||||
: existingEntries.rawLlmMessages,
|
||||
responseMessages: mergedResponseMessages,
|
||||
reasoning: mergedReasoning,
|
||||
rawLlmMessages: mergedRawLlmMessages,
|
||||
};
|
||||
|
||||
// Build update data
|
||||
|
@ -74,14 +80,16 @@ export async function updateMessageEntries({
|
|||
updateData.rawLlmMessages = mergedEntries.rawLlmMessages;
|
||||
}
|
||||
|
||||
// Update database first for persistence
|
||||
await db
|
||||
.update(messages)
|
||||
.set(updateData)
|
||||
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)));
|
||||
|
||||
// Update cache after successful database write (cache reflects persisted state)
|
||||
messageEntriesCache.set(messageId, mergedEntries);
|
||||
// Update cache and database concurrently
|
||||
await Promise.all([
|
||||
// Update cache immediately (cache is source of truth during streaming)
|
||||
Promise.resolve(messageEntriesCache.set(messageId, mergedEntries)),
|
||||
// Update database for persistence
|
||||
db
|
||||
.update(messages)
|
||||
.set(updateData)
|
||||
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt))),
|
||||
]);
|
||||
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
|
|
Loading…
Reference in New Issue