Previously, when block streaming was disabled (the default), text generated between tool calls would only appear after all tools completed. This was because onBlockReply wasn't passed to the subscription when block streaming was off, so flushBlockReplyBuffer() before tool execution did nothing. Now onBlockReply is always passed, and when block streaming is disabled, block replies are sent directly during tool flush. Directly sent payloads are tracked to avoid duplicates in final payloads. Also fixes a race condition where tool summaries could be emitted before the typing indicator started by awaiting onAgentEvent in tool handlers.
248 lines
8.5 KiB
TypeScript
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
|
|
import type { AssistantMessage } from "@mariozechner/pi-ai";
|
|
|
|
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
|
|
import { emitAgentEvent } from "../infra/agent-events.js";
|
|
import {
|
|
isMessagingToolDuplicateNormalized,
|
|
normalizeTextForComparison,
|
|
} from "./pi-embedded-helpers.js";
|
|
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
|
import { appendRawStream } from "./pi-embedded-subscribe.raw-stream.js";
|
|
import {
|
|
extractAssistantText,
|
|
extractAssistantThinking,
|
|
extractThinkingFromTaggedStream,
|
|
extractThinkingFromTaggedText,
|
|
formatReasoningMessage,
|
|
promoteThinkingTagsToBlocks,
|
|
} from "./pi-embedded-utils.js";
|
|
import { createInlineCodeState } from "../markdown/code-spans.js";
|
|
|
|
export function handleMessageStart(
|
|
ctx: EmbeddedPiSubscribeContext,
|
|
evt: AgentEvent & { message: AgentMessage },
|
|
) {
|
|
const msg = evt.message;
|
|
if (msg?.role !== "assistant") return;
|
|
|
|
// KNOWN: Resetting at `text_end` is unsafe (late/duplicate end events).
|
|
// ASSUME: `message_start` is the only reliable boundary for “new assistant message begins”.
|
|
// Start-of-message is a safer reset point than message_end: some providers
|
|
// may deliver late text_end updates after message_end, which would otherwise
|
|
// re-trigger block replies.
|
|
ctx.resetAssistantMessageState(ctx.state.assistantTexts.length);
|
|
// Use assistant message_start as the earliest "writing" signal for typing.
|
|
void ctx.params.onAssistantMessageStart?.();
|
|
}
|
|
|
|
/**
 * Handles a streaming update for an in-flight assistant message.
 *
 * Only `text_start` / `text_delta` / `text_end` provider events are
 * processed. New text is appended to the running delta buffer, mirrored into
 * the block chunker (or the plain block buffer when no chunker is present),
 * optionally streamed as reasoning, and re-emitted as partial assistant
 * output once per distinct stripped snapshot.
 *
 * @param ctx Subscription context holding mutable stream state and callbacks.
 * @param evt Agent event carrying the assistant message and the raw
 *   provider-level `assistantMessageEvent` payload (shape unknown here;
 *   narrowed at runtime below).
 */
export function handleMessageUpdate(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { message: AgentMessage; assistantMessageEvent?: unknown },
) {
  const msg = evt.message;
  if (msg?.role !== "assistant") return;

  // The provider event is untyped; narrow it to a record before reading
  // `type` / `delta` / `content`, each of which may legitimately be absent.
  const assistantEvent = evt.assistantMessageEvent;
  const assistantRecord =
    assistantEvent && typeof assistantEvent === "object"
      ? (assistantEvent as Record<string, unknown>)
      : undefined;
  const evtType = typeof assistantRecord?.type === "string" ? assistantRecord.type : "";

  // Ignore anything that is not a text lifecycle event.
  if (evtType !== "text_delta" && evtType !== "text_start" && evtType !== "text_end") {
    return;
  }

  const delta = typeof assistantRecord?.delta === "string" ? assistantRecord.delta : "";
  const content = typeof assistantRecord?.content === "string" ? assistantRecord.content : "";

  // Record the raw event for diagnostics before any transformation.
  appendRawStream({
    ts: Date.now(),
    event: "assistant_text_stream",
    runId: ctx.params.runId,
    sessionId: (ctx.params.session as { id?: string }).id,
    evtType,
    delta,
    content,
  });

  // Compute the new text ("chunk") this event contributes, keeping output
  // monotonic with respect to what is already in `deltaBuffer`.
  let chunk = "";
  if (evtType === "text_delta") {
    chunk = delta;
  } else if (evtType === "text_start" || evtType === "text_end") {
    if (delta) {
      chunk = delta;
    } else if (content) {
      // KNOWN: Some providers resend full content on `text_end`.
      // We only append a suffix (or nothing) to keep output monotonic.
      if (content.startsWith(ctx.state.deltaBuffer)) {
        // Full content is a superset of what we have: append only the tail.
        chunk = content.slice(ctx.state.deltaBuffer.length);
      } else if (ctx.state.deltaBuffer.startsWith(content)) {
        // Content is a stale prefix of what we already have: nothing new.
        chunk = "";
      } else if (!ctx.state.deltaBuffer.includes(content)) {
        // Content is genuinely unseen text; append it wholesale.
        chunk = content;
      }
    }
  }

  if (chunk) {
    ctx.state.deltaBuffer += chunk;
    // Mirror the chunk into whichever block-reply accumulator is active.
    if (ctx.blockChunker) {
      ctx.blockChunker.append(chunk);
    } else {
      ctx.state.blockBuffer += chunk;
    }
  }

  if (ctx.state.streamReasoning) {
    // Handle partial <think> tags: stream whatever reasoning is visible so far.
    ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer));
  }

  // Strip block tags from the accumulated text and emit a partial-assistant
  // snapshot, but only when the stripped text actually changed since the
  // last emission (dedupe via `lastStreamedAssistant`).
  const next = ctx
    .stripBlockTags(ctx.state.deltaBuffer, {
      thinking: false,
      final: false,
      inlineCode: createInlineCodeState(),
    })
    .trim();
  if (next && next !== ctx.state.lastStreamedAssistant) {
    ctx.state.lastStreamedAssistant = next;
    const { text: cleanedText, mediaUrls } = parseReplyDirectives(next);
    // Broadcast on the global agent-event bus…
    emitAgentEvent({
      runId: ctx.params.runId,
      stream: "assistant",
      data: {
        text: cleanedText,
        mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
      },
    });
    // …and to the per-subscription callback (fire-and-forget).
    void ctx.params.onAgentEvent?.({
      stream: "assistant",
      data: {
        text: cleanedText,
        mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
      },
    });
    if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) {
      void ctx.params.onPartialReply({
        text: cleanedText,
        mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
      });
    }
  }

  // In text_end break mode, opportunistically drain complete chunks as the
  // text streams in (non-forced: the chunker decides what is ready).
  if (ctx.params.onBlockReply && ctx.blockChunking && ctx.state.blockReplyBreak === "text_end") {
    ctx.blockChunker?.drain({ force: false, emit: ctx.emitBlockChunk });
  }

  // At the end of a text block, flush whatever remains buffered so a block
  // reply is emitted per text segment rather than per message.
  if (evtType === "text_end" && ctx.state.blockReplyBreak === "text_end") {
    if (ctx.blockChunker?.hasBuffered()) {
      ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
      ctx.blockChunker.reset();
    } else if (ctx.state.blockBuffer.length > 0) {
      ctx.emitBlockChunk(ctx.state.blockBuffer);
      ctx.state.blockBuffer = "";
    }
  }
}
|
|
|
|
/**
 * Finalizes a completed assistant message.
 *
 * Records the raw text/thinking to the diagnostic stream, emits any pending
 * block reply (with duplicate suppression against text already sent via
 * messaging tools), emits formatted reasoning either before or after the
 * answer depending on break mode, then resets all per-message stream state.
 *
 * @param ctx Subscription context holding mutable stream state and callbacks.
 * @param evt Agent event carrying the finished message; only assistant
 *   messages are processed.
 */
export function handleMessageEnd(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { message: AgentMessage },
) {
  const msg = evt.message;
  if (msg?.role !== "assistant") return;

  const assistantMessage = msg as AssistantMessage;
  // Lift inline <think>-style tags into structured thinking blocks before
  // extracting text/thinking below.
  promoteThinkingTagsToBlocks(assistantMessage);

  const rawText = extractAssistantText(assistantMessage);
  // Diagnostic record of the final message content.
  appendRawStream({
    ts: Date.now(),
    event: "assistant_message_end",
    runId: ctx.params.runId,
    sessionId: (ctx.params.session as { id?: string }).id,
    rawText,
    rawThinking: extractAssistantThinking(assistantMessage),
  });

  const text = ctx.stripBlockTags(rawText, { thinking: false, final: false });
  // Only pay for thinking extraction when reasoning will actually be used.
  const rawThinking =
    ctx.state.includeReasoning || ctx.state.streamReasoning
      ? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText)
      : "";
  const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : "";

  // Did this message contribute any assistant texts since message_start?
  // (Baseline captured by resetAssistantMessageState in handleMessageStart.)
  const addedDuringMessage = ctx.state.assistantTexts.length > ctx.state.assistantTextBaseline;
  const chunkerHasBuffered = ctx.blockChunker?.hasBuffered() ?? false;
  ctx.finalizeAssistantTexts({ text, addedDuringMessage, chunkerHasBuffered });

  const onBlockReply = ctx.params.onBlockReply;
  // Emit reasoning at most once per distinct formatted text (dedupe via
  // lastReasoningSent), and only when a block-reply sink exists.
  const shouldEmitReasoning = Boolean(
    ctx.state.includeReasoning &&
    formattedReasoning &&
    onBlockReply &&
    formattedReasoning !== ctx.state.lastReasoningSent,
  );
  // In message_end break mode with no text sent mid-message, reasoning goes
  // out before the answer; otherwise it follows the answer (see below).
  const shouldEmitReasoningBeforeAnswer =
    shouldEmitReasoning && ctx.state.blockReplyBreak === "message_end" && !addedDuringMessage;
  const maybeEmitReasoning = () => {
    if (!shouldEmitReasoning || !formattedReasoning) return;
    ctx.state.lastReasoningSent = formattedReasoning;
    void onBlockReply?.({ text: formattedReasoning });
  };

  if (shouldEmitReasoningBeforeAnswer) maybeEmitReasoning();

  // Flush the final block reply: either we are in message_end break mode, or
  // there is leftover buffered text that text_end flushing never drained.
  if (
    (ctx.state.blockReplyBreak === "message_end" ||
      (ctx.blockChunker ? ctx.blockChunker.hasBuffered() : ctx.state.blockBuffer.length > 0)) &&
    text &&
    onBlockReply
  ) {
    if (ctx.blockChunker?.hasBuffered()) {
      ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
      ctx.blockChunker.reset();
    } else if (text !== ctx.state.lastBlockReplyText) {
      // Check for duplicates before emitting (same logic as emitBlockChunk).
      const normalizedText = normalizeTextForComparison(text);
      if (
        isMessagingToolDuplicateNormalized(
          normalizedText,
          ctx.state.messagingToolSentTextsNormalized,
        )
      ) {
        ctx.log.debug(
          `Skipping message_end block reply - already sent via messaging tool: ${text.slice(0, 50)}...`,
        );
      } else {
        ctx.state.lastBlockReplyText = text;
        const { text: cleanedText, mediaUrls, audioAsVoice } = parseReplyDirectives(text);
        // Emit if there's content OR audioAsVoice flag (to propagate the flag).
        if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) {
          void onBlockReply({
            text: cleanedText,
            mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
            audioAsVoice,
          });
        }
      }
    }
  }

  // Reasoning that was not emitted before the answer goes out after it.
  if (!shouldEmitReasoningBeforeAnswer) maybeEmitReasoning();
  if (ctx.state.streamReasoning && rawThinking) {
    ctx.emitReasoningStream(rawThinking);
  }

  // Reset all per-message stream state for the next assistant message.
  // (message_start also resets; doing it here too keeps state clean even if
  // no further message arrives.)
  ctx.state.deltaBuffer = "";
  ctx.state.blockBuffer = "";
  ctx.blockChunker?.reset();
  ctx.state.blockState.thinking = false;
  ctx.state.blockState.final = false;
  ctx.state.blockState.inlineCode = createInlineCodeState();
  ctx.state.lastStreamedAssistant = undefined;
}
|