refactor: reuse streaming text normalizer across callbacks

This commit is contained in:
Peter Steinberger
2026-01-12 22:27:46 +00:00
parent 7ba72aeb6c
commit 27d940f5b6

View File

@@ -549,8 +549,8 @@ export async function runReplyAgent(params: {
);
const normalizeStreamingText = (
payload: ReplyPayload,
): string | undefined => {
if (!allowPartialStream) return undefined;
): { text?: string; skip: boolean } => {
if (!allowPartialStream) return { skip: true };
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
@@ -561,18 +561,18 @@ export async function runReplyAgent(params: {
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
}
if (stripped.shouldSkip && (payload.mediaUrls?.length ?? 0) === 0) {
return undefined;
return { skip: true };
}
text = stripped.text;
}
if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return undefined;
return text;
if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return { skip: true };
return { text, skip: false };
};
const handlePartialForTyping = async (
payload: ReplyPayload,
): Promise<string | undefined> => {
const text = normalizeStreamingText(payload);
if (!text) return undefined;
const { text, skip } = normalizeStreamingText(payload);
if (skip || !text) return undefined;
await typingSignals.signalTextDelta(text);
return text;
};
@@ -712,21 +712,9 @@ export async function runReplyAgent(params: {
onBlockReply:
blockStreamingEnabled && opts?.onBlockReply
? async (payload) => {
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose(
"Stripped stray HEARTBEAT_OK token from reply",
);
}
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return;
text = stripped.text;
}
const { text, skip } = normalizeStreamingText(payload);
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (skip && !hasMedia) return;
const taggedPayload = applyReplyTagsToPayload(
{
text,
@@ -799,25 +787,8 @@ export async function runReplyAgent(params: {
// If a tool callback starts typing after the run finalized, we can end up with
// a typing loop that never sees a matching markRunComplete(). Track and drain.
const task = (async () => {
let text = payload.text;
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose(
"Stripped stray HEARTBEAT_OK token from reply",
);
}
if (
stripped.shouldSkip &&
(payload.mediaUrls?.length ?? 0) === 0
) {
return;
}
text = stripped.text;
}
const { text, skip } = normalizeStreamingText(payload);
if (skip) return;
await typingSignals.signalTextDelta(text);
await opts.onToolResult?.({
text,