Merge branch 'main' into fix/telegram-fetch-type-errors

This commit is contained in:
Yuri Chukhlib
2026-01-08 20:09:00 +01:00
committed by GitHub
10 changed files with 347 additions and 27 deletions

View File

@@ -2,6 +2,8 @@
## Unreleased ## Unreleased
- Discord: stop provider when gateway reconnects are exhausted and surface errors. (#514) — thanks @joshp123
- Auto-reply: preserve block reply ordering with timeout fallback for streaming. (#503) — thanks @joshp123
- WhatsApp: group `/model list` output by provider for scannability. (#456) - thanks @mcinteerj - WhatsApp: group `/model list` output by provider for scannability. (#456) - thanks @mcinteerj
- Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini). - Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini).
- Control UI: logs tab opens at the newest entries (bottom). - Control UI: logs tab opens at the newest entries (bottom).

View File

@@ -78,7 +78,7 @@ describe("buildAgentSystemPrompt", () => {
toolNames: ["gateway", "bash"], toolNames: ["gateway", "bash"],
}); });
expect(prompt).toContain("## ClaudeBot Self-Update"); expect(prompt).toContain("## Clawdbot Self-Update");
expect(prompt).toContain("config.apply"); expect(prompt).toContain("config.apply");
expect(prompt).toContain("update.run"); expect(prompt).toContain("update.run");
}); });

View File

@@ -103,6 +103,61 @@ describe("block streaming", () => {
}); });
}); });
it("preserves block reply ordering when typing start is slow", async () => {
await withTempHome(async (home) => {
let releaseTyping: (() => void) | undefined;
const typingGate = new Promise<void>((resolve) => {
releaseTyping = resolve;
});
const onReplyStart = vi.fn(() => typingGate);
const seen: string[] = [];
const onBlockReply = vi.fn(async (payload) => {
seen.push(payload.text ?? "");
});
vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
void params.onBlockReply?.({ text: "first" });
void params.onBlockReply?.({ text: "second" });
return {
payloads: [{ text: "first" }, { text: "second" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
};
});
const replyPromise = getReplyFromConfig(
{
Body: "ping",
From: "+1004",
To: "+2000",
MessageSid: "msg-125",
Provider: "telegram",
},
{
onReplyStart,
onBlockReply,
},
{
agent: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
telegram: { allowFrom: ["*"] },
session: { store: path.join(home, "sessions.json") },
},
);
await waitForCalls(() => onReplyStart.mock.calls.length, 1);
releaseTyping?.();
const res = await replyPromise;
expect(res).toBeUndefined();
expect(seen).toEqual(["first", "second"]);
});
});
it("drops final payloads when block replies streamed", async () => { it("drops final payloads when block replies streamed", async () => {
await withTempHome(async (home) => { await withTempHome(async (home) => {
const onBlockReply = vi.fn().mockResolvedValue(undefined); const onBlockReply = vi.fn().mockResolvedValue(undefined);
@@ -143,4 +198,59 @@ describe("block streaming", () => {
expect(onBlockReply).toHaveBeenCalledTimes(1); expect(onBlockReply).toHaveBeenCalledTimes(1);
}); });
}); });
it("falls back to final payloads when block reply send times out", async () => {
await withTempHome(async (home) => {
let sawAbort = false;
const onBlockReply = vi.fn((_, context) => {
return new Promise<void>((resolve) => {
context?.abortSignal?.addEventListener(
"abort",
() => {
sawAbort = true;
resolve();
},
{ once: true },
);
});
});
vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
void params.onBlockReply?.({ text: "streamed" });
return {
payloads: [{ text: "final" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
};
});
const replyPromise = getReplyFromConfig(
{
Body: "ping",
From: "+1004",
To: "+2000",
MessageSid: "msg-126",
Provider: "telegram",
},
{
onBlockReply,
blockReplyTimeoutMs: 10,
},
{
agent: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
telegram: { allowFrom: ["*"] },
session: { store: path.join(home, "sessions.json") },
},
);
const res = await replyPromise;
expect(res).toMatchObject({ text: "final" });
expect(sawAbort).toBe(true);
});
});
}); });

View File

@@ -47,6 +47,7 @@ import type { TypingController } from "./typing.js";
import { createTypingSignaler } from "./typing-mode.js"; import { createTypingSignaler } from "./typing-mode.js";
const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i; const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i;
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000;
const isBunFetchSocketError = (message?: string) => const isBunFetchSocketError = (message?: string) =>
Boolean(message && BUN_FETCH_SOCKET_ERROR_RE.test(message)); Boolean(message && BUN_FETCH_SOCKET_ERROR_RE.test(message));
@@ -61,6 +62,23 @@ const formatBunFetchSocketError = (message: string) => {
].join("\n"); ].join("\n");
}; };
const withTimeout = async <T>(
promise: Promise<T>,
timeoutMs: number,
timeoutError: Error,
): Promise<T> => {
if (!timeoutMs || timeoutMs <= 0) return promise;
let timer: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(timeoutError), timeoutMs);
});
try {
return await Promise.race([promise, timeoutPromise]);
} finally {
if (timer) clearTimeout(timer);
}
};
export async function runReplyAgent(params: { export async function runReplyAgent(params: {
commandBody: string; commandBody: string;
followupRun: FollowupRun; followupRun: FollowupRun;
@@ -144,7 +162,12 @@ export async function runReplyAgent(params: {
const pendingStreamedPayloadKeys = new Set<string>(); const pendingStreamedPayloadKeys = new Set<string>();
const pendingBlockTasks = new Set<Promise<void>>(); const pendingBlockTasks = new Set<Promise<void>>();
const pendingToolTasks = new Set<Promise<void>>(); const pendingToolTasks = new Set<Promise<void>>();
let blockReplyChain: Promise<void> = Promise.resolve();
let blockReplyAborted = false;
let didLogBlockReplyAbort = false;
let didStreamBlockReply = false; let didStreamBlockReply = false;
const blockReplyTimeoutMs =
opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
const buildPayloadKey = (payload: ReplyPayload) => { const buildPayloadKey = (payload: ReplyPayload) => {
const text = payload.text?.trim() ?? ""; const text = payload.text?.trim() ?? "";
const mediaList = payload.mediaUrls?.length const mediaList = payload.mediaUrls?.length
@@ -367,16 +390,49 @@ export async function runReplyAgent(params: {
) { ) {
return; return;
} }
if (blockReplyAborted) return;
pendingStreamedPayloadKeys.add(payloadKey); pendingStreamedPayloadKeys.add(payloadKey);
const task = (async () => { void typingSignals
await typingSignals.signalTextDelta(taggedPayload.text); .signalTextDelta(taggedPayload.text)
await opts.onBlockReply?.(blockPayload); .catch((err) => {
})() logVerbose(
.then(() => { `block reply typing signal failed: ${String(err)}`,
);
});
const timeoutError = new Error(
`block reply delivery timed out after ${blockReplyTimeoutMs}ms`,
);
const abortController = new AbortController();
blockReplyChain = blockReplyChain
.then(async () => {
if (blockReplyAborted) return false;
await withTimeout(
opts.onBlockReply?.(blockPayload, {
abortSignal: abortController.signal,
timeoutMs: blockReplyTimeoutMs,
}) ?? Promise.resolve(),
blockReplyTimeoutMs,
timeoutError,
);
return true;
})
.then((didSend) => {
if (!didSend) return;
streamedPayloadKeys.add(payloadKey); streamedPayloadKeys.add(payloadKey);
didStreamBlockReply = true; didStreamBlockReply = true;
}) })
.catch((err) => { .catch((err) => {
if (err === timeoutError) {
abortController.abort();
blockReplyAborted = true;
if (!didLogBlockReplyAbort) {
didLogBlockReplyAbort = true;
logVerbose(
`block reply delivery timed out after ${blockReplyTimeoutMs}ms; skipping remaining block replies to preserve ordering`,
);
}
return;
}
logVerbose( logVerbose(
`block reply delivery failed: ${String(err)}`, `block reply delivery failed: ${String(err)}`,
); );
@@ -384,6 +440,7 @@ export async function runReplyAgent(params: {
.finally(() => { .finally(() => {
pendingStreamedPayloadKeys.delete(payloadKey); pendingStreamedPayloadKeys.delete(payloadKey);
}); });
const task = blockReplyChain;
pendingBlockTasks.add(task); pendingBlockTasks.add(task);
void task.finally(() => pendingBlockTasks.delete(task)); void task.finally(() => pendingBlockTasks.delete(task));
} }
@@ -546,10 +603,10 @@ export async function runReplyAgent(params: {
}) })
.filter(isRenderablePayload); .filter(isRenderablePayload);
// Drop final payloads if block streaming is enabled and we already streamed // Drop final payloads only when block streaming succeeded end-to-end.
// block replies. Tool-sent duplicates are filtered below. // If streaming aborted (e.g., timeout), fall back to final payloads.
const shouldDropFinalPayloads = const shouldDropFinalPayloads =
blockStreamingEnabled && didStreamBlockReply; blockStreamingEnabled && didStreamBlockReply && !blockReplyAborted;
const messagingToolSentTexts = runResult.messagingToolSentTexts ?? []; const messagingToolSentTexts = runResult.messagingToolSentTexts ?? [];
const messagingToolSentTargets = runResult.messagingToolSentTargets ?? []; const messagingToolSentTargets = runResult.messagingToolSentTargets ?? [];
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({

View File

@@ -41,10 +41,14 @@ export async function dispatchReplyFromConfig(params: {
* Note: Only called when shouldRouteToOriginating is true, so * Note: Only called when shouldRouteToOriginating is true, so
* originatingChannel and originatingTo are guaranteed to be defined. * originatingChannel and originatingTo are guaranteed to be defined.
*/ */
const sendPayloadAsync = async (payload: ReplyPayload): Promise<void> => { const sendPayloadAsync = async (
payload: ReplyPayload,
abortSignal?: AbortSignal,
): Promise<void> => {
// TypeScript doesn't narrow these from the shouldRouteToOriginating check, // TypeScript doesn't narrow these from the shouldRouteToOriginating check,
// but they're guaranteed non-null when this function is called. // but they're guaranteed non-null when this function is called.
if (!originatingChannel || !originatingTo) return; if (!originatingChannel || !originatingTo) return;
if (abortSignal?.aborted) return;
const result = await routeReply({ const result = await routeReply({
payload, payload,
channel: originatingChannel, channel: originatingChannel,
@@ -52,6 +56,7 @@ export async function dispatchReplyFromConfig(params: {
accountId: ctx.AccountId, accountId: ctx.AccountId,
threadId: ctx.MessageThreadId, threadId: ctx.MessageThreadId,
cfg, cfg,
abortSignal,
}); });
if (!result.ok) { if (!result.ok) {
logVerbose( logVerbose(
@@ -73,10 +78,10 @@ export async function dispatchReplyFromConfig(params: {
dispatcher.sendToolResult(payload); dispatcher.sendToolResult(payload);
} }
}, },
onBlockReply: (payload: ReplyPayload) => { onBlockReply: (payload: ReplyPayload, context) => {
if (shouldRouteToOriginating) { if (shouldRouteToOriginating) {
// Fire-and-forget for streaming block replies when routing. // Await routed sends so upstream can enforce ordering/timeouts.
void sendPayloadAsync(payload); return sendPayloadAsync(payload, context?.abortSignal);
} else { } else {
// Synchronous dispatch to preserve callback timing. // Synchronous dispatch to preserve callback timing.
dispatcher.sendBlockReply(payload); dispatcher.sendBlockReply(payload);

View File

@@ -31,6 +31,22 @@ vi.mock("../../web/outbound.js", () => ({
const { routeReply } = await import("./route-reply.js"); const { routeReply } = await import("./route-reply.js");
describe("routeReply", () => { describe("routeReply", () => {
it("skips sends when abort signal is already aborted", async () => {
mocks.sendMessageSlack.mockClear();
const controller = new AbortController();
controller.abort();
const res = await routeReply({
payload: { text: "hi" },
channel: "slack",
to: "channel:C123",
cfg: {} as never,
abortSignal: controller.signal,
});
expect(res.ok).toBe(false);
expect(res.error).toContain("aborted");
expect(mocks.sendMessageSlack).not.toHaveBeenCalled();
});
it("no-ops on empty payload", async () => { it("no-ops on empty payload", async () => {
mocks.sendMessageSlack.mockClear(); mocks.sendMessageSlack.mockClear();
const res = await routeReply({ const res = await routeReply({

View File

@@ -30,6 +30,8 @@ export type RouteReplyParams = {
threadId?: number; threadId?: number;
/** Config for provider-specific settings. */ /** Config for provider-specific settings. */
cfg: ClawdbotConfig; cfg: ClawdbotConfig;
/** Optional abort signal for cooperative cancellation. */
abortSignal?: AbortSignal;
}; };
export type RouteReplyResult = { export type RouteReplyResult = {
@@ -52,7 +54,7 @@ export type RouteReplyResult = {
export async function routeReply( export async function routeReply(
params: RouteReplyParams, params: RouteReplyParams,
): Promise<RouteReplyResult> { ): Promise<RouteReplyResult> {
const { payload, channel, to, accountId, threadId } = params; const { payload, channel, to, accountId, threadId, abortSignal } = params;
// Debug: `pnpm test src/auto-reply/reply/route-reply.test.ts` // Debug: `pnpm test src/auto-reply/reply/route-reply.test.ts`
const text = payload.text ?? ""; const text = payload.text ?? "";
@@ -72,6 +74,9 @@ export async function routeReply(
text: string; text: string;
mediaUrl?: string; mediaUrl?: string;
}): Promise<RouteReplyResult> => { }): Promise<RouteReplyResult> => {
if (abortSignal?.aborted) {
return { ok: false, error: "Reply routing aborted" };
}
const { text, mediaUrl } = params; const { text, mediaUrl } = params;
switch (channel) { switch (channel) {
case "telegram": { case "telegram": {
@@ -148,12 +153,18 @@ export async function routeReply(
}; };
try { try {
if (abortSignal?.aborted) {
return { ok: false, error: "Reply routing aborted" };
}
if (mediaUrls.length === 0) { if (mediaUrls.length === 0) {
return await sendOne({ text }); return await sendOne({ text });
} }
let last: RouteReplyResult | undefined; let last: RouteReplyResult | undefined;
for (let i = 0; i < mediaUrls.length; i++) { for (let i = 0; i < mediaUrls.length; i++) {
if (abortSignal?.aborted) {
return { ok: false, error: "Reply routing aborted" };
}
const mediaUrl = mediaUrls[i]; const mediaUrl = mediaUrls[i];
const caption = i === 0 ? text : ""; const caption = i === 0 ? text : "";
last = await sendOne({ text: caption, mediaUrl }); last = await sendOne({ text: caption, mediaUrl });

View File

@@ -1,14 +1,24 @@
import type { TypingController } from "./reply/typing.js"; import type { TypingController } from "./reply/typing.js";
export type BlockReplyContext = {
abortSignal?: AbortSignal;
timeoutMs?: number;
};
export type GetReplyOptions = { export type GetReplyOptions = {
onReplyStart?: () => Promise<void> | void; onReplyStart?: () => Promise<void> | void;
onTypingController?: (typing: TypingController) => void; onTypingController?: (typing: TypingController) => void;
isHeartbeat?: boolean; isHeartbeat?: boolean;
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void; onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
onReasoningStream?: (payload: ReplyPayload) => Promise<void> | void; onReasoningStream?: (payload: ReplyPayload) => Promise<void> | void;
onBlockReply?: (payload: ReplyPayload) => Promise<void> | void; onBlockReply?: (
payload: ReplyPayload,
context?: BlockReplyContext,
) => Promise<void> | void;
onToolResult?: (payload: ReplyPayload) => Promise<void> | void; onToolResult?: (payload: ReplyPayload) => Promise<void> | void;
disableBlockStreaming?: boolean; disableBlockStreaming?: boolean;
/** Timeout for block reply delivery (ms). */
blockReplyTimeoutMs?: number;
/** If provided, only load these skills for this session (empty = no skills). */ /** If provided, only load these skills for this session (empty = no skills). */
skillFilter?: string[]; skillFilter?: string[];
}; };

View File

@@ -0,0 +1,49 @@
import { EventEmitter } from "node:events";
import { describe, expect, it, vi } from "vitest";
import { waitForDiscordGatewayStop } from "./monitor.js";
describe("waitForDiscordGatewayStop", () => {
it("resolves on abort and disconnects gateway", async () => {
const emitter = new EventEmitter();
const disconnect = vi.fn();
const abort = new AbortController();
const promise = waitForDiscordGatewayStop({
gateway: { emitter, disconnect },
abortSignal: abort.signal,
});
expect(emitter.listenerCount("error")).toBe(1);
abort.abort();
await expect(promise).resolves.toBeUndefined();
expect(disconnect).toHaveBeenCalledTimes(1);
expect(emitter.listenerCount("error")).toBe(0);
});
it("rejects on gateway error and disconnects", async () => {
const emitter = new EventEmitter();
const disconnect = vi.fn();
const onGatewayError = vi.fn();
const abort = new AbortController();
const err = new Error("boom");
const promise = waitForDiscordGatewayStop({
gateway: { emitter, disconnect },
abortSignal: abort.signal,
onGatewayError,
});
emitter.emit("error", err);
await expect(promise).rejects.toThrow("boom");
expect(onGatewayError).toHaveBeenCalledWith(err);
expect(disconnect).toHaveBeenCalledTimes(1);
expect(emitter.listenerCount("error")).toBe(0);
abort.abort();
expect(disconnect).toHaveBeenCalledTimes(1);
});
});

View File

@@ -1,3 +1,5 @@
import type { EventEmitter } from "node:events";
import { import {
ChannelType, ChannelType,
Client, Client,
@@ -82,6 +84,62 @@ type DiscordMediaInfo = {
placeholder: string; placeholder: string;
}; };
type DiscordGatewayHandle = {
emitter?: Pick<EventEmitter, "on" | "removeListener">;
disconnect?: () => void;
};
export async function waitForDiscordGatewayStop(params: {
gateway?: DiscordGatewayHandle;
abortSignal?: AbortSignal;
onGatewayError?: (err: unknown) => void;
}): Promise<void> {
const { gateway, abortSignal, onGatewayError } = params;
const emitter = gateway?.emitter;
return await new Promise<void>((resolve, reject) => {
let settled = false;
const cleanup = () => {
abortSignal?.removeEventListener("abort", onAbort);
emitter?.removeListener("error", onGatewayErrorEvent);
};
const finishResolve = () => {
if (settled) return;
settled = true;
cleanup();
try {
gateway?.disconnect?.();
} finally {
resolve();
}
};
const finishReject = (err: unknown) => {
if (settled) return;
settled = true;
cleanup();
try {
gateway?.disconnect?.();
} finally {
reject(err);
}
};
const onAbort = () => {
finishResolve();
};
const onGatewayErrorEvent = (err: unknown) => {
onGatewayError?.(err);
finishReject(err);
};
if (abortSignal?.aborted) {
onAbort();
return;
}
abortSignal?.addEventListener("abort", onAbort, { once: true });
emitter?.on("error", onGatewayErrorEvent);
});
}
type DiscordHistoryEntry = { type DiscordHistoryEntry = {
sender: string; sender: string;
body: string; body: string;
@@ -402,18 +460,20 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`); runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`);
await new Promise<void>((resolve) => { const gateway = client.getPlugin<GatewayPlugin>("gateway");
const onAbort = async () => { const gatewayEmitter = (gateway as unknown as { emitter?: EventEmitter })
try { ?.emitter;
const gateway = client.getPlugin<GatewayPlugin>("gateway"); await waitForDiscordGatewayStop({
gateway?.disconnect(); gateway: gateway
} finally { ? {
resolve(); emitter: gatewayEmitter,
} disconnect: () => gateway.disconnect(),
}; }
opts.abortSignal?.addEventListener("abort", () => { : undefined,
void onAbort(); abortSignal: opts.abortSignal,
}); onGatewayError: (err) => {
runtime.error?.(danger(`discord gateway error: ${String(err)}`));
},
}); });
} }