fix(auto-reply): coalesce block replies and document streaming toggles (#536) (thanks @mcinteerj)

This commit is contained in:
Peter Steinberger
2026-01-09 18:19:55 +00:00
parent a05916bee8
commit fd15704c77
18 changed files with 714 additions and 99 deletions

View File

@@ -35,6 +35,8 @@ import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { extractAudioTag } from "./audio-tags.js";
import { createBlockReplyPipeline } from "./block-reply-pipeline.js";
import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
import { createFollowupRunner } from "./followup-runner.js";
import {
enqueueFollowupRun,
@@ -132,23 +134,6 @@ const appendUsageLine = (
return updated;
};
const withTimeout = async <T>(
promise: Promise<T>,
timeoutMs: number,
timeoutError: Error,
): Promise<T> => {
if (!timeoutMs || timeoutMs <= 0) return promise;
let timer: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(timeoutError), timeoutMs);
});
try {
return await Promise.race([promise, timeoutPromise]);
} finally {
if (timer) clearTimeout(timer);
}
};
export async function runReplyAgent(params: {
commandBody: string;
followupRun: FollowupRun;
@@ -228,29 +213,9 @@ export async function runReplyAgent(params: {
return resolvedVerboseLevel === "on";
};
const streamedPayloadKeys = new Set<string>();
const pendingStreamedPayloadKeys = new Set<string>();
const pendingBlockTasks = new Set<Promise<void>>();
const pendingToolTasks = new Set<Promise<void>>();
let blockReplyChain: Promise<void> = Promise.resolve();
let blockReplyAborted = false;
let didLogBlockReplyAbort = false;
let didStreamBlockReply = false;
const blockReplyTimeoutMs =
opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
const buildPayloadKey = (payload: ReplyPayload) => {
const text = payload.text?.trim() ?? "";
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
return JSON.stringify({
text,
mediaList,
replyToId: payload.replyToId ?? null,
});
};
const replyToChannel =
sessionCtx.OriginatingChannel ??
((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
@@ -265,6 +230,23 @@ export async function runReplyAgent(params: {
replyToChannel,
);
const cfg = followupRun.run.config;
const blockReplyCoalescing =
blockStreamingEnabled && opts?.onBlockReply
? resolveBlockStreamingCoalescing(
cfg,
sessionCtx.Provider,
sessionCtx.AccountId,
blockReplyChunking,
)
: undefined;
const blockReplyPipeline =
blockStreamingEnabled && opts?.onBlockReply
? createBlockReplyPipeline({
onBlockReply: opts.onBlockReply,
timeoutMs: blockReplyTimeoutMs,
coalescing: blockReplyCoalescing,
})
: null;
if (shouldSteer && isStreaming) {
const steered = queueEmbeddedPiMessage(
@@ -511,15 +493,6 @@ export async function runReplyAgent(params: {
text: cleaned,
audioAsVoice: audioTagResult.audioAsVoice,
});
const payloadKey = buildPayloadKey(blockPayload);
if (
streamedPayloadKeys.has(payloadKey) ||
pendingStreamedPayloadKeys.has(payloadKey)
) {
return;
}
if (blockReplyAborted) return;
pendingStreamedPayloadKeys.add(payloadKey);
void typingSignals
.signalTextDelta(taggedPayload.text)
.catch((err) => {
@@ -527,50 +500,7 @@ export async function runReplyAgent(params: {
`block reply typing signal failed: ${String(err)}`,
);
});
const timeoutError = new Error(
`block reply delivery timed out after ${blockReplyTimeoutMs}ms`,
);
const abortController = new AbortController();
blockReplyChain = blockReplyChain
.then(async () => {
if (blockReplyAborted) return false;
await withTimeout(
opts.onBlockReply?.(blockPayload, {
abortSignal: abortController.signal,
timeoutMs: blockReplyTimeoutMs,
}) ?? Promise.resolve(),
blockReplyTimeoutMs,
timeoutError,
);
return true;
})
.then((didSend) => {
if (!didSend) return;
streamedPayloadKeys.add(payloadKey);
didStreamBlockReply = true;
})
.catch((err) => {
if (err === timeoutError) {
abortController.abort();
blockReplyAborted = true;
if (!didLogBlockReplyAbort) {
didLogBlockReplyAbort = true;
logVerbose(
`block reply delivery timed out after ${blockReplyTimeoutMs}ms; skipping remaining block replies to preserve ordering`,
);
}
return;
}
logVerbose(
`block reply delivery failed: ${String(err)}`,
);
})
.finally(() => {
pendingStreamedPayloadKeys.delete(payloadKey);
});
const task = blockReplyChain;
pendingBlockTasks.add(task);
void task.finally(() => pendingBlockTasks.delete(task));
blockReplyPipeline?.enqueue(blockPayload);
}
: undefined,
shouldEmitToolResult,
@@ -684,8 +614,9 @@ export async function runReplyAgent(params: {
}
const payloadArray = runResult.payloads ?? [];
if (pendingBlockTasks.size > 0) {
await Promise.allSettled(pendingBlockTasks);
if (blockReplyPipeline) {
await blockReplyPipeline.flush({ force: true });
blockReplyPipeline.stop();
}
if (pendingToolTasks.size > 0) {
await Promise.allSettled(pendingToolTasks);
@@ -736,7 +667,9 @@ export async function runReplyAgent(params: {
// Drop final payloads only when block streaming succeeded end-to-end.
// If streaming aborted (e.g., timeout), fall back to final payloads.
const shouldDropFinalPayloads =
blockStreamingEnabled && didStreamBlockReply && !blockReplyAborted;
blockStreamingEnabled &&
Boolean(blockReplyPipeline?.didStream()) &&
!blockReplyPipeline?.isAborted();
const messagingToolSentTexts = runResult.messagingToolSentTexts ?? [];
const messagingToolSentTargets = runResult.messagingToolSentTargets ?? [];
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
@@ -753,7 +686,7 @@ export async function runReplyAgent(params: {
? []
: blockStreamingEnabled
? dedupedPayloads.filter(
(payload) => !streamedPayloadKeys.has(buildPayloadKey(payload)),
(payload) => !blockReplyPipeline?.hasSentPayload(payload),
)
: dedupedPayloads;
const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads;
@@ -886,6 +819,7 @@ export async function runReplyAgent(params: {
finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
);
} finally {
blockReplyPipeline?.stop();
typing.markRunComplete();
}
}