🤖 codex: fix block reply ordering (#503)
What: serialize block reply sends, make typing non-blocking, add timeout fallback + abort-aware routing, and add regression tests. Why: prevent out-of-order streamed blocks while keeping final fallback on timeouts. Tests: ./node_modules/.bin/vitest run src/auto-reply/reply.block-streaming.test.ts src/auto-reply/reply/route-reply.test.ts Tests: corepack pnpm lint && corepack pnpm build (pass). corepack pnpm test (ran locally; failure observed during run). Co-authored-by: Josh Palmer <joshp123@users.noreply.github.com>
This commit is contained in:
@@ -47,6 +47,7 @@ import type { TypingController } from "./typing.js";
|
||||
import { createTypingSignaler } from "./typing-mode.js";
|
||||
|
||||
const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i;
|
||||
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000;
|
||||
|
||||
const isBunFetchSocketError = (message?: string) =>
|
||||
Boolean(message && BUN_FETCH_SOCKET_ERROR_RE.test(message));
|
||||
@@ -61,6 +62,23 @@ const formatBunFetchSocketError = (message: string) => {
|
||||
].join("\n");
|
||||
};
|
||||
|
||||
const withTimeout = async <T>(
|
||||
promise: Promise<T>,
|
||||
timeoutMs: number,
|
||||
timeoutError: Error,
|
||||
): Promise<T> => {
|
||||
if (!timeoutMs || timeoutMs <= 0) return promise;
|
||||
let timer: NodeJS.Timeout | undefined;
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(() => reject(timeoutError), timeoutMs);
|
||||
});
|
||||
try {
|
||||
return await Promise.race([promise, timeoutPromise]);
|
||||
} finally {
|
||||
if (timer) clearTimeout(timer);
|
||||
}
|
||||
};
|
||||
|
||||
export async function runReplyAgent(params: {
|
||||
commandBody: string;
|
||||
followupRun: FollowupRun;
|
||||
@@ -144,7 +162,12 @@ export async function runReplyAgent(params: {
|
||||
const pendingStreamedPayloadKeys = new Set<string>();
|
||||
const pendingBlockTasks = new Set<Promise<void>>();
|
||||
const pendingToolTasks = new Set<Promise<void>>();
|
||||
let blockReplyChain: Promise<void> = Promise.resolve();
|
||||
let blockReplyAborted = false;
|
||||
let didLogBlockReplyAbort = false;
|
||||
let didStreamBlockReply = false;
|
||||
const blockReplyTimeoutMs =
|
||||
opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
|
||||
const buildPayloadKey = (payload: ReplyPayload) => {
|
||||
const text = payload.text?.trim() ?? "";
|
||||
const mediaList = payload.mediaUrls?.length
|
||||
@@ -367,16 +390,49 @@ export async function runReplyAgent(params: {
|
||||
) {
|
||||
return;
|
||||
}
|
||||
if (blockReplyAborted) return;
|
||||
pendingStreamedPayloadKeys.add(payloadKey);
|
||||
const task = (async () => {
|
||||
await typingSignals.signalTextDelta(taggedPayload.text);
|
||||
await opts.onBlockReply?.(blockPayload);
|
||||
})()
|
||||
.then(() => {
|
||||
void typingSignals
|
||||
.signalTextDelta(taggedPayload.text)
|
||||
.catch((err) => {
|
||||
logVerbose(
|
||||
`block reply typing signal failed: ${String(err)}`,
|
||||
);
|
||||
});
|
||||
const timeoutError = new Error(
|
||||
`block reply delivery timed out after ${blockReplyTimeoutMs}ms`,
|
||||
);
|
||||
const abortController = new AbortController();
|
||||
blockReplyChain = blockReplyChain
|
||||
.then(async () => {
|
||||
if (blockReplyAborted) return false;
|
||||
await withTimeout(
|
||||
opts.onBlockReply?.(blockPayload, {
|
||||
abortSignal: abortController.signal,
|
||||
timeoutMs: blockReplyTimeoutMs,
|
||||
}) ?? Promise.resolve(),
|
||||
blockReplyTimeoutMs,
|
||||
timeoutError,
|
||||
);
|
||||
return true;
|
||||
})
|
||||
.then((didSend) => {
|
||||
if (!didSend) return;
|
||||
streamedPayloadKeys.add(payloadKey);
|
||||
didStreamBlockReply = true;
|
||||
})
|
||||
.catch((err) => {
|
||||
if (err === timeoutError) {
|
||||
abortController.abort();
|
||||
blockReplyAborted = true;
|
||||
if (!didLogBlockReplyAbort) {
|
||||
didLogBlockReplyAbort = true;
|
||||
logVerbose(
|
||||
`block reply delivery timed out after ${blockReplyTimeoutMs}ms; skipping remaining block replies to preserve ordering`,
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
logVerbose(
|
||||
`block reply delivery failed: ${String(err)}`,
|
||||
);
|
||||
@@ -384,6 +440,7 @@ export async function runReplyAgent(params: {
|
||||
.finally(() => {
|
||||
pendingStreamedPayloadKeys.delete(payloadKey);
|
||||
});
|
||||
const task = blockReplyChain;
|
||||
pendingBlockTasks.add(task);
|
||||
void task.finally(() => pendingBlockTasks.delete(task));
|
||||
}
|
||||
@@ -546,10 +603,10 @@ export async function runReplyAgent(params: {
|
||||
})
|
||||
.filter(isRenderablePayload);
|
||||
|
||||
// Drop final payloads if block streaming is enabled and we already streamed
|
||||
// block replies. Tool-sent duplicates are filtered below.
|
||||
// Drop final payloads only when block streaming succeeded end-to-end.
|
||||
// If streaming aborted (e.g., timeout), fall back to final payloads.
|
||||
const shouldDropFinalPayloads =
|
||||
blockStreamingEnabled && didStreamBlockReply;
|
||||
blockStreamingEnabled && didStreamBlockReply && !blockReplyAborted;
|
||||
const messagingToolSentTexts = runResult.messagingToolSentTexts ?? [];
|
||||
const messagingToolSentTargets = runResult.messagingToolSentTargets ?? [];
|
||||
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
|
||||
|
||||
Reference in New Issue
Block a user