diff --git a/CHANGELOG.md b/CHANGELOG.md index e887663b0..08751d4a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,18 @@ ## Unreleased +- Heartbeat: resolve Telegram account IDs from config-only tokens; cron tool accepts canonical `jobId` and legacy `id` for job actions. (#516) — thanks @YuriNachos +- Discord: stop provider when gateway reconnects are exhausted and surface errors. (#514) — thanks @joshp123 +- Auto-reply: preserve block reply ordering with timeout fallback for streaming. (#503) — thanks @joshp123 +- Auto-reply: avoid splitting outbound chunks inside parentheses. (#499) — thanks @philipp-spiess +- Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj +- macOS: package ClawdbotKit resources and Swift 6.2 compatibility dylib to avoid launch/tool crashes. (#473) — thanks @gupsammy - WhatsApp: group `/model list` output by provider for scannability. (#456) - thanks @mcinteerj - Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini). - Control UI: logs tab opens at the newest entries (bottom). - Control UI: add Docs link, remove chat composer divider, and add New session button. - Telegram: retry long-polling conflicts with backoff to avoid fatal exits. +- Telegram: fix grammY fetch type mismatch when injecting `fetch`. (#512) — thanks @YuriNachos - Agent system prompt: avoid automatic self-updates unless explicitly requested. - Onboarding: tighten QuickStart hint copy for configuring later. - Onboarding: avoid “token expired” for Codex CLI when expiry is heuristic. diff --git a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/ClawdbotKitResources.swift b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/ClawdbotKitResources.swift index 8e5ee5f63..d990e2cb2 100644 --- a/apps/shared/ClawdbotKit/Sources/ClawdbotKit/ClawdbotKitResources.swift +++ b/apps/shared/ClawdbotKit/Sources/ClawdbotKit/ClawdbotKitResources.swift @@ -1,5 +1,75 @@ import Foundation public enum ClawdbotKitResources { - public static let bundle: Bundle = .module + /// Resource bundle for ClawdbotKit. + /// + /// Locates the SwiftPM-generated resource bundle, checking multiple locations: + /// 1. Inside Bundle.main (packaged apps) + /// 2. Bundle.module (SwiftPM development/tests) + /// 3. Falls back to Bundle.main if not found (resource lookups will return nil) + /// + /// This avoids a fatal crash when Bundle.module can't locate its resources + /// in packaged .app bundles where the resource bundle path differs from + /// SwiftPM's expectations. + public static let bundle: Bundle = locateBundle() + + private static let bundleName = "ClawdbotKit_ClawdbotKit" + + private static func locateBundle() -> Bundle { + // 1. Check inside Bundle.main (packaged apps copy resources here) + if let mainResourceURL = Bundle.main.resourceURL { + let bundleURL = mainResourceURL.appendingPathComponent("\(bundleName).bundle") + if let bundle = Bundle(url: bundleURL) { + return bundle + } + } + + // 2. Check Bundle.main directly for embedded resources + if Bundle.main.url(forResource: "tool-display", withExtension: "json") != nil { + return Bundle.main + } + + // 3. Try Bundle.module (works in SwiftPM development/tests) + // Wrap in a function to defer the fatalError until actually called + if let moduleBundle = loadModuleBundleSafely() { + return moduleBundle + } + + // 4. Fallback: return Bundle.main (resource lookups will return nil gracefully) + return Bundle.main + } + + private static func loadModuleBundleSafely() -> Bundle? { + // Bundle.module is generated by SwiftPM and will fatalError if not found. + // We check likely locations manually to avoid the crash. + let candidates: [URL?] = [ + Bundle.main.resourceURL, + Bundle.main.bundleURL, + Bundle(for: BundleLocator.self).resourceURL, + Bundle(for: BundleLocator.self).bundleURL, + ] + + for candidate in candidates { + guard let baseURL = candidate else { continue } + + // Direct path + let directURL = baseURL.appendingPathComponent("\(bundleName).bundle") + if let bundle = Bundle(url: directURL) { + return bundle + } + + // Inside Resources/ + let resourcesURL = baseURL + .appendingPathComponent("Resources") + .appendingPathComponent("\(bundleName).bundle") + if let bundle = Bundle(url: resourcesURL) { + return bundle + } + } + + return nil + } } + +// Helper class for bundle lookup via Bundle(for:) +private final class BundleLocator {} diff --git a/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/ToolDisplayRegistryTests.swift b/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/ToolDisplayRegistryTests.swift new file mode 100644 index 000000000..54e68fe00 --- /dev/null +++ b/apps/shared/ClawdbotKit/Tests/ClawdbotKitTests/ToolDisplayRegistryTests.swift @@ -0,0 +1,16 @@ +import ClawdbotKit +import Foundation +import Testing + +@Suite struct ToolDisplayRegistryTests { + @Test func loadsToolDisplayConfigFromBundle() { + let url = ClawdbotKitResources.bundle.url(forResource: "tool-display", withExtension: "json") + #expect(url != nil) + } + + @Test func resolvesKnownToolFromConfig() { + let summary = ToolDisplayRegistry.resolve(name: "bash", args: nil) + #expect(summary.emoji == "🛠️") + #expect(summary.title == "Bash") + } +} diff --git a/scripts/package-mac-app.sh b/scripts/package-mac-app.sh index aed72a4f1..b5467f60b 100755 --- a/scripts/package-mac-app.sh +++ b/scripts/package-mac-app.sh @@ -221,6 +221,15 @@ if [ -d "$SPARKLE_FRAMEWORK_PRIMARY" ]; then chmod -R a+rX "$APP_ROOT/Contents/Frameworks/Sparkle.framework" fi +echo "📦 Copying Swift 6.2 compatibility libraries" +SWIFT_COMPAT_LIB="$(xcode-select -p)/Toolchains/XcodeDefault.xctoolchain/usr/lib/swift-6.2/macosx/libswiftCompatibilitySpan.dylib" +if [ -f "$SWIFT_COMPAT_LIB" ]; then + cp "$SWIFT_COMPAT_LIB" "$APP_ROOT/Contents/Frameworks/" + chmod +x "$APP_ROOT/Contents/Frameworks/libswiftCompatibilitySpan.dylib" +else + echo "WARN: Swift compatibility library not found at $SWIFT_COMPAT_LIB (continuing)" >&2 +fi + echo "🖼 Copying app icon" cp "$ROOT_DIR/apps/macos/Sources/Clawdbot/Resources/Clawdbot.icns" "$APP_ROOT/Contents/Resources/Clawdbot.icns" @@ -228,6 +237,15 @@ echo "📦 Copying device model resources" rm -rf "$APP_ROOT/Contents/Resources/DeviceModels" cp -R "$ROOT_DIR/apps/macos/Sources/Clawdbot/Resources/DeviceModels" "$APP_ROOT/Contents/Resources/DeviceModels" +echo "📦 Copying ClawdbotKit resources" +CLAWDBOTKIT_BUNDLE="$(build_path_for_arch "$PRIMARY_ARCH")/$BUILD_CONFIG/ClawdbotKit_ClawdbotKit.bundle" +if [ -d "$CLAWDBOTKIT_BUNDLE" ]; then + rm -rf "$APP_ROOT/Contents/Resources/ClawdbotKit_ClawdbotKit.bundle" + cp -R "$CLAWDBOTKIT_BUNDLE" "$APP_ROOT/Contents/Resources/ClawdbotKit_ClawdbotKit.bundle" +else + echo "WARN: ClawdbotKit resource bundle not found at $CLAWDBOTKIT_BUNDLE (continuing)" >&2 +fi + RELAY_DIR="$APP_ROOT/Contents/Resources/Relay" if [[ "${SKIP_GATEWAY_PACKAGE:-0}" != "1" ]]; then diff --git a/src/agents/system-prompt.test.ts b/src/agents/system-prompt.test.ts index 4c7fc1f58..17e38efc8 100644 --- a/src/agents/system-prompt.test.ts +++ b/src/agents/system-prompt.test.ts @@ -78,7 +78,7 @@ describe("buildAgentSystemPrompt", () => { toolNames: ["gateway", "bash"], }); - expect(prompt).toContain("## ClaudeBot Self-Update"); + expect(prompt).toContain("## Clawdbot Self-Update"); expect(prompt).toContain("config.apply"); expect(prompt).toContain("update.run"); }); diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index 8becd0f33..6e65acb83 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -19,9 +19,17 @@ describe("cron tool", () => { { action: "update", jobId: "job-1", patch: { foo: "bar" } }, { id: "job-1", patch: { foo: "bar" } }, ], + [ + "update", + { action: "update", id: "job-2", patch: { foo: "bar" } }, + { id: "job-2", patch: { foo: "bar" } }, + ], ["remove", { action: "remove", jobId: "job-1" }, { id: "job-1" }], + ["remove", { action: "remove", id: "job-2" }, { id: "job-2" }], ["run", { action: "run", jobId: "job-1" }, { id: "job-1" }], + ["run", { action: "run", id: "job-2" }, { id: "job-2" }], ["runs", { action: "runs", jobId: "job-1" }, { id: "job-1" }], + ["runs", { action: "runs", id: "job-2" }, { id: "job-2" }], ])("%s sends id to gateway", async (action, args, expectedParams) => { const tool = createCronTool(); await tool.execute("call1", args); @@ -35,6 +43,20 @@ describe("cron tool", () => { expect(call.params).toEqual(expectedParams); }); + it("prefers jobId over id when both are provided", async () => { + const tool = createCronTool(); + await tool.execute("call1", { + action: "run", + jobId: "job-primary", + id: "job-legacy", + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: unknown; + }; + expect(call?.params).toEqual({ id: "job-primary" }); + }); + it("normalizes cron.add job payloads", async () => { const tool = createCronTool(); await tool.execute("call2", { diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index c070f2864..519b707cd 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -47,7 +47,8 @@ const CronToolSchema = Type.Union([ gatewayUrl: Type.Optional(Type.String()), gatewayToken: Type.Optional(Type.String()), timeoutMs: Type.Optional(Type.Number()), - jobId: Type.String(), + jobId: Type.Optional(Type.String()), + id: Type.Optional(Type.String()), patch: Type.Object({}, { additionalProperties: true }), }), Type.Object({ @@ -55,21 +56,24 @@ const CronToolSchema = Type.Union([ gatewayUrl: Type.Optional(Type.String()), gatewayToken: Type.Optional(Type.String()), timeoutMs: Type.Optional(Type.Number()), - jobId: Type.String(), + jobId: Type.Optional(Type.String()), + id: Type.Optional(Type.String()), }), Type.Object({ action: Type.Literal("run"), gatewayUrl: Type.Optional(Type.String()), gatewayToken: Type.Optional(Type.String()), timeoutMs: Type.Optional(Type.Number()), - jobId: Type.String(), + jobId: Type.Optional(Type.String()), + id: Type.Optional(Type.String()), }), Type.Object({ action: Type.Literal("runs"), gatewayUrl: Type.Optional(Type.String()), gatewayToken: Type.Optional(Type.String()), timeoutMs: Type.Optional(Type.Number()), - jobId: Type.String(), + jobId: Type.Optional(Type.String()), + id: Type.Optional(Type.String()), }), Type.Object({ action: Type.Literal("wake"), @@ -88,7 +92,7 @@ export function createCronTool(): AnyAgentTool { label: "Cron", name: "cron", description: - "Manage Gateway cron jobs (status/list/add/update/remove/run/runs) and send wake events.", + "Manage Gateway cron jobs (status/list/add/update/remove/run/runs) and send wake events. Use `jobId` as the canonical identifier; `id` is accepted for compatibility.", parameters: CronToolSchema, execute: async (_toolCallId, args) => { const params = args as Record; @@ -121,7 +125,13 @@ export function createCronTool(): AnyAgentTool { ); } case "update": { - const id = readStringParam(params, "jobId", { required: true }); + const id = + readStringParam(params, "jobId") ?? readStringParam(params, "id"); + if (!id) { + throw new Error( + "jobId required (id accepted for backward compatibility)", + ); + } if (!params.patch || typeof params.patch !== "object") { throw new Error("patch required"); } @@ -134,19 +144,37 @@ export function createCronTool(): AnyAgentTool { ); } case "remove": { - const id = readStringParam(params, "jobId", { required: true }); + const id = + readStringParam(params, "jobId") ?? readStringParam(params, "id"); + if (!id) { + throw new Error( + "jobId required (id accepted for backward compatibility)", + ); + } return jsonResult( await callGatewayTool("cron.remove", gatewayOpts, { id }), ); } case "run": { - const id = readStringParam(params, "jobId", { required: true }); + const id = + readStringParam(params, "jobId") ?? readStringParam(params, "id"); + if (!id) { + throw new Error( + "jobId required (id accepted for backward compatibility)", + ); + } return jsonResult( await callGatewayTool("cron.run", gatewayOpts, { id }), ); } case "runs": { - const id = readStringParam(params, "jobId", { required: true }); + const id = + readStringParam(params, "jobId") ?? readStringParam(params, "id"); + if (!id) { + throw new Error( + "jobId required (id accepted for backward compatibility)", + ); + } return jsonResult( await callGatewayTool("cron.runs", gatewayOpts, { id }), ); diff --git a/src/auto-reply/chunk.test.ts b/src/auto-reply/chunk.test.ts index f4462cfc9..335576b3c 100644 --- a/src/auto-reply/chunk.test.ts +++ b/src/auto-reply/chunk.test.ts @@ -67,6 +67,12 @@ describe("chunkText", () => { const chunks = chunkText(text, 10); expect(chunks).toEqual(["Supercalif", "ragilistic", "expialidoc", "ious"]); }); + + it("keeps parenthetical phrases together", () => { + const text = "Heads up now (Though now I'm curious)ok"; + const chunks = chunkText(text, 35); + expect(chunks).toEqual(["Heads up now", "(Though now I'm curious)ok"]); + }); }); describe("resolveTextChunkLimit", () => { @@ -184,4 +190,29 @@ describe("chunkMarkdownText", () => { expect(nonFenceLines.join("\n").trim()).not.toBe(""); } }); + + it("keeps parenthetical phrases together", () => { + const text = "Heads up now (Though now I'm curious)ok"; + const chunks = chunkMarkdownText(text, 35); + expect(chunks).toEqual(["Heads up now", "(Though now I'm curious)ok"]); + }); + + it("handles nested parentheses", () => { + const text = "Hello (outer (inner) end) world"; + const chunks = chunkMarkdownText(text, 26); + expect(chunks).toEqual(["Hello (outer (inner) end)", "world"]); + }); + + it("hard-breaks when a parenthetical exceeds the limit", () => { + const text = `(${"a".repeat(80)})`; + const chunks = chunkMarkdownText(text, 20); + expect(chunks[0]?.length).toBe(20); + expect(chunks.join("")).toBe(text); + }); + + it("ignores unmatched closing parentheses", () => { + const text = "Hello) world (ok)"; + const chunks = chunkMarkdownText(text, 12); + expect(chunks).toEqual(["Hello)", "world (ok)"]); + }); }); diff --git a/src/auto-reply/chunk.ts b/src/auto-reply/chunk.ts index b362eab2f..b64bbe5bf 100644 --- a/src/auto-reply/chunk.ts +++ b/src/auto-reply/chunk.ts @@ -90,18 +90,27 @@ export function chunkText(text: string, limit: number): string[] { while (remaining.length > limit) { const window = remaining.slice(0, limit); - // 1) Prefer a newline break inside the window. - let breakIdx = window.lastIndexOf("\n"); + // 1) Prefer a newline break inside the window (outside parentheses). + let lastNewline = -1; + let lastWhitespace = -1; + let depth = 0; + for (let i = 0; i < window.length; i++) { + const char = window[i]; + if (char === "(") { + depth += 1; + continue; + } + if (char === ")" && depth > 0) { + depth -= 1; + continue; + } + if (depth !== 0) continue; + if (char === "\n") lastNewline = i; + else if (/\s/.test(char)) lastWhitespace = i; + } // 2) Otherwise prefer the last whitespace (word boundary) inside the window. - if (breakIdx <= 0) { - for (let i = window.length - 1; i >= 0; i--) { - if (/\s/.test(window[i])) { - breakIdx = i; - break; - } - } - } + let breakIdx = lastNewline > 0 ? lastNewline : lastWhitespace; // 3) Fallback: hard break exactly at the limit. if (breakIdx <= 0) breakIdx = limit; @@ -234,15 +243,27 @@ function pickSafeBreakIndex( window: string, spans: ReturnType, ): number { - let newlineIdx = window.lastIndexOf("\n"); - while (newlineIdx > 0) { - if (isSafeFenceBreak(spans, newlineIdx)) return newlineIdx; - newlineIdx = window.lastIndexOf("\n", newlineIdx - 1); - } - - for (let i = window.length - 1; i > 0; i--) { - if (/\s/.test(window[i]) && isSafeFenceBreak(spans, i)) return i; + let lastNewline = -1; + let lastWhitespace = -1; + let depth = 0; + + for (let i = 0; i < window.length; i++) { + if (!isSafeFenceBreak(spans, i)) continue; + const char = window[i]; + if (char === "(") { + depth += 1; + continue; + } + if (char === ")" && depth > 0) { + depth -= 1; + continue; + } + if (depth !== 0) continue; + if (char === "\n") lastNewline = i; + else if (/\s/.test(char)) lastWhitespace = i; } + if (lastNewline > 0) return lastNewline; + if (lastWhitespace > 0) return lastWhitespace; return -1; } diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index caeedc120..15dca24e1 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -103,6 +103,61 @@ describe("block streaming", () => { }); }); + it("preserves block reply ordering when typing start is slow", async () => { + await withTempHome(async (home) => { + let releaseTyping: (() => void) | undefined; + const typingGate = new Promise((resolve) => { + releaseTyping = resolve; + }); + const onReplyStart = vi.fn(() => typingGate); + const seen: string[] = []; + const onBlockReply = vi.fn(async (payload) => { + seen.push(payload.text ?? ""); + }); + + vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + void params.onBlockReply?.({ text: "first" }); + void params.onBlockReply?.({ text: "second" }); + return { + payloads: [{ text: "first" }, { text: "second" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }; + }); + + const replyPromise = getReplyFromConfig( + { + Body: "ping", + From: "+1004", + To: "+2000", + MessageSid: "msg-125", + Provider: "telegram", + }, + { + onReplyStart, + onBlockReply, + }, + { + agent: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + telegram: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + await waitForCalls(() => onReplyStart.mock.calls.length, 1); + releaseTyping?.(); + + const res = await replyPromise; + expect(res).toBeUndefined(); + expect(seen).toEqual(["first", "second"]); + }); + }); + it("drops final payloads when block replies streamed", async () => { await withTempHome(async (home) => { const onBlockReply = vi.fn().mockResolvedValue(undefined); @@ -143,4 +198,59 @@ describe("block streaming", () => { expect(onBlockReply).toHaveBeenCalledTimes(1); }); }); + + it("falls back to final payloads when block reply send times out", async () => { + await withTempHome(async (home) => { + let sawAbort = false; + const onBlockReply = vi.fn((_, context) => { + return new Promise((resolve) => { + context?.abortSignal?.addEventListener( + "abort", + () => { + sawAbort = true; + resolve(); + }, + { once: true }, + ); + }); + }); + + vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + void params.onBlockReply?.({ text: "streamed" }); + return { + payloads: [{ text: "final" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }; + }); + + const replyPromise = getReplyFromConfig( + { + Body: "ping", + From: "+1004", + To: "+2000", + MessageSid: "msg-126", + Provider: "telegram", + }, + { + onBlockReply, + blockReplyTimeoutMs: 10, + }, + { + agent: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + telegram: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + const res = await replyPromise; + expect(res).toMatchObject({ text: "final" }); + expect(sawAbort).toBe(true); + }); + }); }); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 76ba0d040..1225491b2 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -47,6 +47,7 @@ import type { TypingController } from "./typing.js"; import { createTypingSignaler } from "./typing-mode.js"; const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i; +const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000; const isBunFetchSocketError = (message?: string) => Boolean(message && BUN_FETCH_SOCKET_ERROR_RE.test(message)); @@ -61,6 +62,23 @@ const formatBunFetchSocketError = (message: string) => { ].join("\n"); }; +const withTimeout = async ( + promise: Promise, + timeoutMs: number, + timeoutError: Error, +): Promise => { + if (!timeoutMs || timeoutMs <= 0) return promise; + let timer: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout(() => reject(timeoutError), timeoutMs); + }); + try { + return await Promise.race([promise, timeoutPromise]); + } finally { + if (timer) clearTimeout(timer); + } +}; + export async function runReplyAgent(params: { commandBody: string; followupRun: FollowupRun; @@ -144,7 +162,12 @@ export async function runReplyAgent(params: { const pendingStreamedPayloadKeys = new Set(); const pendingBlockTasks = new Set>(); const pendingToolTasks = new Set>(); + let blockReplyChain: Promise = Promise.resolve(); + let blockReplyAborted = false; + let didLogBlockReplyAbort = false; let didStreamBlockReply = false; + const blockReplyTimeoutMs = + opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS; const buildPayloadKey = (payload: ReplyPayload) => { const text = payload.text?.trim() ?? ""; const mediaList = payload.mediaUrls?.length @@ -367,16 +390,49 @@ export async function runReplyAgent(params: { ) { return; } + if (blockReplyAborted) return; pendingStreamedPayloadKeys.add(payloadKey); - const task = (async () => { - await typingSignals.signalTextDelta(taggedPayload.text); - await opts.onBlockReply?.(blockPayload); - })() - .then(() => { + void typingSignals + .signalTextDelta(taggedPayload.text) + .catch((err) => { + logVerbose( + `block reply typing signal failed: ${String(err)}`, + ); + }); + const timeoutError = new Error( + `block reply delivery timed out after ${blockReplyTimeoutMs}ms`, + ); + const abortController = new AbortController(); + blockReplyChain = blockReplyChain + .then(async () => { + if (blockReplyAborted) return false; + await withTimeout( + opts.onBlockReply?.(blockPayload, { + abortSignal: abortController.signal, + timeoutMs: blockReplyTimeoutMs, + }) ?? Promise.resolve(), + blockReplyTimeoutMs, + timeoutError, + ); + return true; + }) + .then((didSend) => { + if (!didSend) return; streamedPayloadKeys.add(payloadKey); didStreamBlockReply = true; }) .catch((err) => { + if (err === timeoutError) { + abortController.abort(); + blockReplyAborted = true; + if (!didLogBlockReplyAbort) { + didLogBlockReplyAbort = true; + logVerbose( + `block reply delivery timed out after ${blockReplyTimeoutMs}ms; skipping remaining block replies to preserve ordering`, + ); + } + return; + } logVerbose( `block reply delivery failed: ${String(err)}`, ); @@ -384,6 +440,7 @@ export async function runReplyAgent(params: { .finally(() => { pendingStreamedPayloadKeys.delete(payloadKey); }); + const task = blockReplyChain; pendingBlockTasks.add(task); void task.finally(() => pendingBlockTasks.delete(task)); } @@ -546,10 +603,10 @@ export async function runReplyAgent(params: { }) .filter(isRenderablePayload); - // Drop final payloads if block streaming is enabled and we already streamed - // block replies. Tool-sent duplicates are filtered below. + // Drop final payloads only when block streaming succeeded end-to-end. + // If streaming aborted (e.g., timeout), fall back to final payloads. const shouldDropFinalPayloads = - blockStreamingEnabled && didStreamBlockReply; + blockStreamingEnabled && didStreamBlockReply && !blockReplyAborted; const messagingToolSentTexts = runResult.messagingToolSentTexts ?? []; const messagingToolSentTargets = runResult.messagingToolSentTargets ?? []; const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ diff --git a/src/auto-reply/reply/commands.ts b/src/auto-reply/reply/commands.ts index 78bc7010f..5c80076c4 100644 --- a/src/auto-reply/reply/commands.ts +++ b/src/auto-reply/reply/commands.ts @@ -444,7 +444,7 @@ export async function handleCommands(params: { ...cfg.agent, model: { ...cfg.agent?.model, - primary: model, + primary: `${provider}/${model}`, }, contextTokens, thinkingDefault: cfg.agent?.thinkingDefault, diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 26961ad7c..1bf1fbfce 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -41,10 +41,14 @@ export async function dispatchReplyFromConfig(params: { * Note: Only called when shouldRouteToOriginating is true, so * originatingChannel and originatingTo are guaranteed to be defined. */ - const sendPayloadAsync = async (payload: ReplyPayload): Promise => { + const sendPayloadAsync = async ( + payload: ReplyPayload, + abortSignal?: AbortSignal, + ): Promise => { // TypeScript doesn't narrow these from the shouldRouteToOriginating check, // but they're guaranteed non-null when this function is called. if (!originatingChannel || !originatingTo) return; + if (abortSignal?.aborted) return; const result = await routeReply({ payload, channel: originatingChannel, @@ -52,6 +56,7 @@ export async function dispatchReplyFromConfig(params: { accountId: ctx.AccountId, threadId: ctx.MessageThreadId, cfg, + abortSignal, }); if (!result.ok) { logVerbose( @@ -73,10 +78,10 @@ export async function dispatchReplyFromConfig(params: { dispatcher.sendToolResult(payload); } }, - onBlockReply: (payload: ReplyPayload) => { + onBlockReply: (payload: ReplyPayload, context) => { if (shouldRouteToOriginating) { - // Fire-and-forget for streaming block replies when routing. - void sendPayloadAsync(payload); + // Await routed sends so upstream can enforce ordering/timeouts. + return sendPayloadAsync(payload, context?.abortSignal); } else { // Synchronous dispatch to preserve callback timing. dispatcher.sendBlockReply(payload); diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index cc40383c9..9571e292f 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -31,6 +31,22 @@ vi.mock("../../web/outbound.js", () => ({ const { routeReply } = await import("./route-reply.js"); describe("routeReply", () => { + it("skips sends when abort signal is already aborted", async () => { + mocks.sendMessageSlack.mockClear(); + const controller = new AbortController(); + controller.abort(); + const res = await routeReply({ + payload: { text: "hi" }, + channel: "slack", + to: "channel:C123", + cfg: {} as never, + abortSignal: controller.signal, + }); + expect(res.ok).toBe(false); + expect(res.error).toContain("aborted"); + expect(mocks.sendMessageSlack).not.toHaveBeenCalled(); + }); + it("no-ops on empty payload", async () => { mocks.sendMessageSlack.mockClear(); const res = await routeReply({ diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index c02ce1c77..f7529c8cf 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -30,6 +30,8 @@ export type RouteReplyParams = { threadId?: number; /** Config for provider-specific settings. */ cfg: ClawdbotConfig; + /** Optional abort signal for cooperative cancellation. */ + abortSignal?: AbortSignal; }; export type RouteReplyResult = { @@ -52,7 +54,7 @@ export type RouteReplyResult = { export async function routeReply( params: RouteReplyParams, ): Promise { - const { payload, channel, to, accountId, threadId } = params; + const { payload, channel, to, accountId, threadId, abortSignal } = params; // Debug: `pnpm test src/auto-reply/reply/route-reply.test.ts` const text = payload.text ?? ""; @@ -72,6 +74,9 @@ export async function routeReply( text: string; mediaUrl?: string; }): Promise => { + if (abortSignal?.aborted) { + return { ok: false, error: "Reply routing aborted" }; + } const { text, mediaUrl } = params; switch (channel) { case "telegram": { @@ -148,12 +153,18 @@ export async function routeReply( }; try { + if (abortSignal?.aborted) { + return { ok: false, error: "Reply routing aborted" }; + } if (mediaUrls.length === 0) { return await sendOne({ text }); } let last: RouteReplyResult | undefined; for (let i = 0; i < mediaUrls.length; i++) { + if (abortSignal?.aborted) { + return { ok: false, error: "Reply routing aborted" }; + } const mediaUrl = mediaUrls[i]; const caption = i === 0 ? text : ""; last = await sendOne({ text: caption, mediaUrl }); diff --git a/src/auto-reply/status.test.ts b/src/auto-reply/status.test.ts index 2e9f5ba80..36ef4e848 100644 --- a/src/auto-reply/status.test.ts +++ b/src/auto-reply/status.test.ts @@ -102,6 +102,18 @@ describe("buildStatusMessage", () => { expect(text).toContain("🧠 Model: openai/gpt-4.1-mini"); }); + it("keeps provider prefix from configured model", () => { + const text = buildStatusMessage({ + agent: { + model: "google-antigravity/claude-sonnet-4-5", + }, + sessionScope: "per-sender", + queue: { mode: "collect", depth: 0 }, + }); + + expect(text).toContain("🧠 Model: google-antigravity/claude-sonnet-4-5"); + }); + it("handles missing agent config gracefully", () => { const text = buildStatusMessage({ agent: {}, diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 0bd7671d7..7f69aeff9 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -1,14 +1,24 @@ import type { TypingController } from "./reply/typing.js"; +export type BlockReplyContext = { + abortSignal?: AbortSignal; + timeoutMs?: number; +}; + export type GetReplyOptions = { onReplyStart?: () => Promise | void; onTypingController?: (typing: TypingController) => void; isHeartbeat?: boolean; onPartialReply?: (payload: ReplyPayload) => Promise | void; onReasoningStream?: (payload: ReplyPayload) => Promise | void; - onBlockReply?: (payload: ReplyPayload) => Promise | void; + onBlockReply?: ( + payload: ReplyPayload, + context?: BlockReplyContext, + ) => Promise | void; onToolResult?: (payload: ReplyPayload) => Promise | void; disableBlockStreaming?: boolean; + /** Timeout for block reply delivery (ms). */ + blockReplyTimeoutMs?: number; /** If provided, only load these skills for this session (empty = no skills). */ skillFilter?: string[]; }; diff --git a/src/discord/monitor.gateway.test.ts b/src/discord/monitor.gateway.test.ts new file mode 100644 index 000000000..26685da72 --- /dev/null +++ b/src/discord/monitor.gateway.test.ts @@ -0,0 +1,61 @@ +import { EventEmitter } from "node:events"; + +import { describe, expect, it, vi } from "vitest"; + +import { waitForDiscordGatewayStop } from "./monitor.gateway.js"; + +describe("waitForDiscordGatewayStop", () => { + it("resolves on abort and disconnects gateway", async () => { + const emitter = new EventEmitter(); + const disconnect = vi.fn(); + const abort = new AbortController(); + + const promise = waitForDiscordGatewayStop({ + gateway: { emitter, disconnect }, + abortSignal: abort.signal, + }); + + expect(emitter.listenerCount("error")).toBe(1); + abort.abort(); + + await expect(promise).resolves.toBeUndefined(); + expect(disconnect).toHaveBeenCalledTimes(1); + expect(emitter.listenerCount("error")).toBe(0); + }); + + it("rejects on gateway error and disconnects", async () => { + const emitter = new EventEmitter(); + const disconnect = vi.fn(); + const onGatewayError = vi.fn(); + const abort = new AbortController(); + const err = new Error("boom"); + + const promise = waitForDiscordGatewayStop({ + gateway: { emitter, disconnect }, + abortSignal: abort.signal, + onGatewayError, + }); + + emitter.emit("error", err); + + await expect(promise).rejects.toThrow("boom"); + expect(onGatewayError).toHaveBeenCalledWith(err); + expect(disconnect).toHaveBeenCalledTimes(1); + expect(emitter.listenerCount("error")).toBe(0); + + abort.abort(); + expect(disconnect).toHaveBeenCalledTimes(1); + }); + + it("resolves on abort without a gateway", async () => { + const abort = new AbortController(); + + const promise = waitForDiscordGatewayStop({ + abortSignal: abort.signal, + }); + + abort.abort(); + + await expect(promise).resolves.toBeUndefined(); + }); +}); diff --git a/src/discord/monitor.gateway.ts b/src/discord/monitor.gateway.ts new file mode 100644 index 000000000..d09df288b --- /dev/null +++ b/src/discord/monitor.gateway.ts @@ -0,0 +1,63 @@ +import type { EventEmitter } from "node:events"; + +export type DiscordGatewayHandle = { + emitter?: Pick; + disconnect?: () => void; +}; + +export function getDiscordGatewayEmitter( + gateway?: unknown, +): EventEmitter | undefined { + return (gateway as { emitter?: EventEmitter } | undefined)?.emitter; +} + +export async function waitForDiscordGatewayStop(params: { + gateway?: DiscordGatewayHandle; + abortSignal?: AbortSignal; + onGatewayError?: (err: unknown) => void; +}): Promise { + const { gateway, abortSignal, onGatewayError } = params; + const emitter = gateway?.emitter; + return await new Promise((resolve, reject) => { + let settled = false; + const cleanup = () => { + abortSignal?.removeEventListener("abort", onAbort); + emitter?.removeListener("error", onGatewayErrorEvent); + }; + const finishResolve = () => { + if (settled) return; + settled = true; + cleanup(); + try { + gateway?.disconnect?.(); + } finally { + resolve(); + } + }; + const finishReject = (err: unknown) => { + if (settled) return; + settled = true; + cleanup(); + try { + gateway?.disconnect?.(); + } finally { + reject(err); + } + }; + const onAbort = () => { + finishResolve(); + }; + const onGatewayErrorEvent = (err: unknown) => { + onGatewayError?.(err); + finishReject(err); + }; + + if (abortSignal?.aborted) { + onAbort(); + return; + } + + abortSignal?.addEventListener("abort", onAbort, { once: true }); + emitter?.on("error", onGatewayErrorEvent); + }); +} diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 27baa3c66..754d5f302 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -61,6 +61,10 @@ import type { RuntimeEnv } from "../runtime.js"; import { loadWebMedia } from "../web/media.js"; import { resolveDiscordAccount } from "./accounts.js"; import { chunkDiscordText } from "./chunk.js"; +import { + getDiscordGatewayEmitter, + waitForDiscordGatewayStop, +} from "./monitor.gateway.js"; import { fetchDiscordApplicationId } from "./probe.js"; import { reactMessageDiscord, sendMessageDiscord } from "./send.js"; import { normalizeDiscordToken } from "./token.js"; @@ -402,18 +406,19 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`); - await new Promise((resolve) => { - const onAbort = async () => { - try { - const gateway = client.getPlugin("gateway"); - gateway?.disconnect(); - } finally { - resolve(); - } - }; - opts.abortSignal?.addEventListener("abort", () => { - void onAbort(); - }); + const gateway = client.getPlugin("gateway"); + const gatewayEmitter = getDiscordGatewayEmitter(gateway); + await waitForDiscordGatewayStop({ + gateway: gateway + ? { + emitter: gatewayEmitter, + disconnect: () => gateway.disconnect(), + } + : undefined, + abortSignal: opts.abortSignal, + onGatewayError: (err) => { + runtime.error?.(danger(`discord gateway error: ${String(err)}`)); + }, }); } diff --git a/src/telegram/send.ts b/src/telegram/send.ts index d15fa0616..d0715c22c 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -1,4 +1,5 @@ import type { ReactionType, ReactionTypeEmoji } from "@grammyjs/types"; +import type { ApiClientOptions } from "grammy"; import { Bot, InputFile } from "grammy"; import { loadConfig } from "../config/config.js"; import { formatErrorMessage } from "../infra/errors.js"; @@ -113,10 +114,10 @@ export async function sendMessageTelegram( // Use provided api or create a new Bot instance. The nullish coalescing // operator ensures api is always defined (Bot.api is always non-null). const fetchImpl = resolveTelegramFetch(); - const api = - opts.api ?? - new Bot(token, fetchImpl ? { client: { fetch: fetchImpl } } : undefined) - .api; + const client: ApiClientOptions | undefined = fetchImpl + ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } + : undefined; + const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; const mediaUrl = opts.mediaUrl?.trim(); // Build optional params for forum topics and reply threading. @@ -271,10 +272,10 @@ export async function reactMessageTelegram( const chatId = normalizeChatId(String(chatIdInput)); const messageId = normalizeMessageId(messageIdInput); const fetchImpl = resolveTelegramFetch(); - const api = - opts.api ?? - new Bot(token, fetchImpl ? { client: { fetch: fetchImpl } } : undefined) - .api; + const client: ApiClientOptions | undefined = fetchImpl + ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } + : undefined; + const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; const request = createTelegramRetryRunner({ retry: opts.retry, configRetry: account.config.retry, diff --git a/src/telegram/webhook-set.ts b/src/telegram/webhook-set.ts index fc81c1106..78abb9adb 100644 --- a/src/telegram/webhook-set.ts +++ b/src/telegram/webhook-set.ts @@ -1,3 +1,4 @@ +import type { ApiClientOptions } from "grammy"; import { Bot } from "grammy"; import { resolveTelegramFetch } from "./fetch.js"; @@ -8,10 +9,10 @@ export async function setTelegramWebhook(opts: { dropPendingUpdates?: boolean; }) { const fetchImpl = resolveTelegramFetch(); - const bot = new Bot( - opts.token, - fetchImpl ? { client: { fetch: fetchImpl } } : undefined, - ); + const client: ApiClientOptions | undefined = fetchImpl + ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } + : undefined; + const bot = new Bot(opts.token, client ? { client } : undefined); await bot.api.setWebhook(opts.url, { secret_token: opts.secret, drop_pending_updates: opts.dropPendingUpdates ?? false, @@ -20,9 +21,9 @@ export async function setTelegramWebhook(opts: { export async function deleteTelegramWebhook(opts: { token: string }) { const fetchImpl = resolveTelegramFetch(); - const bot = new Bot( - opts.token, - fetchImpl ? { client: { fetch: fetchImpl } } : undefined, - ); + const client: ApiClientOptions | undefined = fetchImpl + ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } + : undefined; + const bot = new Bot(opts.token, client ? { client } : undefined); await bot.api.deleteWebhook(); }