From cc94db458c254a0dbe753c2031715f2863263464 Mon Sep 17 00:00:00 2001 From: Josh Palmer Date: Thu, 8 Jan 2026 19:30:24 +0100 Subject: [PATCH 1/3] =?UTF-8?q?=F0=9F=A4=96=20codex:=20fix=20block=20reply?= =?UTF-8?q?=20ordering=20(#503)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit What: serialize block reply sends, make typing non-blocking, add timeout fallback + abort-aware routing, and add regression tests. Why: prevent out-of-order streamed blocks while keeping final fallback on timeouts. Tests: ./node_modules/.bin/vitest run src/auto-reply/reply.block-streaming.test.ts src/auto-reply/reply/route-reply.test.ts Tests: corepack pnpm lint && corepack pnpm build (pass). corepack pnpm test (ran locally; failure observed during run). Co-authored-by: Josh Palmer --- CHANGELOG.md | 1 + src/auto-reply/reply.block-streaming.test.ts | 110 +++++++++++++++++++ src/auto-reply/reply/agent-runner.ts | 73 ++++++++++-- src/auto-reply/reply/dispatch-from-config.ts | 13 ++- src/auto-reply/reply/route-reply.test.ts | 16 +++ src/auto-reply/reply/route-reply.ts | 13 ++- src/auto-reply/types.ts | 12 +- 7 files changed, 224 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e887663b0..d7af201c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased +- Auto-reply: preserve block reply ordering with timeout fallback for streaming. (#503) — thanks @joshp123 - WhatsApp: group `/model list` output by provider for scannability. (#456) - thanks @mcinteerj - Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini). - Control UI: logs tab opens at the newest entries (bottom). diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index caeedc120..15dca24e1 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -103,6 +103,61 @@ describe("block streaming", () => { }); }); + it("preserves block reply ordering when typing start is slow", async () => { + await withTempHome(async (home) => { + let releaseTyping: (() => void) | undefined; + const typingGate = new Promise((resolve) => { + releaseTyping = resolve; + }); + const onReplyStart = vi.fn(() => typingGate); + const seen: string[] = []; + const onBlockReply = vi.fn(async (payload) => { + seen.push(payload.text ?? ""); + }); + + vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + void params.onBlockReply?.({ text: "first" }); + void params.onBlockReply?.({ text: "second" }); + return { + payloads: [{ text: "first" }, { text: "second" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }; + }); + + const replyPromise = getReplyFromConfig( + { + Body: "ping", + From: "+1004", + To: "+2000", + MessageSid: "msg-125", + Provider: "telegram", + }, + { + onReplyStart, + onBlockReply, + }, + { + agent: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + telegram: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + await waitForCalls(() => onReplyStart.mock.calls.length, 1); + releaseTyping?.(); + + const res = await replyPromise; + expect(res).toBeUndefined(); + expect(seen).toEqual(["first", "second"]); + }); + }); + it("drops final payloads when block replies streamed", async () => { await withTempHome(async (home) => { const onBlockReply = vi.fn().mockResolvedValue(undefined); @@ -143,4 +198,59 @@ describe("block streaming", () => { expect(onBlockReply).toHaveBeenCalledTimes(1); }); }); + + it("falls back to final payloads when block reply send times out", async () => { + await withTempHome(async (home) => { + let sawAbort = false; + const onBlockReply = vi.fn((_, context) => { + return new Promise((resolve) => { + context?.abortSignal?.addEventListener( + "abort", + () => { + sawAbort = true; + resolve(); + }, + { once: true }, + ); + }); + }); + + vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + void params.onBlockReply?.({ text: "streamed" }); + return { + payloads: [{ text: "final" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }; + }); + + const replyPromise = getReplyFromConfig( + { + Body: "ping", + From: "+1004", + To: "+2000", + MessageSid: "msg-126", + Provider: "telegram", + }, + { + onBlockReply, + blockReplyTimeoutMs: 10, + }, + { + agent: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + telegram: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + const res = await replyPromise; + expect(res).toMatchObject({ text: "final" }); + expect(sawAbort).toBe(true); + }); + }); }); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 76ba0d040..1225491b2 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -47,6 +47,7 @@ import type { TypingController } from "./typing.js"; import { createTypingSignaler } from "./typing-mode.js"; const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i; +const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000; const isBunFetchSocketError = (message?: string) => Boolean(message && BUN_FETCH_SOCKET_ERROR_RE.test(message)); @@ -61,6 +62,23 @@ const formatBunFetchSocketError = (message: string) => { ].join("\n"); }; +const withTimeout = async ( + promise: Promise, + timeoutMs: number, + timeoutError: Error, +): Promise => { + if (!timeoutMs || timeoutMs <= 0) return promise; + let timer: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout(() => reject(timeoutError), timeoutMs); + }); + try { + return await Promise.race([promise, timeoutPromise]); + } finally { + if (timer) clearTimeout(timer); + } +}; + export async function runReplyAgent(params: { commandBody: string; followupRun: FollowupRun; @@ -144,7 +162,12 @@ export async function runReplyAgent(params: { const pendingStreamedPayloadKeys = new Set(); const pendingBlockTasks = new Set>(); const pendingToolTasks = new Set>(); + let blockReplyChain: Promise = Promise.resolve(); + let blockReplyAborted = false; + let didLogBlockReplyAbort = false; let didStreamBlockReply = false; + const blockReplyTimeoutMs = + opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS; const buildPayloadKey = (payload: ReplyPayload) => { const text = payload.text?.trim() ?? ""; const mediaList = payload.mediaUrls?.length @@ -367,16 +390,49 @@ export async function runReplyAgent(params: { ) { return; } + if (blockReplyAborted) return; pendingStreamedPayloadKeys.add(payloadKey); - const task = (async () => { - await typingSignals.signalTextDelta(taggedPayload.text); - await opts.onBlockReply?.(blockPayload); - })() - .then(() => { + void typingSignals + .signalTextDelta(taggedPayload.text) + .catch((err) => { + logVerbose( + `block reply typing signal failed: ${String(err)}`, + ); + }); + const timeoutError = new Error( + `block reply delivery timed out after ${blockReplyTimeoutMs}ms`, + ); + const abortController = new AbortController(); + blockReplyChain = blockReplyChain + .then(async () => { + if (blockReplyAborted) return false; + await withTimeout( + opts.onBlockReply?.(blockPayload, { + abortSignal: abortController.signal, + timeoutMs: blockReplyTimeoutMs, + }) ?? Promise.resolve(), + blockReplyTimeoutMs, + timeoutError, + ); + return true; + }) + .then((didSend) => { + if (!didSend) return; streamedPayloadKeys.add(payloadKey); didStreamBlockReply = true; }) .catch((err) => { + if (err === timeoutError) { + abortController.abort(); + blockReplyAborted = true; + if (!didLogBlockReplyAbort) { + didLogBlockReplyAbort = true; + logVerbose( + `block reply delivery timed out after ${blockReplyTimeoutMs}ms; skipping remaining block replies to preserve ordering`, + ); + } + return; + } logVerbose( `block reply delivery failed: ${String(err)}`, ); @@ -384,6 +440,7 @@ export async function runReplyAgent(params: { .finally(() => { pendingStreamedPayloadKeys.delete(payloadKey); }); + const task = blockReplyChain; pendingBlockTasks.add(task); void task.finally(() => pendingBlockTasks.delete(task)); } @@ -546,10 +603,10 @@ export async function runReplyAgent(params: { }) .filter(isRenderablePayload); - // Drop final payloads if block streaming is enabled and we already streamed - // block replies. Tool-sent duplicates are filtered below. + // Drop final payloads only when block streaming succeeded end-to-end. + // If streaming aborted (e.g., timeout), fall back to final payloads. const shouldDropFinalPayloads = - blockStreamingEnabled && didStreamBlockReply; + blockStreamingEnabled && didStreamBlockReply && !blockReplyAborted; const messagingToolSentTexts = runResult.messagingToolSentTexts ?? []; const messagingToolSentTargets = runResult.messagingToolSentTargets ?? []; const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 26961ad7c..1bf1fbfce 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -41,10 +41,14 @@ export async function dispatchReplyFromConfig(params: { * Note: Only called when shouldRouteToOriginating is true, so * originatingChannel and originatingTo are guaranteed to be defined. */ - const sendPayloadAsync = async (payload: ReplyPayload): Promise => { + const sendPayloadAsync = async ( + payload: ReplyPayload, + abortSignal?: AbortSignal, + ): Promise => { // TypeScript doesn't narrow these from the shouldRouteToOriginating check, // but they're guaranteed non-null when this function is called. if (!originatingChannel || !originatingTo) return; + if (abortSignal?.aborted) return; const result = await routeReply({ payload, channel: originatingChannel, @@ -52,6 +56,7 @@ export async function dispatchReplyFromConfig(params: { accountId: ctx.AccountId, threadId: ctx.MessageThreadId, cfg, + abortSignal, }); if (!result.ok) { logVerbose( @@ -73,10 +78,10 @@ export async function dispatchReplyFromConfig(params: { dispatcher.sendToolResult(payload); } }, - onBlockReply: (payload: ReplyPayload) => { + onBlockReply: (payload: ReplyPayload, context) => { if (shouldRouteToOriginating) { - // Fire-and-forget for streaming block replies when routing. - void sendPayloadAsync(payload); + // Await routed sends so upstream can enforce ordering/timeouts. + return sendPayloadAsync(payload, context?.abortSignal); } else { // Synchronous dispatch to preserve callback timing. dispatcher.sendBlockReply(payload); diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index cc40383c9..9571e292f 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -31,6 +31,22 @@ vi.mock("../../web/outbound.js", () => ({ const { routeReply } = await import("./route-reply.js"); describe("routeReply", () => { + it("skips sends when abort signal is already aborted", async () => { + mocks.sendMessageSlack.mockClear(); + const controller = new AbortController(); + controller.abort(); + const res = await routeReply({ + payload: { text: "hi" }, + channel: "slack", + to: "channel:C123", + cfg: {} as never, + abortSignal: controller.signal, + }); + expect(res.ok).toBe(false); + expect(res.error).toContain("aborted"); + expect(mocks.sendMessageSlack).not.toHaveBeenCalled(); + }); + it("no-ops on empty payload", async () => { mocks.sendMessageSlack.mockClear(); const res = await routeReply({ diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index c02ce1c77..f7529c8cf 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -30,6 +30,8 @@ export type RouteReplyParams = { threadId?: number; /** Config for provider-specific settings. */ cfg: ClawdbotConfig; + /** Optional abort signal for cooperative cancellation. */ + abortSignal?: AbortSignal; }; export type RouteReplyResult = { @@ -52,7 +54,7 @@ export type RouteReplyResult = { export async function routeReply( params: RouteReplyParams, ): Promise { - const { payload, channel, to, accountId, threadId } = params; + const { payload, channel, to, accountId, threadId, abortSignal } = params; // Debug: `pnpm test src/auto-reply/reply/route-reply.test.ts` const text = payload.text ?? ""; @@ -72,6 +74,9 @@ export async function routeReply( text: string; mediaUrl?: string; }): Promise => { + if (abortSignal?.aborted) { + return { ok: false, error: "Reply routing aborted" }; + } const { text, mediaUrl } = params; switch (channel) { case "telegram": { @@ -148,12 +153,18 @@ export async function routeReply( }; try { + if (abortSignal?.aborted) { + return { ok: false, error: "Reply routing aborted" }; + } if (mediaUrls.length === 0) { return await sendOne({ text }); } let last: RouteReplyResult | undefined; for (let i = 0; i < mediaUrls.length; i++) { + if (abortSignal?.aborted) { + return { ok: false, error: "Reply routing aborted" }; + } const mediaUrl = mediaUrls[i]; const caption = i === 0 ? text : ""; last = await sendOne({ text: caption, mediaUrl }); diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 0bd7671d7..7f69aeff9 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -1,14 +1,24 @@ import type { TypingController } from "./reply/typing.js"; +export type BlockReplyContext = { + abortSignal?: AbortSignal; + timeoutMs?: number; +}; + export type GetReplyOptions = { onReplyStart?: () => Promise | void; onTypingController?: (typing: TypingController) => void; isHeartbeat?: boolean; onPartialReply?: (payload: ReplyPayload) => Promise | void; onReasoningStream?: (payload: ReplyPayload) => Promise | void; - onBlockReply?: (payload: ReplyPayload) => Promise | void; + onBlockReply?: ( + payload: ReplyPayload, + context?: BlockReplyContext, + ) => Promise | void; onToolResult?: (payload: ReplyPayload) => Promise | void; disableBlockStreaming?: boolean; + /** Timeout for block reply delivery (ms). */ + blockReplyTimeoutMs?: number; /** If provided, only load these skills for this session (empty = no skills). */ skillFilter?: string[]; }; From b7c900739e28dc85f87ae9d671c4121ce831766c Mon Sep 17 00:00:00 2001 From: Josh Palmer Date: Thu, 8 Jan 2026 19:59:06 +0100 Subject: [PATCH 2/3] =?UTF-8?q?=F0=9F=A4=96=20codex:=20handle=20discord=20?= =?UTF-8?q?gateway=20error=20events=20(#504)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 1 + src/discord/monitor.gateway.test.ts | 49 +++++++++++++++++ src/discord/monitor.ts | 84 ++++++++++++++++++++++++----- 3 files changed, 122 insertions(+), 12 deletions(-) create mode 100644 src/discord/monitor.gateway.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index d7af201c1..324bd54e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased +- Discord: stop provider when gateway reconnects are exhausted and surface errors. (#504) - Auto-reply: preserve block reply ordering with timeout fallback for streaming. (#503) — thanks @joshp123 - WhatsApp: group `/model list` output by provider for scannability. (#456) - thanks @mcinteerj - Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini). diff --git a/src/discord/monitor.gateway.test.ts b/src/discord/monitor.gateway.test.ts new file mode 100644 index 000000000..fc7d62784 --- /dev/null +++ b/src/discord/monitor.gateway.test.ts @@ -0,0 +1,49 @@ +import { EventEmitter } from "node:events"; + +import { describe, expect, it, vi } from "vitest"; + +import { waitForDiscordGatewayStop } from "./monitor.js"; + +describe("waitForDiscordGatewayStop", () => { + it("resolves on abort and disconnects gateway", async () => { + const emitter = new EventEmitter(); + const disconnect = vi.fn(); + const abort = new AbortController(); + + const promise = waitForDiscordGatewayStop({ + gateway: { emitter, disconnect }, + abortSignal: abort.signal, + }); + + expect(emitter.listenerCount("error")).toBe(1); + abort.abort(); + + await expect(promise).resolves.toBeUndefined(); + expect(disconnect).toHaveBeenCalledTimes(1); + expect(emitter.listenerCount("error")).toBe(0); + }); + + it("rejects on gateway error and disconnects", async () => { + const emitter = new EventEmitter(); + const disconnect = vi.fn(); + const onGatewayError = vi.fn(); + const abort = new AbortController(); + const err = new Error("boom"); + + const promise = waitForDiscordGatewayStop({ + gateway: { emitter, disconnect }, + abortSignal: abort.signal, + onGatewayError, + }); + + emitter.emit("error", err); + + await expect(promise).rejects.toThrow("boom"); + expect(onGatewayError).toHaveBeenCalledWith(err); + expect(disconnect).toHaveBeenCalledTimes(1); + expect(emitter.listenerCount("error")).toBe(0); + + abort.abort(); + expect(disconnect).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 27baa3c66..5d845fe2e 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -1,3 +1,5 @@ +import type { EventEmitter } from "node:events"; + import { ChannelType, Client, @@ -82,6 +84,62 @@ type DiscordMediaInfo = { placeholder: string; }; +type DiscordGatewayHandle = { + emitter?: Pick; + disconnect?: () => void; +}; + +export async function waitForDiscordGatewayStop(params: { + gateway?: DiscordGatewayHandle; + abortSignal?: AbortSignal; + onGatewayError?: (err: unknown) => void; +}): Promise { + const { gateway, abortSignal, onGatewayError } = params; + const emitter = gateway?.emitter; + return await new Promise((resolve, reject) => { + let settled = false; + const cleanup = () => { + abortSignal?.removeEventListener("abort", onAbort); + emitter?.removeListener("error", onGatewayErrorEvent); + }; + const finishResolve = () => { + if (settled) return; + settled = true; + cleanup(); + try { + gateway?.disconnect?.(); + } finally { + resolve(); + } + }; + const finishReject = (err: unknown) => { + if (settled) return; + settled = true; + cleanup(); + try { + gateway?.disconnect?.(); + } finally { + reject(err); + } + }; + const onAbort = () => { + finishResolve(); + }; + const onGatewayErrorEvent = (err: unknown) => { + onGatewayError?.(err); + finishReject(err); + }; + + if (abortSignal?.aborted) { + onAbort(); + return; + } + + abortSignal?.addEventListener("abort", onAbort, { once: true }); + emitter?.on("error", onGatewayErrorEvent); + }); +} + type DiscordHistoryEntry = { sender: string; body: string; @@ -402,18 +460,20 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`); - await new Promise((resolve) => { - const onAbort = async () => { - try { - const gateway = client.getPlugin("gateway"); - gateway?.disconnect(); - } finally { - resolve(); - } - }; - opts.abortSignal?.addEventListener("abort", () => { - void onAbort(); - }); + const gateway = client.getPlugin("gateway"); + const gatewayEmitter = (gateway as unknown as { emitter?: EventEmitter }) + ?.emitter; + await waitForDiscordGatewayStop({ + gateway: gateway + ? { + emitter: gatewayEmitter, + disconnect: () => gateway.disconnect(), + } + : undefined, + abortSignal: opts.abortSignal, + onGatewayError: (err) => { + runtime.error?.(danger(`discord gateway error: ${String(err)}`)); + }, }); } From c54f2a122a335f490436b5f590d5559944aecbd0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 8 Jan 2026 20:08:27 +0100 Subject: [PATCH 3/3] fix: update changelog + prompt test --- CHANGELOG.md | 2 +- src/agents/system-prompt.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 324bd54e3..cd232f2e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## Unreleased -- Discord: stop provider when gateway reconnects are exhausted and surface errors. (#504) +- Discord: stop provider when gateway reconnects are exhausted and surface errors. (#514) — thanks @joshp123 - Auto-reply: preserve block reply ordering with timeout fallback for streaming. (#503) — thanks @joshp123 - WhatsApp: group `/model list` output by provider for scannability. (#456) - thanks @mcinteerj - Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini). diff --git a/src/agents/system-prompt.test.ts b/src/agents/system-prompt.test.ts index 4c7fc1f58..17e38efc8 100644 --- a/src/agents/system-prompt.test.ts +++ b/src/agents/system-prompt.test.ts @@ -78,7 +78,7 @@ describe("buildAgentSystemPrompt", () => { toolNames: ["gateway", "bash"], }); - expect(prompt).toContain("## ClaudeBot Self-Update"); + expect(prompt).toContain("## Clawdbot Self-Update"); expect(prompt).toContain("config.apply"); expect(prompt).toContain("update.run"); });