fix(tui): buffer streaming messages by runId to prevent render ordering issues
Fixes #1172 - Add per-runId message buffering in ChatLog - Separate thinking stream from content stream handling - Ensure proper sequencing (thinking always before content) - Model-agnostic: works with or without thinking tokens
This commit is contained in:
@@ -4,19 +4,30 @@ import { AssistantMessageComponent } from "./assistant-message.js";
|
||||
import { ToolExecutionComponent } from "./tool-execution.js";
|
||||
import { UserMessageComponent } from "./user-message.js";
|
||||
|
||||
/**
|
||||
* Per-run streaming state - isolates each run's thinking and content streams.
|
||||
* This enables proper sequencing regardless of network arrival order.
|
||||
*/
|
||||
interface StreamingRunState {
|
||||
component: AssistantMessageComponent;
|
||||
thinkingText: string;
|
||||
contentText: string;
|
||||
showThinking: boolean;
|
||||
}
|
||||
|
||||
export class ChatLog extends Container {
|
||||
private toolById = new Map<string, ToolExecutionComponent>();
|
||||
private streamingAssistant: AssistantMessageComponent | null = null;
|
||||
private streamingRunId: string | null = null;
|
||||
private streamingText: string | null = null;
|
||||
// FIXED: Replace single streaming fields with per-runId Map for proper isolation
|
||||
private streamingRuns = new Map<string, StreamingRunState>();
|
||||
// Keep reference to most recent run for backward compatibility
|
||||
private lastStreamingRunId: string | null = null;
|
||||
private toolsExpanded = false;
|
||||
|
||||
clearAll() {
|
||||
this.clear();
|
||||
this.toolById.clear();
|
||||
this.streamingAssistant = null;
|
||||
this.streamingRunId = null;
|
||||
this.streamingText = null;
|
||||
this.streamingRuns.clear();
|
||||
this.lastStreamingRunId = null;
|
||||
}
|
||||
|
||||
addSystem(text: string) {
|
||||
@@ -28,48 +39,141 @@ export class ChatLog extends Container {
|
||||
this.addChild(new UserMessageComponent(text));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create streaming state for a specific runId.
|
||||
*/
|
||||
private getOrCreateRunState(runId: string, showThinking: boolean): StreamingRunState {
|
||||
let state = this.streamingRuns.get(runId);
|
||||
if (!state) {
|
||||
const component = new AssistantMessageComponent("");
|
||||
this.addChild(component);
|
||||
state = {
|
||||
component,
|
||||
thinkingText: "",
|
||||
contentText: "",
|
||||
showThinking,
|
||||
};
|
||||
this.streamingRuns.set(runId, state);
|
||||
this.lastStreamingRunId = runId;
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compose the final display text from thinking + content.
|
||||
* FIXED: Ensures thinking always appears before content regardless of arrival order.
|
||||
*/
|
||||
private composeDisplayText(state: StreamingRunState): string {
|
||||
const parts: string[] = [];
|
||||
|
||||
// Thinking comes first (if enabled and present)
|
||||
if (state.showThinking && state.thinkingText.trim()) {
|
||||
parts.push(`[thinking]\n${state.thinkingText}`);
|
||||
}
|
||||
|
||||
// Content comes after thinking
|
||||
if (state.contentText.trim()) {
|
||||
parts.push(state.contentText);
|
||||
}
|
||||
|
||||
return parts.join("\n\n").trim() || "";
|
||||
}
|
||||
|
||||
startAssistant(text: string, runId?: string) {
|
||||
const component = new AssistantMessageComponent(text);
|
||||
this.streamingAssistant = component;
|
||||
this.streamingRunId = runId ?? null;
|
||||
this.streamingText = text;
|
||||
if (runId) {
|
||||
// Create proper streaming state for tracked runs
|
||||
this.streamingRuns.set(runId, {
|
||||
component,
|
||||
thinkingText: "",
|
||||
contentText: text,
|
||||
showThinking: false,
|
||||
});
|
||||
this.lastStreamingRunId = runId;
|
||||
}
|
||||
this.addChild(component);
|
||||
return component;
|
||||
}
|
||||
|
||||
updateAssistant(text: string, runId?: string) {
|
||||
if (
|
||||
!this.streamingAssistant ||
|
||||
(runId && this.streamingRunId && runId !== this.streamingRunId)
|
||||
) {
|
||||
this.startAssistant(text, runId);
|
||||
return;
|
||||
/**
|
||||
* Update the assistant message with new streaming content.
|
||||
* FIXED: Now properly isolates by runId and separates thinking/content.
|
||||
*/
|
||||
updateAssistant(
|
||||
text: string,
|
||||
runId?: string,
|
||||
options?: {
|
||||
thinkingText?: string;
|
||||
contentText?: string;
|
||||
showThinking?: boolean;
|
||||
},
|
||||
) {
|
||||
const effectiveRunId = runId ?? "default";
|
||||
const showThinking = options?.showThinking ?? false;
|
||||
const state = this.getOrCreateRunState(effectiveRunId, showThinking);
|
||||
|
||||
// Update thinking and/or content separately if provided
|
||||
if (options?.thinkingText !== undefined) {
|
||||
state.thinkingText = options.thinkingText;
|
||||
}
|
||||
this.streamingText = text;
|
||||
this.streamingAssistant.setText(text);
|
||||
if (options?.contentText !== undefined) {
|
||||
state.contentText = options.contentText;
|
||||
}
|
||||
|
||||
// If only raw text provided (backward compatibility), use as content
|
||||
if (options?.thinkingText === undefined && options?.contentText === undefined) {
|
||||
state.contentText = text;
|
||||
}
|
||||
|
||||
state.showThinking = showThinking;
|
||||
|
||||
// Recompose and render with guaranteed ordering
|
||||
const displayText = this.composeDisplayText(state);
|
||||
state.component.setText(displayText);
|
||||
}
|
||||
|
||||
getStreamingText(runId?: string) {
|
||||
if (!this.streamingAssistant) return null;
|
||||
if (runId && this.streamingRunId && runId !== this.streamingRunId) {
|
||||
return null;
|
||||
}
|
||||
return this.streamingText;
|
||||
const effectiveRunId = runId ?? this.lastStreamingRunId;
|
||||
if (!effectiveRunId) return null;
|
||||
|
||||
const state = this.streamingRuns.get(effectiveRunId);
|
||||
if (!state) return null;
|
||||
|
||||
return this.composeDisplayText(state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the raw streaming state (for diagnostics).
|
||||
*/
|
||||
getStreamingState(runId: string): { thinking: string; content: string } | null {
|
||||
const state = this.streamingRuns.get(runId);
|
||||
if (!state) return null;
|
||||
return {
|
||||
thinking: state.thinkingText,
|
||||
content: state.contentText,
|
||||
};
|
||||
}
|
||||
|
||||
finalizeAssistant(text: string, runId?: string) {
|
||||
if (
|
||||
this.streamingAssistant &&
|
||||
(!runId || runId === this.streamingRunId || text === this.streamingText)
|
||||
) {
|
||||
this.streamingText = text;
|
||||
this.streamingAssistant.setText(text);
|
||||
const effectiveRunId = runId ?? this.lastStreamingRunId;
|
||||
const state = effectiveRunId ? this.streamingRuns.get(effectiveRunId) : null;
|
||||
|
||||
if (state) {
|
||||
// Use the final text, or compose from existing state if final is empty
|
||||
const finalText = text.trim() || this.composeDisplayText(state);
|
||||
state.component.setText(finalText);
|
||||
} else {
|
||||
// No existing state - create a new component with final text
|
||||
this.startAssistant(text, runId);
|
||||
}
|
||||
this.streamingAssistant = null;
|
||||
this.streamingRunId = null;
|
||||
this.streamingText = null;
|
||||
|
||||
// Clean up the streaming state for this run
|
||||
if (effectiveRunId) {
|
||||
this.streamingRuns.delete(effectiveRunId);
|
||||
if (this.lastStreamingRunId === effectiveRunId) {
|
||||
this.lastStreamingRunId = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
startTool(toolCallId: string, toolName: string, args: unknown) {
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
import type { TUI } from "@mariozechner/pi-tui";
|
||||
import type { ChatLog } from "./components/chat-log.js";
|
||||
import { asString, extractTextFromMessage, resolveFinalAssistantText } from "./tui-formatters.js";
|
||||
import {
|
||||
asString,
|
||||
extractTextFromMessage,
|
||||
extractThinkingFromMessage,
|
||||
extractContentFromMessage,
|
||||
resolveFinalAssistantText,
|
||||
} from "./tui-formatters.js";
|
||||
import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js";
|
||||
|
||||
type EventHandlerContext = {
|
||||
@@ -11,12 +17,25 @@ type EventHandlerContext = {
|
||||
refreshSessionInfo?: () => Promise<void>;
|
||||
};
|
||||
|
||||
/**
|
||||
* Per-run stream buffer for tracking thinking/content separately.
|
||||
* Enables proper sequencing regardless of network arrival order.
|
||||
*/
|
||||
interface RunStreamBuffer {
|
||||
thinkingText: string;
|
||||
contentText: string;
|
||||
lastUpdateMs: number;
|
||||
}
|
||||
|
||||
export function createEventHandlers(context: EventHandlerContext) {
|
||||
const { chatLog, tui, state, setActivityStatus, refreshSessionInfo } = context;
|
||||
const finalizedRuns = new Map<string, number>();
|
||||
// FIXED: Per-run stream buffers for proper isolation
|
||||
const runBuffers = new Map<string, RunStreamBuffer>();
|
||||
|
||||
const noteFinalizedRun = (runId: string) => {
|
||||
finalizedRuns.set(runId, Date.now());
|
||||
runBuffers.delete(runId); // Clean up buffer
|
||||
if (finalizedRuns.size <= 200) return;
|
||||
const keepUntil = Date.now() - 10 * 60 * 1000;
|
||||
for (const [key, ts] of finalizedRuns) {
|
||||
@@ -31,6 +50,22 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Get or create a stream buffer for a specific runId.
|
||||
*/
|
||||
const getOrCreateBuffer = (runId: string): RunStreamBuffer => {
|
||||
let buffer = runBuffers.get(runId);
|
||||
if (!buffer) {
|
||||
buffer = {
|
||||
thinkingText: "",
|
||||
contentText: "",
|
||||
lastUpdateMs: Date.now(),
|
||||
};
|
||||
runBuffers.set(runId, buffer);
|
||||
}
|
||||
return buffer;
|
||||
};
|
||||
|
||||
const handleChatEvent = (payload: unknown) => {
|
||||
if (!payload || typeof payload !== "object") return;
|
||||
const evt = payload as ChatEvent;
|
||||
@@ -40,11 +75,33 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
if (evt.state === "final") return;
|
||||
}
|
||||
if (evt.state === "delta") {
|
||||
const text = extractTextFromMessage(evt.message, {
|
||||
includeThinking: state.showThinking,
|
||||
const buffer = getOrCreateBuffer(evt.runId);
|
||||
|
||||
// FIXED: Extract thinking and content SEPARATELY for proper sequencing
|
||||
// This is model-agnostic: models without thinking blocks just return empty string
|
||||
const thinkingText = extractThinkingFromMessage(evt.message);
|
||||
const contentText = extractContentFromMessage(evt.message);
|
||||
|
||||
// Update buffer with new content
|
||||
// In streaming, we typically receive the full accumulated text each time
|
||||
if (thinkingText) {
|
||||
buffer.thinkingText = thinkingText;
|
||||
}
|
||||
if (contentText) {
|
||||
buffer.contentText = contentText;
|
||||
}
|
||||
buffer.lastUpdateMs = Date.now();
|
||||
|
||||
// Skip render if both are empty
|
||||
if (!buffer.thinkingText && !buffer.contentText) return;
|
||||
|
||||
// FIXED: Pass separated streams to ChatLog for proper sequencing
|
||||
chatLog.updateAssistant("", evt.runId, {
|
||||
thinkingText: buffer.thinkingText,
|
||||
contentText: buffer.contentText,
|
||||
showThinking: state.showThinking,
|
||||
});
|
||||
if (!text) return;
|
||||
chatLog.updateAssistant(text, evt.runId);
|
||||
|
||||
setActivityStatus("streaming");
|
||||
}
|
||||
if (evt.state === "final") {
|
||||
@@ -54,11 +111,23 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
? ((evt.message as Record<string, unknown>).stopReason as string)
|
||||
: ""
|
||||
: "";
|
||||
const text = extractTextFromMessage(evt.message, {
|
||||
includeThinking: state.showThinking,
|
||||
});
|
||||
|
||||
// FIXED: Extract final content with proper thinking handling
|
||||
const thinkingText = extractThinkingFromMessage(evt.message);
|
||||
const contentText = extractContentFromMessage(evt.message);
|
||||
|
||||
// Compose final text with proper ordering (thinking before content)
|
||||
const parts: string[] = [];
|
||||
if (state.showThinking && thinkingText.trim()) {
|
||||
parts.push(`[thinking]\n${thinkingText}`);
|
||||
}
|
||||
if (contentText.trim()) {
|
||||
parts.push(contentText);
|
||||
}
|
||||
const finalComposed = parts.join("\n\n").trim();
|
||||
|
||||
const finalText = resolveFinalAssistantText({
|
||||
finalText: text,
|
||||
finalText: finalComposed,
|
||||
streamedText: chatLog.getStreamingText(evt.runId),
|
||||
});
|
||||
chatLog.finalizeAssistant(finalText, evt.runId);
|
||||
@@ -70,12 +139,14 @@ export function createEventHandlers(context: EventHandlerContext) {
|
||||
}
|
||||
if (evt.state === "aborted") {
|
||||
chatLog.addSystem("run aborted");
|
||||
runBuffers.delete(evt.runId);
|
||||
state.activeChatRunId = null;
|
||||
setActivityStatus("aborted");
|
||||
void refreshSessionInfo?.();
|
||||
}
|
||||
if (evt.state === "error") {
|
||||
chatLog.addSystem(`run error: ${evt.errorMessage ?? "unknown"}`);
|
||||
runBuffers.delete(evt.runId);
|
||||
state.activeChatRunId = null;
|
||||
setActivityStatus("error");
|
||||
void refreshSessionInfo?.();
|
||||
|
||||
@@ -12,25 +12,94 @@ export function resolveFinalAssistantText(params: {
|
||||
return "(no output)";
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract ONLY thinking blocks from message content.
|
||||
* Model-agnostic: returns empty string if no thinking blocks exist.
|
||||
*/
|
||||
export function extractThinkingFromMessage(message: unknown): string {
|
||||
if (!message || typeof message !== "object") return "";
|
||||
const record = message as Record<string, unknown>;
|
||||
const content = record.content;
|
||||
if (typeof content === "string") return "";
|
||||
if (!Array.isArray(content)) return "";
|
||||
|
||||
const parts: string[] = [];
|
||||
for (const block of content) {
|
||||
if (!block || typeof block !== "object") continue;
|
||||
const rec = block as Record<string, unknown>;
|
||||
if (rec.type === "thinking" && typeof rec.thinking === "string") {
|
||||
parts.push(rec.thinking);
|
||||
}
|
||||
}
|
||||
return parts.join("\n").trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract ONLY text content blocks from message (excludes thinking).
|
||||
* Model-agnostic: works for any model with text content blocks.
|
||||
*/
|
||||
export function extractContentFromMessage(message: unknown): string {
|
||||
if (!message || typeof message !== "object") return "";
|
||||
const record = message as Record<string, unknown>;
|
||||
const content = record.content;
|
||||
|
||||
if (typeof content === "string") return content.trim();
|
||||
if (!Array.isArray(content)) return "";
|
||||
|
||||
const parts: string[] = [];
|
||||
for (const block of content) {
|
||||
if (!block || typeof block !== "object") continue;
|
||||
const rec = block as Record<string, unknown>;
|
||||
if (rec.type === "text" && typeof rec.text === "string") {
|
||||
parts.push(rec.text);
|
||||
}
|
||||
}
|
||||
|
||||
// If no text blocks found, check for error
|
||||
if (parts.length === 0) {
|
||||
const stopReason = typeof record.stopReason === "string" ? record.stopReason : "";
|
||||
if (stopReason === "error") {
|
||||
const errorMessage = typeof record.errorMessage === "string" ? record.errorMessage : "";
|
||||
return formatRawAssistantErrorForUi(errorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
return parts.join("\n").trim();
|
||||
}
|
||||
|
||||
function extractTextBlocks(content: unknown, opts?: { includeThinking?: boolean }): string {
|
||||
if (typeof content === "string") return content.trim();
|
||||
if (!Array.isArray(content)) return "";
|
||||
const parts: string[] = [];
|
||||
|
||||
// FIXED: Separate collection to ensure proper ordering (thinking before text)
|
||||
const thinkingParts: string[] = [];
|
||||
const textParts: string[] = [];
|
||||
|
||||
for (const block of content) {
|
||||
if (!block || typeof block !== "object") continue;
|
||||
const record = block as Record<string, unknown>;
|
||||
if (record.type === "text" && typeof record.text === "string") {
|
||||
parts.push(record.text);
|
||||
textParts.push(record.text);
|
||||
}
|
||||
if (
|
||||
opts?.includeThinking &&
|
||||
record.type === "thinking" &&
|
||||
typeof record.thinking === "string"
|
||||
) {
|
||||
parts.push(`[thinking]\n${record.thinking}`);
|
||||
thinkingParts.push(`[thinking]\n${record.thinking}`);
|
||||
}
|
||||
}
|
||||
return parts.join("\n").trim();
|
||||
|
||||
// FIXED: Always put thinking BEFORE text content for consistent ordering
|
||||
const parts: string[] = [];
|
||||
if (thinkingParts.length > 0) {
|
||||
parts.push(...thinkingParts);
|
||||
}
|
||||
if (textParts.length > 0) {
|
||||
parts.push(...textParts);
|
||||
}
|
||||
|
||||
return parts.join("\n\n").trim();
|
||||
}
|
||||
|
||||
export function extractTextFromMessage(
|
||||
|
||||
Reference in New Issue
Block a user