diff --git a/src/tui/components/chat-log.ts b/src/tui/components/chat-log.ts
index e3ce93102..11a44c35c 100644
--- a/src/tui/components/chat-log.ts
+++ b/src/tui/components/chat-log.ts
@@ -4,19 +4,30 @@ import { AssistantMessageComponent } from "./assistant-message.js";
 import { ToolExecutionComponent } from "./tool-execution.js";
 import { UserMessageComponent } from "./user-message.js";
 
+/**
+ * Per-run streaming state - isolates each run's thinking and content streams.
+ * This enables proper sequencing regardless of network arrival order.
+ */
+interface StreamingRunState {
+  component: AssistantMessageComponent;
+  thinkingText: string;
+  contentText: string;
+  showThinking: boolean;
+}
+
 export class ChatLog extends Container {
   private toolById = new Map();
-  private streamingAssistant: AssistantMessageComponent | null = null;
-  private streamingRunId: string | null = null;
-  private streamingText: string | null = null;
+  // Streaming state keyed by runId so concurrent runs never share a component
+  private streamingRuns = new Map<string, StreamingRunState>();
+  // Most recently created run; used when a caller omits runId
+  private lastStreamingRunId: string | null = null;
   private toolsExpanded = false;
 
   clearAll() {
     this.clear();
     this.toolById.clear();
-    this.streamingAssistant = null;
-    this.streamingRunId = null;
-    this.streamingText = null;
+    this.streamingRuns.clear();
+    this.lastStreamingRunId = null;
   }
 
   addSystem(text: string) {
@@ -28,48 +39,158 @@ export class ChatLog extends Container {
     this.addChild(new UserMessageComponent(text));
   }
 
+  /**
+   * Get or create streaming state for a specific runId.
+   */
+  private getOrCreateRunState(runId: string, showThinking: boolean): StreamingRunState {
+    let state = this.streamingRuns.get(runId);
+    if (!state) {
+      const component = new AssistantMessageComponent("");
+      this.addChild(component);
+      state = {
+        component,
+        thinkingText: "",
+        contentText: "",
+        showThinking,
+      };
+      this.streamingRuns.set(runId, state);
+      this.lastStreamingRunId = runId;
+    }
+    return state;
+  }
+
+  /**
+   * Compose the display text from thinking + content.
+   * Thinking always appears before content regardless of arrival order.
+   */
+  private composeDisplayText(state: StreamingRunState): string {
+    const parts: string[] = [];
+
+    // Thinking comes first (if enabled and present)
+    if (state.showThinking && state.thinkingText.trim()) {
+      parts.push(`[thinking]\n${state.thinkingText}`);
+    }
+
+    // Content comes after thinking
+    if (state.contentText.trim()) {
+      parts.push(state.contentText);
+    }
+
+    return parts.join("\n\n").trim();
+  }
+
+  /**
+   * Stop tracking a run without touching what is already rendered.
+   */
+  private releaseRun(runId: string) {
+    this.streamingRuns.delete(runId);
+    if (this.lastStreamingRunId === runId) {
+      this.lastStreamingRunId = null;
+    }
+  }
+
   startAssistant(text: string, runId?: string) {
     const component = new AssistantMessageComponent(text);
-    this.streamingAssistant = component;
-    this.streamingRunId = runId ?? null;
-    this.streamingText = text;
+    if (runId) {
+      // Track the component so later updates for this run reuse it
+      this.streamingRuns.set(runId, {
+        component,
+        thinkingText: "",
+        contentText: text,
+        showThinking: false,
+      });
+      this.lastStreamingRunId = runId;
+    }
     this.addChild(component);
     return component;
   }
 
-  updateAssistant(text: string, runId?: string) {
-    if (
-      !this.streamingAssistant ||
-      (runId && this.streamingRunId && runId !== this.streamingRunId)
-    ) {
-      this.startAssistant(text, runId);
-      return;
+  /**
+   * Update the assistant message with new streaming content.
+   * State is isolated per runId, and thinking/content are tracked separately.
+   */
+  updateAssistant(
+    text: string,
+    runId?: string,
+    options?: {
+      thinkingText?: string;
+      contentText?: string;
+      showThinking?: boolean;
+    },
+  ) {
+    const effectiveRunId = runId ?? "default";
+    const state = this.getOrCreateRunState(effectiveRunId, options?.showThinking ?? false);
+
+    // Update thinking and/or content separately if provided
+    if (options?.thinkingText !== undefined) {
+      state.thinkingText = options.thinkingText;
     }
-    this.streamingText = text;
-    this.streamingAssistant.setText(text);
+    if (options?.contentText !== undefined) {
+      state.contentText = options.contentText;
+    }
+
+    // If only raw text provided (backward compatibility), use as content
+    if (options?.thinkingText === undefined && options?.contentText === undefined) {
+      state.contentText = text;
+    }
+
+    // Only an explicit flag changes visibility; a plain-text update keeps the run's setting
+    if (options?.showThinking !== undefined) {
+      state.showThinking = options.showThinking;
+    }
+
+    // Recompose and render with guaranteed ordering
+    state.component.setText(this.composeDisplayText(state));
   }
 
   getStreamingText(runId?: string) {
-    if (!this.streamingAssistant) return null;
-    if (runId && this.streamingRunId && runId !== this.streamingRunId) {
-      return null;
-    }
-    return this.streamingText;
+    const effectiveRunId = runId ?? this.lastStreamingRunId;
+    if (!effectiveRunId) return null;
+
+    const state = this.streamingRuns.get(effectiveRunId);
+    if (!state) return null;
+
+    return this.composeDisplayText(state);
+  }
+
+  /**
+   * Get the raw streaming state (for diagnostics).
+   */
+  getStreamingState(runId: string): { thinking: string; content: string } | null {
+    const state = this.streamingRuns.get(runId);
+    if (!state) return null;
+    return {
+      thinking: state.thinkingText,
+      content: state.contentText,
+    };
+  }
+
+  /**
+   * Forget a run that ended without a final message (aborted or errored).
+   * Text that was already streamed stays on screen.
+   */
+  discardStreaming(runId?: string) {
+    const effectiveRunId = runId ?? this.lastStreamingRunId;
+    if (effectiveRunId) this.releaseRun(effectiveRunId);
   }
 
   finalizeAssistant(text: string, runId?: string) {
-    if (
-      this.streamingAssistant &&
-      (!runId || runId === this.streamingRunId || text === this.streamingText)
-    ) {
-      this.streamingText = text;
-      this.streamingAssistant.setText(text);
+    const effectiveRunId = runId ?? this.lastStreamingRunId;
+    const state = effectiveRunId ? this.streamingRuns.get(effectiveRunId) : null;
+
+    if (state) {
+      // Use the final text, or compose from existing state if final is empty
+      const finalText = text.trim() || this.composeDisplayText(state);
+      state.component.setText(finalText);
     } else {
+      // No existing state - create a new component with final text
       this.startAssistant(text, runId);
     }
-    this.streamingAssistant = null;
-    this.streamingRunId = null;
-    this.streamingText = null;
+
+    // Clean up the streaming state for this run
+    if (effectiveRunId) {
+      this.releaseRun(effectiveRunId);
+    }
   }
 
   startTool(toolCallId: string, toolName: string, args: unknown) {
diff --git a/src/tui/tui-event-handlers.ts b/src/tui/tui-event-handlers.ts
index 045c7f940..5fc075f51 100644
--- a/src/tui/tui-event-handlers.ts
+++ b/src/tui/tui-event-handlers.ts
@@ -1,6 +1,12 @@
 import type { TUI } from "@mariozechner/pi-tui";
 import type { ChatLog } from "./components/chat-log.js";
-import { asString, extractTextFromMessage, resolveFinalAssistantText } from "./tui-formatters.js";
+import {
+  asString,
+  extractTextFromMessage,
+  extractThinkingFromMessage,
+  extractContentFromMessage,
+  resolveFinalAssistantText,
+} from "./tui-formatters.js";
 import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
 
 type EventHandlerContext = {
@@ -11,12 +17,24 @@ type EventHandlerContext = {
   refreshSessionInfo?: () => Promise;
 };
 
+/**
+ * Per-run stream buffer for tracking thinking/content separately.
+ * Enables proper sequencing regardless of network arrival order.
+ */
+interface RunStreamBuffer {
+  thinkingText: string;
+  contentText: string;
+}
+
 export function createEventHandlers(context: EventHandlerContext) {
   const { chatLog, tui, state, setActivityStatus, refreshSessionInfo } = context;
   const finalizedRuns = new Map();
+  // Latest thinking/content text seen for each run that is still streaming
+  const runBuffers = new Map<string, RunStreamBuffer>();
 
   const noteFinalizedRun = (runId: string) => {
     finalizedRuns.set(runId, Date.now());
+    runBuffers.delete(runId); // Clean up buffer
     if (finalizedRuns.size <= 200) return;
     const keepUntil = Date.now() - 10 * 60 * 1000;
     for (const [key, ts] of finalizedRuns) {
@@ -31,6 +49,18 @@ export function createEventHandlers(context: EventHandlerContext) {
     }
   };
 
+  /**
+   * Get or create a stream buffer for a specific runId.
+   */
+  const getOrCreateBuffer = (runId: string): RunStreamBuffer => {
+    let buffer = runBuffers.get(runId);
+    if (!buffer) {
+      buffer = { thinkingText: "", contentText: "" };
+      runBuffers.set(runId, buffer);
+    }
+    return buffer;
+  };
+
   const handleChatEvent = (payload: unknown) => {
     if (!payload || typeof payload !== "object") return;
     const evt = payload as ChatEvent;
@@ -40,11 +70,32 @@ export function createEventHandlers(context: EventHandlerContext) {
       if (evt.state === "final") return;
     }
     if (evt.state === "delta") {
-      const text = extractTextFromMessage(evt.message, {
-        includeThinking: state.showThinking,
+      const buffer = getOrCreateBuffer(evt.runId);
+
+      // Extract thinking and content separately so they can be sequenced.
+      // Model-agnostic: messages without thinking blocks just yield an empty string.
+      const thinkingText = extractThinkingFromMessage(evt.message);
+      const contentText = extractContentFromMessage(evt.message);
+
+      // Deltas typically carry the full accumulated text, so replace rather than append.
+      // An empty extraction keeps the previous value.
+      if (thinkingText) {
+        buffer.thinkingText = thinkingText;
+      }
+      if (contentText) {
+        buffer.contentText = contentText;
+      }
+
+      // Skip render while nothing would be visible (e.g. thinking-only deltas with
+      // showThinking off); otherwise an empty assistant bubble is created.
+      if (!buffer.contentText && !(state.showThinking && buffer.thinkingText)) return;
+
+      chatLog.updateAssistant("", evt.runId, {
+        thinkingText: buffer.thinkingText,
+        contentText: buffer.contentText,
+        showThinking: state.showThinking,
       });
-      if (!text) return;
-      chatLog.updateAssistant(text, evt.runId);
+
       setActivityStatus("streaming");
     }
     if (evt.state === "final") {
@@ -54,11 +105,23 @@ export function createEventHandlers(context: EventHandlerContext) {
             ? ((evt.message as Record).stopReason as string)
             : ""
           : "";
-      const text = extractTextFromMessage(evt.message, {
-        includeThinking: state.showThinking,
-      });
+
+      // Extract the final thinking and content separately, as in the delta path
+      const thinkingText = extractThinkingFromMessage(evt.message);
+      const contentText = extractContentFromMessage(evt.message);
+
+      // Compose final text with proper ordering (thinking before content)
+      const parts: string[] = [];
+      if (state.showThinking && thinkingText.trim()) {
+        parts.push(`[thinking]\n${thinkingText}`);
+      }
+      if (contentText.trim()) {
+        parts.push(contentText);
+      }
+      const finalComposed = parts.join("\n\n").trim();
+
       const finalText = resolveFinalAssistantText({
-        finalText: text,
+        finalText: finalComposed,
         streamedText: chatLog.getStreamingText(evt.runId),
       });
       chatLog.finalizeAssistant(finalText, evt.runId);
@@ -70,12 +133,16 @@ export function createEventHandlers(context: EventHandlerContext) {
     }
     if (evt.state === "aborted") {
       chatLog.addSystem("run aborted");
+      runBuffers.delete(evt.runId);
+      chatLog.discardStreaming(evt.runId);
       state.activeChatRunId = null;
       setActivityStatus("aborted");
       void refreshSessionInfo?.();
     }
     if (evt.state === "error") {
       chatLog.addSystem(`run error: ${evt.errorMessage ?? "unknown"}`);
+      runBuffers.delete(evt.runId);
+      chatLog.discardStreaming(evt.runId);
       state.activeChatRunId = null;
       setActivityStatus("error");
       void refreshSessionInfo?.();
diff --git a/src/tui/tui-formatters.ts b/src/tui/tui-formatters.ts
index c869a52c3..f05b98983 100644
--- a/src/tui/tui-formatters.ts
+++ b/src/tui/tui-formatters.ts
@@ -12,25 +12,87 @@ export function resolveFinalAssistantText(params: {
   return "(no output)";
 }
 
+/**
+ * Extract ONLY thinking blocks from message content.
+ * Model-agnostic: returns empty string if no thinking blocks exist.
+ */
+export function extractThinkingFromMessage(message: unknown): string {
+  if (!message || typeof message !== "object") return "";
+  const content = (message as Record<string, unknown>).content;
+  // String content carries no thinking blocks
+  if (!Array.isArray(content)) return "";
+
+  const parts: string[] = [];
+  for (const block of content) {
+    if (!block || typeof block !== "object") continue;
+    const rec = block as Record<string, unknown>;
+    if (rec.type === "thinking" && typeof rec.thinking === "string") {
+      parts.push(rec.thinking);
+    }
+  }
+  return parts.join("\n").trim();
+}
+
+/**
+ * Extract ONLY text content blocks from message (excludes thinking).
+ * Model-agnostic: works for any model with text content blocks.
+ */
+export function extractContentFromMessage(message: unknown): string {
+  if (!message || typeof message !== "object") return "";
+  const record = message as Record<string, unknown>;
+  const content = record.content;
+
+  if (typeof content === "string") return content.trim();
+  if (!Array.isArray(content)) return "";
+
+  const parts: string[] = [];
+  for (const block of content) {
+    if (!block || typeof block !== "object") continue;
+    const rec = block as Record<string, unknown>;
+    if (rec.type === "text" && typeof rec.text === "string") {
+      parts.push(rec.text);
+    }
+  }
+
+  // If no text blocks found, check for error
+  if (parts.length === 0) {
+    const stopReason = typeof record.stopReason === "string" ? record.stopReason : "";
+    if (stopReason === "error") {
+      const errorMessage = typeof record.errorMessage === "string" ? record.errorMessage : "";
+      return formatRawAssistantErrorForUi(errorMessage);
+    }
+  }
+
+  return parts.join("\n").trim();
+}
+
 function extractTextBlocks(content: unknown, opts?: { includeThinking?: boolean }): string {
   if (typeof content === "string") return content.trim();
   if (!Array.isArray(content)) return "";
-  const parts: string[] = [];
+
+  // Collect separately so thinking can be emitted before text
+  const thinkingParts: string[] = [];
+  const textParts: string[] = [];
+
   for (const block of content) {
     if (!block || typeof block !== "object") continue;
     const record = block as Record;
     if (record.type === "text" && typeof record.text === "string") {
-      parts.push(record.text);
+      textParts.push(record.text);
     }
     if (
       opts?.includeThinking &&
       record.type === "thinking" &&
       typeof record.thinking === "string"
     ) {
-      parts.push(`[thinking]\n${record.thinking}`);
+      thinkingParts.push(`[thinking]\n${record.thinking}`);
     }
   }
-  return parts.join("\n").trim();
+
+  // Blocks keep their single-newline separator; only the thinking/text boundary gets a
+  // blank line, so text-only messages render exactly as before.
+  const sections = [thinkingParts.join("\n"), textParts.join("\n")].filter(Boolean);
+  return sections.join("\n\n").trim();
 }
 
 export function extractTextFromMessage(