Merge branch 'main' into commands-list-clean

This commit is contained in:
Luke
2026-01-08 15:35:17 -05:00
committed by GitHub
45 changed files with 1587 additions and 387 deletions

View File

@@ -78,7 +78,7 @@ describe("buildAgentSystemPrompt", () => {
toolNames: ["gateway", "bash"],
});
expect(prompt).toContain("## ClaudeBot Self-Update");
expect(prompt).toContain("## Clawdbot Self-Update");
expect(prompt).toContain("config.apply");
expect(prompt).toContain("update.run");
});

View File

@@ -44,7 +44,7 @@ export function buildAgentSystemPrompt(params: {
nodes: "List/describe/notify/camera/screen on paired nodes",
cron: "Manage cron jobs and wake events",
gateway:
"Restart, apply config, or run updates on the running ClaudeBot process",
"Restart, apply config, or run updates on the running Clawdbot process",
agents_list: "List agent ids allowed for sessions_spawn",
sessions_list: "List other sessions (incl. sub-agents) with filters/last",
sessions_history: "Fetch history for another session/sub-agent",
@@ -129,7 +129,7 @@ export function buildAgentSystemPrompt(params: {
const runtimeInfo = params.runtimeInfo;
const lines = [
"You are a personal assistant running inside ClaudeBot.",
"You are a personal assistant running inside Clawdbot.",
"",
"## Tooling",
"Tool availability (filtered by policy):",
@@ -157,13 +157,13 @@ export function buildAgentSystemPrompt(params: {
"## Skills",
`Skills provide task-specific instructions. Use \`read\` to load from ${params.workspaceDir}/skills/<name>/SKILL.md when needed.`,
"",
hasGateway ? "## ClaudeBot Self-Update" : "",
hasGateway ? "## Clawdbot Self-Update" : "",
hasGateway
? [
"Get Updates (self-update) is ONLY allowed when the user explicitly asks for it.",
"Do not run config.apply or update.run unless the user explicitly requests an update or config change; if it's not explicit, ask first.",
"Actions: config.get, config.schema, config.apply (validate + write full config, then restart), update.run (update deps or git, then restart).",
"After restart, ClaudeBot pings the last active session automatically.",
"After restart, Clawdbot pings the last active session automatically.",
].join("\n")
: "",
hasGateway ? "" : "",
@@ -212,7 +212,7 @@ export function buildAgentSystemPrompt(params: {
ownerLine ?? "",
ownerLine ? "" : "",
"## Workspace Files (injected)",
"These user-editable files are loaded by ClaudeBot and included below in Project Context.",
"These user-editable files are loaded by Clawdbot and included below in Project Context.",
"",
userTimezone || userTime
? `Time: assume UTC unless stated. User TZ=${userTimezone ?? "unknown"}. Current user time (converted)=${userTime ?? "unknown"}.`
@@ -251,7 +251,7 @@ export function buildAgentSystemPrompt(params: {
heartbeatPromptLine,
"If you receive a heartbeat poll (a user message matching the heartbeat prompt above), and there is nothing that needs attention, reply exactly:",
"HEARTBEAT_OK",
'ClaudeBot treats a leading/trailing "HEARTBEAT_OK" as a heartbeat ack (and may discard it).',
'Clawdbot treats a leading/trailing "HEARTBEAT_OK" as a heartbeat ack (and may discard it).',
'If something needs attention, do NOT include "HEARTBEAT_OK"; reply with the alert text instead.',
"",
"## Runtime",

View File

@@ -16,12 +16,20 @@ describe("cron tool", () => {
it.each([
[
"update",
{ action: "update", id: "job-1", patch: { foo: "bar" } },
{ action: "update", jobId: "job-1", patch: { foo: "bar" } },
{ id: "job-1", patch: { foo: "bar" } },
],
["remove", { action: "remove", id: "job-1" }, { id: "job-1" }],
["run", { action: "run", id: "job-1" }, { id: "job-1" }],
["runs", { action: "runs", id: "job-1" }, { id: "job-1" }],
[
"update",
{ action: "update", id: "job-2", patch: { foo: "bar" } },
{ id: "job-2", patch: { foo: "bar" } },
],
["remove", { action: "remove", jobId: "job-1" }, { id: "job-1" }],
["remove", { action: "remove", id: "job-2" }, { id: "job-2" }],
["run", { action: "run", jobId: "job-1" }, { id: "job-1" }],
["run", { action: "run", id: "job-2" }, { id: "job-2" }],
["runs", { action: "runs", jobId: "job-1" }, { id: "job-1" }],
["runs", { action: "runs", id: "job-2" }, { id: "job-2" }],
])("%s sends id to gateway", async (action, args, expectedParams) => {
const tool = createCronTool();
await tool.execute("call1", args);
@@ -35,6 +43,20 @@ describe("cron tool", () => {
expect(call.params).toEqual(expectedParams);
});
it("prefers jobId over id when both are provided", async () => {
const tool = createCronTool();
await tool.execute("call1", {
action: "run",
jobId: "job-primary",
id: "job-legacy",
});
const call = callGatewayMock.mock.calls[0]?.[0] as {
params?: unknown;
};
expect(call?.params).toEqual({ id: "job-primary" });
});
it("normalizes cron.add job payloads", async () => {
const tool = createCronTool();
await tool.execute("call2", {

View File

@@ -47,7 +47,8 @@ const CronToolSchema = Type.Union([
gatewayUrl: Type.Optional(Type.String()),
gatewayToken: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Number()),
id: Type.String(),
jobId: Type.Optional(Type.String()),
id: Type.Optional(Type.String()),
patch: Type.Object({}, { additionalProperties: true }),
}),
Type.Object({
@@ -55,21 +56,24 @@ const CronToolSchema = Type.Union([
gatewayUrl: Type.Optional(Type.String()),
gatewayToken: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Number()),
id: Type.String(),
jobId: Type.Optional(Type.String()),
id: Type.Optional(Type.String()),
}),
Type.Object({
action: Type.Literal("run"),
gatewayUrl: Type.Optional(Type.String()),
gatewayToken: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Number()),
id: Type.String(),
jobId: Type.Optional(Type.String()),
id: Type.Optional(Type.String()),
}),
Type.Object({
action: Type.Literal("runs"),
gatewayUrl: Type.Optional(Type.String()),
gatewayToken: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Number()),
id: Type.String(),
jobId: Type.Optional(Type.String()),
id: Type.Optional(Type.String()),
}),
Type.Object({
action: Type.Literal("wake"),
@@ -88,7 +92,7 @@ export function createCronTool(): AnyAgentTool {
label: "Cron",
name: "cron",
description:
"Manage Gateway cron jobs (status/list/add/update/remove/run/runs) and send wake events.",
"Manage Gateway cron jobs (status/list/add/update/remove/run/runs) and send wake events. Use `jobId` as the canonical identifier; `id` is accepted for compatibility.",
parameters: CronToolSchema,
execute: async (_toolCallId, args) => {
const params = args as Record<string, unknown>;
@@ -121,7 +125,13 @@ export function createCronTool(): AnyAgentTool {
);
}
case "update": {
const id = readStringParam(params, "id", { required: true });
const id =
readStringParam(params, "jobId") ?? readStringParam(params, "id");
if (!id) {
throw new Error(
"jobId required (id accepted for backward compatibility)",
);
}
if (!params.patch || typeof params.patch !== "object") {
throw new Error("patch required");
}
@@ -134,19 +144,37 @@ export function createCronTool(): AnyAgentTool {
);
}
case "remove": {
const id = readStringParam(params, "id", { required: true });
const id =
readStringParam(params, "jobId") ?? readStringParam(params, "id");
if (!id) {
throw new Error(
"jobId required (id accepted for backward compatibility)",
);
}
return jsonResult(
await callGatewayTool("cron.remove", gatewayOpts, { id }),
);
}
case "run": {
const id = readStringParam(params, "id", { required: true });
const id =
readStringParam(params, "jobId") ?? readStringParam(params, "id");
if (!id) {
throw new Error(
"jobId required (id accepted for backward compatibility)",
);
}
return jsonResult(
await callGatewayTool("cron.run", gatewayOpts, { id }),
);
}
case "runs": {
const id = readStringParam(params, "id", { required: true });
const id =
readStringParam(params, "jobId") ?? readStringParam(params, "id");
if (!id) {
throw new Error(
"jobId required (id accepted for backward compatibility)",
);
}
return jsonResult(
await callGatewayTool("cron.runs", gatewayOpts, { id }),
);

View File

@@ -67,6 +67,12 @@ describe("chunkText", () => {
const chunks = chunkText(text, 10);
expect(chunks).toEqual(["Supercalif", "ragilistic", "expialidoc", "ious"]);
});
it("keeps parenthetical phrases together", () => {
const text = "Heads up now (Though now I'm curious)ok";
const chunks = chunkText(text, 35);
expect(chunks).toEqual(["Heads up now", "(Though now I'm curious)ok"]);
});
});
describe("resolveTextChunkLimit", () => {
@@ -184,4 +190,29 @@ describe("chunkMarkdownText", () => {
expect(nonFenceLines.join("\n").trim()).not.toBe("");
}
});
it("keeps parenthetical phrases together", () => {
const text = "Heads up now (Though now I'm curious)ok";
const chunks = chunkMarkdownText(text, 35);
expect(chunks).toEqual(["Heads up now", "(Though now I'm curious)ok"]);
});
it("handles nested parentheses", () => {
const text = "Hello (outer (inner) end) world";
const chunks = chunkMarkdownText(text, 26);
expect(chunks).toEqual(["Hello (outer (inner) end)", "world"]);
});
it("hard-breaks when a parenthetical exceeds the limit", () => {
const text = `(${"a".repeat(80)})`;
const chunks = chunkMarkdownText(text, 20);
expect(chunks[0]?.length).toBe(20);
expect(chunks.join("")).toBe(text);
});
it("ignores unmatched closing parentheses", () => {
const text = "Hello) world (ok)";
const chunks = chunkMarkdownText(text, 12);
expect(chunks).toEqual(["Hello)", "world (ok)"]);
});
});

View File

@@ -90,18 +90,27 @@ export function chunkText(text: string, limit: number): string[] {
while (remaining.length > limit) {
const window = remaining.slice(0, limit);
// 1) Prefer a newline break inside the window.
let breakIdx = window.lastIndexOf("\n");
// 1) Prefer a newline break inside the window (outside parentheses).
let lastNewline = -1;
let lastWhitespace = -1;
let depth = 0;
for (let i = 0; i < window.length; i++) {
const char = window[i];
if (char === "(") {
depth += 1;
continue;
}
if (char === ")" && depth > 0) {
depth -= 1;
continue;
}
if (depth !== 0) continue;
if (char === "\n") lastNewline = i;
else if (/\s/.test(char)) lastWhitespace = i;
}
// 2) Otherwise prefer the last whitespace (word boundary) inside the window.
if (breakIdx <= 0) {
for (let i = window.length - 1; i >= 0; i--) {
if (/\s/.test(window[i])) {
breakIdx = i;
break;
}
}
}
let breakIdx = lastNewline > 0 ? lastNewline : lastWhitespace;
// 3) Fallback: hard break exactly at the limit.
if (breakIdx <= 0) breakIdx = limit;
@@ -234,15 +243,27 @@ function pickSafeBreakIndex(
window: string,
spans: ReturnType<typeof parseFenceSpans>,
): number {
let newlineIdx = window.lastIndexOf("\n");
while (newlineIdx > 0) {
if (isSafeFenceBreak(spans, newlineIdx)) return newlineIdx;
newlineIdx = window.lastIndexOf("\n", newlineIdx - 1);
}
for (let i = window.length - 1; i > 0; i--) {
if (/\s/.test(window[i]) && isSafeFenceBreak(spans, i)) return i;
let lastNewline = -1;
let lastWhitespace = -1;
let depth = 0;
for (let i = 0; i < window.length; i++) {
if (!isSafeFenceBreak(spans, i)) continue;
const char = window[i];
if (char === "(") {
depth += 1;
continue;
}
if (char === ")" && depth > 0) {
depth -= 1;
continue;
}
if (depth !== 0) continue;
if (char === "\n") lastNewline = i;
else if (/\s/.test(char)) lastWhitespace = i;
}
if (lastNewline > 0) return lastNewline;
if (lastWhitespace > 0) return lastWhitespace;
return -1;
}

View File

@@ -103,6 +103,61 @@ describe("block streaming", () => {
});
});
it("preserves block reply ordering when typing start is slow", async () => {
await withTempHome(async (home) => {
let releaseTyping: (() => void) | undefined;
const typingGate = new Promise<void>((resolve) => {
releaseTyping = resolve;
});
const onReplyStart = vi.fn(() => typingGate);
const seen: string[] = [];
const onBlockReply = vi.fn(async (payload) => {
seen.push(payload.text ?? "");
});
vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
void params.onBlockReply?.({ text: "first" });
void params.onBlockReply?.({ text: "second" });
return {
payloads: [{ text: "first" }, { text: "second" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
};
});
const replyPromise = getReplyFromConfig(
{
Body: "ping",
From: "+1004",
To: "+2000",
MessageSid: "msg-125",
Provider: "telegram",
},
{
onReplyStart,
onBlockReply,
},
{
agent: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
telegram: { allowFrom: ["*"] },
session: { store: path.join(home, "sessions.json") },
},
);
await waitForCalls(() => onReplyStart.mock.calls.length, 1);
releaseTyping?.();
const res = await replyPromise;
expect(res).toBeUndefined();
expect(seen).toEqual(["first", "second"]);
});
});
it("drops final payloads when block replies streamed", async () => {
await withTempHome(async (home) => {
const onBlockReply = vi.fn().mockResolvedValue(undefined);
@@ -143,4 +198,59 @@ describe("block streaming", () => {
expect(onBlockReply).toHaveBeenCalledTimes(1);
});
});
it("falls back to final payloads when block reply send times out", async () => {
await withTempHome(async (home) => {
let sawAbort = false;
const onBlockReply = vi.fn((_, context) => {
return new Promise<void>((resolve) => {
context?.abortSignal?.addEventListener(
"abort",
() => {
sawAbort = true;
resolve();
},
{ once: true },
);
});
});
vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
void params.onBlockReply?.({ text: "streamed" });
return {
payloads: [{ text: "final" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
};
});
const replyPromise = getReplyFromConfig(
{
Body: "ping",
From: "+1004",
To: "+2000",
MessageSid: "msg-126",
Provider: "telegram",
},
{
onBlockReply,
blockReplyTimeoutMs: 10,
},
{
agent: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
telegram: { allowFrom: ["*"] },
session: { store: path.join(home, "sessions.json") },
},
);
const res = await replyPromise;
expect(res).toMatchObject({ text: "final" });
expect(sawAbort).toBe(true);
});
});
});

View File

@@ -47,6 +47,7 @@ import type { TypingController } from "./typing.js";
import { createTypingSignaler } from "./typing-mode.js";
const BUN_FETCH_SOCKET_ERROR_RE = /socket connection was closed unexpectedly/i;
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000;
const isBunFetchSocketError = (message?: string) =>
Boolean(message && BUN_FETCH_SOCKET_ERROR_RE.test(message));
@@ -61,6 +62,23 @@ const formatBunFetchSocketError = (message: string) => {
].join("\n");
};
const withTimeout = async <T>(
promise: Promise<T>,
timeoutMs: number,
timeoutError: Error,
): Promise<T> => {
if (!timeoutMs || timeoutMs <= 0) return promise;
let timer: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(timeoutError), timeoutMs);
});
try {
return await Promise.race([promise, timeoutPromise]);
} finally {
if (timer) clearTimeout(timer);
}
};
export async function runReplyAgent(params: {
commandBody: string;
followupRun: FollowupRun;
@@ -144,7 +162,12 @@ export async function runReplyAgent(params: {
const pendingStreamedPayloadKeys = new Set<string>();
const pendingBlockTasks = new Set<Promise<void>>();
const pendingToolTasks = new Set<Promise<void>>();
let blockReplyChain: Promise<void> = Promise.resolve();
let blockReplyAborted = false;
let didLogBlockReplyAbort = false;
let didStreamBlockReply = false;
const blockReplyTimeoutMs =
opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
const buildPayloadKey = (payload: ReplyPayload) => {
const text = payload.text?.trim() ?? "";
const mediaList = payload.mediaUrls?.length
@@ -367,16 +390,49 @@ export async function runReplyAgent(params: {
) {
return;
}
if (blockReplyAborted) return;
pendingStreamedPayloadKeys.add(payloadKey);
const task = (async () => {
await typingSignals.signalTextDelta(taggedPayload.text);
await opts.onBlockReply?.(blockPayload);
})()
.then(() => {
void typingSignals
.signalTextDelta(taggedPayload.text)
.catch((err) => {
logVerbose(
`block reply typing signal failed: ${String(err)}`,
);
});
const timeoutError = new Error(
`block reply delivery timed out after ${blockReplyTimeoutMs}ms`,
);
const abortController = new AbortController();
blockReplyChain = blockReplyChain
.then(async () => {
if (blockReplyAborted) return false;
await withTimeout(
opts.onBlockReply?.(blockPayload, {
abortSignal: abortController.signal,
timeoutMs: blockReplyTimeoutMs,
}) ?? Promise.resolve(),
blockReplyTimeoutMs,
timeoutError,
);
return true;
})
.then((didSend) => {
if (!didSend) return;
streamedPayloadKeys.add(payloadKey);
didStreamBlockReply = true;
})
.catch((err) => {
if (err === timeoutError) {
abortController.abort();
blockReplyAborted = true;
if (!didLogBlockReplyAbort) {
didLogBlockReplyAbort = true;
logVerbose(
`block reply delivery timed out after ${blockReplyTimeoutMs}ms; skipping remaining block replies to preserve ordering`,
);
}
return;
}
logVerbose(
`block reply delivery failed: ${String(err)}`,
);
@@ -384,6 +440,7 @@ export async function runReplyAgent(params: {
.finally(() => {
pendingStreamedPayloadKeys.delete(payloadKey);
});
const task = blockReplyChain;
pendingBlockTasks.add(task);
void task.finally(() => pendingBlockTasks.delete(task));
}
@@ -546,10 +603,10 @@ export async function runReplyAgent(params: {
})
.filter(isRenderablePayload);
// Drop final payloads if block streaming is enabled and we already streamed
// block replies. Tool-sent duplicates are filtered below.
// Drop final payloads only when block streaming succeeded end-to-end.
// If streaming aborted (e.g., timeout), fall back to final payloads.
const shouldDropFinalPayloads =
blockStreamingEnabled && didStreamBlockReply;
blockStreamingEnabled && didStreamBlockReply && !blockReplyAborted;
const messagingToolSentTexts = runResult.messagingToolSentTexts ?? [];
const messagingToolSentTargets = runResult.messagingToolSentTargets ?? [];
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({

View File

@@ -456,7 +456,7 @@ export async function handleCommands(params: {
...cfg.agent,
model: {
...cfg.agent?.model,
primary: model,
primary: `${provider}/${model}`,
},
contextTokens,
thinkingDefault: cfg.agent?.thinkingDefault,

View File

@@ -41,10 +41,14 @@ export async function dispatchReplyFromConfig(params: {
* Note: Only called when shouldRouteToOriginating is true, so
* originatingChannel and originatingTo are guaranteed to be defined.
*/
const sendPayloadAsync = async (payload: ReplyPayload): Promise<void> => {
const sendPayloadAsync = async (
payload: ReplyPayload,
abortSignal?: AbortSignal,
): Promise<void> => {
// TypeScript doesn't narrow these from the shouldRouteToOriginating check,
// but they're guaranteed non-null when this function is called.
if (!originatingChannel || !originatingTo) return;
if (abortSignal?.aborted) return;
const result = await routeReply({
payload,
channel: originatingChannel,
@@ -52,6 +56,7 @@ export async function dispatchReplyFromConfig(params: {
accountId: ctx.AccountId,
threadId: ctx.MessageThreadId,
cfg,
abortSignal,
});
if (!result.ok) {
logVerbose(
@@ -73,10 +78,10 @@ export async function dispatchReplyFromConfig(params: {
dispatcher.sendToolResult(payload);
}
},
onBlockReply: (payload: ReplyPayload) => {
onBlockReply: (payload: ReplyPayload, context) => {
if (shouldRouteToOriginating) {
// Fire-and-forget for streaming block replies when routing.
void sendPayloadAsync(payload);
// Await routed sends so upstream can enforce ordering/timeouts.
return sendPayloadAsync(payload, context?.abortSignal);
} else {
// Synchronous dispatch to preserve callback timing.
dispatcher.sendBlockReply(payload);

View File

@@ -31,6 +31,22 @@ vi.mock("../../web/outbound.js", () => ({
const { routeReply } = await import("./route-reply.js");
describe("routeReply", () => {
it("skips sends when abort signal is already aborted", async () => {
mocks.sendMessageSlack.mockClear();
const controller = new AbortController();
controller.abort();
const res = await routeReply({
payload: { text: "hi" },
channel: "slack",
to: "channel:C123",
cfg: {} as never,
abortSignal: controller.signal,
});
expect(res.ok).toBe(false);
expect(res.error).toContain("aborted");
expect(mocks.sendMessageSlack).not.toHaveBeenCalled();
});
it("no-ops on empty payload", async () => {
mocks.sendMessageSlack.mockClear();
const res = await routeReply({

View File

@@ -30,6 +30,8 @@ export type RouteReplyParams = {
threadId?: number;
/** Config for provider-specific settings. */
cfg: ClawdbotConfig;
/** Optional abort signal for cooperative cancellation. */
abortSignal?: AbortSignal;
};
export type RouteReplyResult = {
@@ -52,7 +54,7 @@ export type RouteReplyResult = {
export async function routeReply(
params: RouteReplyParams,
): Promise<RouteReplyResult> {
const { payload, channel, to, accountId, threadId } = params;
const { payload, channel, to, accountId, threadId, abortSignal } = params;
// Debug: `pnpm test src/auto-reply/reply/route-reply.test.ts`
const text = payload.text ?? "";
@@ -72,6 +74,9 @@ export async function routeReply(
text: string;
mediaUrl?: string;
}): Promise<RouteReplyResult> => {
if (abortSignal?.aborted) {
return { ok: false, error: "Reply routing aborted" };
}
const { text, mediaUrl } = params;
switch (channel) {
case "telegram": {
@@ -148,12 +153,18 @@ export async function routeReply(
};
try {
if (abortSignal?.aborted) {
return { ok: false, error: "Reply routing aborted" };
}
if (mediaUrls.length === 0) {
return await sendOne({ text });
}
let last: RouteReplyResult | undefined;
for (let i = 0; i < mediaUrls.length; i++) {
if (abortSignal?.aborted) {
return { ok: false, error: "Reply routing aborted" };
}
const mediaUrl = mediaUrls[i];
const caption = i === 0 ? text : "";
last = await sendOne({ text: caption, mediaUrl });

View File

@@ -102,6 +102,18 @@ describe("buildStatusMessage", () => {
expect(text).toContain("🧠 Model: openai/gpt-4.1-mini");
});
it("keeps provider prefix from configured model", () => {
const text = buildStatusMessage({
agent: {
model: "google-antigravity/claude-sonnet-4-5",
},
sessionScope: "per-sender",
queue: { mode: "collect", depth: 0 },
});
expect(text).toContain("🧠 Model: google-antigravity/claude-sonnet-4-5");
});
it("handles missing agent config gracefully", () => {
const text = buildStatusMessage({
agent: {},

View File

@@ -1,14 +1,24 @@
import type { TypingController } from "./reply/typing.js";
export type BlockReplyContext = {
abortSignal?: AbortSignal;
timeoutMs?: number;
};
export type GetReplyOptions = {
onReplyStart?: () => Promise<void> | void;
onTypingController?: (typing: TypingController) => void;
isHeartbeat?: boolean;
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
onReasoningStream?: (payload: ReplyPayload) => Promise<void> | void;
onBlockReply?: (payload: ReplyPayload) => Promise<void> | void;
onBlockReply?: (
payload: ReplyPayload,
context?: BlockReplyContext,
) => Promise<void> | void;
onToolResult?: (payload: ReplyPayload) => Promise<void> | void;
disableBlockStreaming?: boolean;
/** Timeout for block reply delivery (ms). */
blockReplyTimeoutMs?: number;
/** If provided, only load these skills for this session (empty = no skills). */
skillFilter?: string[];
};

View File

@@ -33,6 +33,8 @@ import { resolveGatewayLogPaths } from "../daemon/launchd.js";
import { findLegacyGatewayServices } from "../daemon/legacy.js";
import { resolveGatewayProgramArguments } from "../daemon/program-args.js";
import { resolveGatewayService } from "../daemon/service.js";
import type { ServiceConfigAudit } from "../daemon/service-audit.js";
import { auditGatewayServiceConfig } from "../daemon/service-audit.js";
import { callGateway } from "../gateway/call.js";
import { resolveGatewayBindHost } from "../gateway/net.js";
import {
@@ -89,6 +91,7 @@ type DaemonStatus = {
cachedLabel?: boolean;
missingUnit?: boolean;
};
configAudit?: ServiceConfigAudit;
};
config?: {
cli: ConfigSummary;
@@ -343,6 +346,10 @@ async function gatherDaemonStatus(opts: {
service.readCommand(process.env).catch(() => null),
service.readRuntime(process.env).catch(() => undefined),
]);
const configAudit = await auditGatewayServiceConfig({
env: process.env,
command,
});
const serviceEnv = command?.environment ?? undefined;
const mergedDaemonEnv = {
@@ -484,6 +491,7 @@ async function gatherDaemonStatus(opts: {
notLoadedText: service.notLoadedText,
command,
runtime,
configAudit,
},
config: {
cli: cliConfigSummary,
@@ -538,6 +546,16 @@ function printDaemonStatus(status: DaemonStatus, opts: { json: boolean }) {
if (daemonEnvLines.length > 0) {
defaultRuntime.log(`Daemon env: ${daemonEnvLines.join(" ")}`);
}
if (service.configAudit?.issues.length) {
defaultRuntime.error(
"Service config looks out of date or non-standard.",
);
for (const issue of service.configAudit.issues) {
const detail = issue.detail ? ` (${issue.detail})` : "";
defaultRuntime.error(`Service config issue: ${issue.message}${detail}`);
}
defaultRuntime.error('Recommendation: run "clawdbot doctor".');
}
if (status.config) {
const cliCfg = `${status.config.cli.path}${status.config.cli.exists ? "" : " (missing)"}${status.config.cli.valid ? "" : " (invalid)"}`;
defaultRuntime.log(`Config (cli): ${cliCfg}`);

View File

@@ -37,6 +37,25 @@ type GatewayRpcOpts = {
expectFinal?: boolean;
};
type GatewayRunOpts = {
port?: unknown;
bind?: unknown;
token?: unknown;
auth?: unknown;
password?: unknown;
tailscale?: unknown;
tailscaleResetOnExit?: boolean;
allowUnconfigured?: boolean;
force?: boolean;
verbose?: boolean;
wsLog?: unknown;
compact?: boolean;
};
type GatewayRunParams = {
legacyTokenEnv?: boolean;
};
const gatewayLog = createSubsystemLogger("gateway");
type GatewayRunSignalAction = "stop" | "restart";
@@ -246,10 +265,259 @@ const callGatewayCli = async (
}),
);
export function registerGatewayCli(program: Command) {
const gateway = program
.command("gateway")
.description("Run the WebSocket Gateway")
async function runGatewayCommand(
opts: GatewayRunOpts,
params: GatewayRunParams = {},
) {
if (params.legacyTokenEnv) {
const legacyToken = process.env.CLAWDIS_GATEWAY_TOKEN;
if (legacyToken && !process.env.CLAWDBOT_GATEWAY_TOKEN) {
process.env.CLAWDBOT_GATEWAY_TOKEN = legacyToken;
}
}
setVerbose(Boolean(opts.verbose));
const wsLogRaw = (opts.compact ? "compact" : opts.wsLog) as
| string
| undefined;
const wsLogStyle: GatewayWsLogStyle =
wsLogRaw === "compact" ? "compact" : wsLogRaw === "full" ? "full" : "auto";
if (
wsLogRaw !== undefined &&
wsLogRaw !== "auto" &&
wsLogRaw !== "compact" &&
wsLogRaw !== "full"
) {
defaultRuntime.error('Invalid --ws-log (use "auto", "full", "compact")');
defaultRuntime.exit(1);
}
setGatewayWsLogStyle(wsLogStyle);
const cfg = loadConfig();
const portOverride = parsePort(opts.port);
if (opts.port !== undefined && portOverride === null) {
defaultRuntime.error("Invalid port");
defaultRuntime.exit(1);
}
const port = portOverride ?? resolveGatewayPort(cfg);
if (!Number.isFinite(port) || port <= 0) {
defaultRuntime.error("Invalid port");
defaultRuntime.exit(1);
}
if (opts.force) {
try {
const { killed, waitedMs, escalatedToSigkill } =
await forceFreePortAndWait(port, {
timeoutMs: 2000,
intervalMs: 100,
sigtermTimeoutMs: 700,
});
if (killed.length === 0) {
gatewayLog.info(`force: no listeners on port ${port}`);
} else {
for (const proc of killed) {
gatewayLog.info(
`force: killed pid ${proc.pid}${proc.command ? ` (${proc.command})` : ""} on port ${port}`,
);
}
if (escalatedToSigkill) {
gatewayLog.info(
`force: escalated to SIGKILL while freeing port ${port}`,
);
}
if (waitedMs > 0) {
gatewayLog.info(
`force: waited ${waitedMs}ms for port ${port} to free`,
);
}
}
} catch (err) {
defaultRuntime.error(`Force: ${String(err)}`);
defaultRuntime.exit(1);
return;
}
}
if (opts.token) {
process.env.CLAWDBOT_GATEWAY_TOKEN = String(opts.token);
}
const authModeRaw = opts.auth ? String(opts.auth) : undefined;
const authMode: GatewayAuthMode | null =
authModeRaw === "token" || authModeRaw === "password" ? authModeRaw : null;
if (authModeRaw && !authMode) {
defaultRuntime.error('Invalid --auth (use "token" or "password")');
defaultRuntime.exit(1);
return;
}
const tailscaleRaw = opts.tailscale ? String(opts.tailscale) : undefined;
const tailscaleMode =
tailscaleRaw === "off" ||
tailscaleRaw === "serve" ||
tailscaleRaw === "funnel"
? tailscaleRaw
: null;
if (tailscaleRaw && !tailscaleMode) {
defaultRuntime.error(
'Invalid --tailscale (use "off", "serve", or "funnel")',
);
defaultRuntime.exit(1);
return;
}
const configExists = fs.existsSync(CONFIG_PATH_CLAWDBOT);
const mode = cfg.gateway?.mode;
if (!opts.allowUnconfigured && mode !== "local") {
if (!configExists) {
defaultRuntime.error(
"Missing config. Run `clawdbot setup` or set gateway.mode=local (or pass --allow-unconfigured).",
);
} else {
defaultRuntime.error(
`Gateway start blocked: set gateway.mode=local (current: ${mode ?? "unset"}) or pass --allow-unconfigured.`,
);
}
defaultRuntime.exit(1);
return;
}
const bindRaw = String(opts.bind ?? cfg.gateway?.bind ?? "loopback");
const bind =
bindRaw === "loopback" ||
bindRaw === "tailnet" ||
bindRaw === "lan" ||
bindRaw === "auto"
? bindRaw
: null;
if (!bind) {
defaultRuntime.error(
'Invalid --bind (use "loopback", "tailnet", "lan", or "auto")',
);
defaultRuntime.exit(1);
return;
}
const snapshot = await readConfigFileSnapshot().catch(() => null);
const miskeys = extractGatewayMiskeys(snapshot?.parsed);
const authConfig = {
...cfg.gateway?.auth,
...(authMode ? { mode: authMode } : {}),
...(opts.password ? { password: String(opts.password) } : {}),
...(opts.token ? { token: String(opts.token) } : {}),
};
const resolvedAuth = resolveGatewayAuth({
authConfig,
env: process.env,
tailscaleMode: tailscaleMode ?? cfg.gateway?.tailscale?.mode ?? "off",
});
const resolvedAuthMode = resolvedAuth.mode;
const tokenValue = resolvedAuth.token;
const passwordValue = resolvedAuth.password;
const authHints: string[] = [];
if (miskeys.hasGatewayToken) {
authHints.push(
'Found "gateway.token" in config. Use "gateway.auth.token" instead.',
);
}
if (miskeys.hasRemoteToken) {
authHints.push(
'"gateway.remote.token" is for remote CLI calls; it does not enable local gateway auth.',
);
}
if (resolvedAuthMode === "token" && !tokenValue) {
defaultRuntime.error(
[
"Gateway auth is set to token, but no token is configured.",
"Set gateway.auth.token (or CLAWDBOT_GATEWAY_TOKEN), or pass --token.",
...authHints,
]
.filter(Boolean)
.join("\n"),
);
defaultRuntime.exit(1);
return;
}
if (resolvedAuthMode === "password" && !passwordValue) {
defaultRuntime.error(
[
"Gateway auth is set to password, but no password is configured.",
"Set gateway.auth.password (or CLAWDBOT_GATEWAY_PASSWORD), or pass --password.",
...authHints,
]
.filter(Boolean)
.join("\n"),
);
defaultRuntime.exit(1);
return;
}
if (bind !== "loopback" && resolvedAuthMode === "none") {
defaultRuntime.error(
[
`Refusing to bind gateway to ${bind} without auth.`,
"Set gateway.auth.token (or CLAWDBOT_GATEWAY_TOKEN) or pass --token.",
...authHints,
]
.filter(Boolean)
.join("\n"),
);
defaultRuntime.exit(1);
return;
}
try {
await runGatewayLoop({
runtime: defaultRuntime,
start: async () =>
await startGatewayServer(port, {
bind,
auth:
authMode || opts.password || opts.token || authModeRaw
? {
mode: authMode ?? undefined,
token: opts.token ? String(opts.token) : undefined,
password: opts.password ? String(opts.password) : undefined,
}
: undefined,
tailscale:
tailscaleMode || opts.tailscaleResetOnExit
? {
mode: tailscaleMode ?? undefined,
resetOnExit: Boolean(opts.tailscaleResetOnExit),
}
: undefined,
}),
});
} catch (err) {
if (
err instanceof GatewayLockError ||
(err &&
typeof err === "object" &&
(err as { name?: string }).name === "GatewayLockError")
) {
const errMessage = describeUnknownError(err);
defaultRuntime.error(
`Gateway failed to start: ${errMessage}\nIf the gateway is supervised, stop it with: clawdbot daemon stop`,
);
try {
const diagnostics = await inspectPortUsage(port);
if (diagnostics.status === "busy") {
for (const line of formatPortDiagnostics(diagnostics)) {
defaultRuntime.error(line);
}
}
} catch {
// ignore diagnostics failures
}
await maybeExplainGatewayServiceStop();
defaultRuntime.exit(1);
return;
}
defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
defaultRuntime.exit(1);
}
}
function addGatewayRunCommand(
cmd: Command,
params: GatewayRunParams = {},
): Command {
return cmd
.option("--port <port>", "Port for the gateway WebSocket")
.option(
"--bind <mode>",
@@ -288,252 +556,22 @@ export function registerGatewayCli(program: Command) {
)
.option("--compact", 'Alias for "--ws-log compact"', false)
.action(async (opts) => {
setVerbose(Boolean(opts.verbose));
const wsLogRaw = (opts.compact ? "compact" : opts.wsLog) as
| string
| undefined;
const wsLogStyle: GatewayWsLogStyle =
wsLogRaw === "compact"
? "compact"
: wsLogRaw === "full"
? "full"
: "auto";
if (
wsLogRaw !== undefined &&
wsLogRaw !== "auto" &&
wsLogRaw !== "compact" &&
wsLogRaw !== "full"
) {
defaultRuntime.error(
'Invalid --ws-log (use "auto", "full", "compact")',
);
defaultRuntime.exit(1);
}
setGatewayWsLogStyle(wsLogStyle);
const cfg = loadConfig();
const portOverride = parsePort(opts.port);
if (opts.port !== undefined && portOverride === null) {
defaultRuntime.error("Invalid port");
defaultRuntime.exit(1);
}
const port = portOverride ?? resolveGatewayPort(cfg);
if (!Number.isFinite(port) || port <= 0) {
defaultRuntime.error("Invalid port");
defaultRuntime.exit(1);
}
if (opts.force) {
try {
const { killed, waitedMs, escalatedToSigkill } =
await forceFreePortAndWait(port, {
timeoutMs: 2000,
intervalMs: 100,
sigtermTimeoutMs: 700,
});
if (killed.length === 0) {
gatewayLog.info(`force: no listeners on port ${port}`);
} else {
for (const proc of killed) {
gatewayLog.info(
`force: killed pid ${proc.pid}${proc.command ? ` (${proc.command})` : ""} on port ${port}`,
);
}
if (escalatedToSigkill) {
gatewayLog.info(
`force: escalated to SIGKILL while freeing port ${port}`,
);
}
if (waitedMs > 0) {
gatewayLog.info(
`force: waited ${waitedMs}ms for port ${port} to free`,
);
}
}
} catch (err) {
defaultRuntime.error(`Force: ${String(err)}`);
defaultRuntime.exit(1);
return;
}
}
if (opts.token) {
process.env.CLAWDBOT_GATEWAY_TOKEN = String(opts.token);
}
const authModeRaw = opts.auth ? String(opts.auth) : undefined;
const authMode: GatewayAuthMode | null =
authModeRaw === "token" || authModeRaw === "password"
? authModeRaw
: null;
if (authModeRaw && !authMode) {
defaultRuntime.error('Invalid --auth (use "token" or "password")');
defaultRuntime.exit(1);
return;
}
const tailscaleRaw = opts.tailscale ? String(opts.tailscale) : undefined;
const tailscaleMode =
tailscaleRaw === "off" ||
tailscaleRaw === "serve" ||
tailscaleRaw === "funnel"
? tailscaleRaw
: null;
if (tailscaleRaw && !tailscaleMode) {
defaultRuntime.error(
'Invalid --tailscale (use "off", "serve", or "funnel")',
);
defaultRuntime.exit(1);
return;
}
const configExists = fs.existsSync(CONFIG_PATH_CLAWDBOT);
const mode = cfg.gateway?.mode;
if (!opts.allowUnconfigured && mode !== "local") {
if (!configExists) {
defaultRuntime.error(
"Missing config. Run `clawdbot setup` or set gateway.mode=local (or pass --allow-unconfigured).",
);
} else {
defaultRuntime.error(
`Gateway start blocked: set gateway.mode=local (current: ${mode ?? "unset"}) or pass --allow-unconfigured.`,
);
}
defaultRuntime.exit(1);
return;
}
const bindRaw = String(opts.bind ?? cfg.gateway?.bind ?? "loopback");
const bind =
bindRaw === "loopback" ||
bindRaw === "tailnet" ||
bindRaw === "lan" ||
bindRaw === "auto"
? bindRaw
: null;
if (!bind) {
defaultRuntime.error(
'Invalid --bind (use "loopback", "tailnet", "lan", or "auto")',
);
defaultRuntime.exit(1);
return;
}
const snapshot = await readConfigFileSnapshot().catch(() => null);
const miskeys = extractGatewayMiskeys(snapshot?.parsed);
const authConfig = {
...cfg.gateway?.auth,
...(authMode ? { mode: authMode } : {}),
...(opts.password ? { password: String(opts.password) } : {}),
...(opts.token ? { token: String(opts.token) } : {}),
};
const resolvedAuth = resolveGatewayAuth({
authConfig,
env: process.env,
tailscaleMode: tailscaleMode ?? cfg.gateway?.tailscale?.mode ?? "off",
});
const resolvedAuthMode = resolvedAuth.mode;
const tokenValue = resolvedAuth.token;
const passwordValue = resolvedAuth.password;
const authHints: string[] = [];
if (miskeys.hasGatewayToken) {
authHints.push(
'Found "gateway.token" in config. Use "gateway.auth.token" instead.',
);
}
if (miskeys.hasRemoteToken) {
authHints.push(
'"gateway.remote.token" is for remote CLI calls; it does not enable local gateway auth.',
);
}
if (resolvedAuthMode === "token" && !tokenValue) {
defaultRuntime.error(
[
"Gateway auth is set to token, but no token is configured.",
"Set gateway.auth.token (or CLAWDBOT_GATEWAY_TOKEN), or pass --token.",
...authHints,
]
.filter(Boolean)
.join("\n"),
);
defaultRuntime.exit(1);
return;
}
if (resolvedAuthMode === "password" && !passwordValue) {
defaultRuntime.error(
[
"Gateway auth is set to password, but no password is configured.",
"Set gateway.auth.password (or CLAWDBOT_GATEWAY_PASSWORD), or pass --password.",
...authHints,
]
.filter(Boolean)
.join("\n"),
);
defaultRuntime.exit(1);
return;
}
if (bind !== "loopback" && resolvedAuthMode === "none") {
defaultRuntime.error(
[
`Refusing to bind gateway to ${bind} without auth.`,
"Set gateway.auth.token (or CLAWDBOT_GATEWAY_TOKEN) or pass --token.",
...authHints,
]
.filter(Boolean)
.join("\n"),
);
defaultRuntime.exit(1);
return;
}
try {
await runGatewayLoop({
runtime: defaultRuntime,
start: async () =>
await startGatewayServer(port, {
bind,
auth:
authMode || opts.password || opts.token || authModeRaw
? {
mode: authMode ?? undefined,
token: opts.token ? String(opts.token) : undefined,
password: opts.password
? String(opts.password)
: undefined,
}
: undefined,
tailscale:
tailscaleMode || opts.tailscaleResetOnExit
? {
mode: tailscaleMode ?? undefined,
resetOnExit: Boolean(opts.tailscaleResetOnExit),
}
: undefined,
}),
});
} catch (err) {
if (
err instanceof GatewayLockError ||
(err &&
typeof err === "object" &&
(err as { name?: string }).name === "GatewayLockError")
) {
const errMessage = describeUnknownError(err);
defaultRuntime.error(
`Gateway failed to start: ${errMessage}\nIf the gateway is supervised, stop it with: clawdbot daemon stop`,
);
try {
const diagnostics = await inspectPortUsage(port);
if (diagnostics.status === "busy") {
for (const line of formatPortDiagnostics(diagnostics)) {
defaultRuntime.error(line);
}
}
} catch {
// ignore diagnostics failures
}
await maybeExplainGatewayServiceStop();
defaultRuntime.exit(1);
return;
}
defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
defaultRuntime.exit(1);
}
await runGatewayCommand(opts, params);
});
}
export function registerGatewayCli(program: Command) {
const gateway = addGatewayRunCommand(
program.command("gateway").description("Run the WebSocket Gateway"),
);
// Back-compat: legacy launchd plists used gateway-daemon; keep hidden alias.
addGatewayRunCommand(
program
.command("gateway-daemon", { hidden: true })
.description("Run the WebSocket Gateway as a long-lived daemon"),
{ legacyTokenEnv: true },
);
gatewayCallOpts(
gateway

View File

@@ -266,7 +266,7 @@ describe("agentCommand", () => {
});
});
it("passes telegram account id when delivering", async () => {
it("passes through telegram accountId when delivering", async () => {
await withTempHome(async (home) => {
const store = path.join(home, "sessions.json");
mockConfig(home, store, undefined, undefined, { botToken: "t-1" });
@@ -297,7 +297,7 @@ describe("agentCommand", () => {
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"123",
"ok",
expect.objectContaining({ accountId: "default", verbose: false }),
expect.objectContaining({ accountId: undefined, verbose: false }),
);
} finally {
if (prevTelegramToken === undefined) {

View File

@@ -15,6 +15,7 @@ import {
} from "../daemon/legacy.js";
import { resolveGatewayProgramArguments } from "../daemon/program-args.js";
import { resolveGatewayService } from "../daemon/service.js";
import { auditGatewayServiceConfig } from "../daemon/service-audit.js";
import type { RuntimeEnv } from "../runtime.js";
import {
DEFAULT_GATEWAY_DAEMON_RUNTIME,
@@ -23,6 +24,18 @@ import {
} from "./daemon-runtime.js";
import type { DoctorOptions, DoctorPrompter } from "./doctor-prompter.js";
function detectGatewayRuntime(
programArguments: string[] | undefined,
): GatewayDaemonRuntime {
const first = programArguments?.[0];
if (first) {
const base = path.basename(first).toLowerCase();
if (base === "bun" || base === "bun.exe") return "bun";
if (base === "node" || base === "node.exe") return "node";
}
return DEFAULT_GATEWAY_DAEMON_RUNTIME;
}
export async function maybeMigrateLegacyGatewayService(
cfg: ClawdbotConfig,
mode: "local" | "remote",
@@ -112,6 +125,83 @@ export async function maybeMigrateLegacyGatewayService(
});
}
export async function maybeRepairGatewayServiceConfig(
cfg: ClawdbotConfig,
mode: "local" | "remote",
runtime: RuntimeEnv,
prompter: DoctorPrompter,
) {
if (resolveIsNixMode(process.env)) {
note("Nix mode detected; skip service updates.", "Gateway");
return;
}
if (mode === "remote") {
note("Gateway mode is remote; skipped local service audit.", "Gateway");
return;
}
const service = resolveGatewayService();
const command = await service.readCommand(process.env).catch(() => null);
if (!command) return;
const audit = await auditGatewayServiceConfig({
env: process.env,
command,
});
if (audit.issues.length === 0) return;
note(
audit.issues
.map((issue) =>
issue.detail ? `- ${issue.message} (${issue.detail})` : `- ${issue.message}`,
)
.join("\n"),
"Gateway service config",
);
const repair = await prompter.confirmSkipInNonInteractive({
message: "Update gateway service config to the recommended defaults now?",
initialValue: true,
});
if (!repair) return;
const devMode =
process.argv[1]?.includes(`${path.sep}src${path.sep}`) &&
process.argv[1]?.endsWith(".ts");
const port = resolveGatewayPort(cfg, process.env);
const runtimeChoice = detectGatewayRuntime(command.programArguments);
const { programArguments, workingDirectory } =
await resolveGatewayProgramArguments({
port,
dev: devMode,
runtime: runtimeChoice,
});
const environment: Record<string, string | undefined> = {
PATH: process.env.PATH,
CLAWDBOT_PROFILE: process.env.CLAWDBOT_PROFILE,
CLAWDBOT_STATE_DIR: process.env.CLAWDBOT_STATE_DIR,
CLAWDBOT_CONFIG_PATH: process.env.CLAWDBOT_CONFIG_PATH,
CLAWDBOT_GATEWAY_PORT: String(port),
CLAWDBOT_GATEWAY_TOKEN:
cfg.gateway?.auth?.token ?? process.env.CLAWDBOT_GATEWAY_TOKEN,
CLAWDBOT_LAUNCHD_LABEL:
process.platform === "darwin" ? GATEWAY_LAUNCH_AGENT_LABEL : undefined,
};
try {
await service.install({
env: process.env,
stdout: process.stdout,
programArguments,
workingDirectory,
environment,
});
} catch (err) {
runtime.error(`Gateway service update failed: ${String(err)}`);
}
}
export async function maybeScanExtraGatewayServices(options: DoctorOptions) {
const extraServices = await findExtraGatewayServices(process.env, {
deep: options.deep,

View File

@@ -30,6 +30,7 @@ import {
} from "./doctor-format.js";
import {
maybeMigrateLegacyGatewayService,
maybeRepairGatewayServiceConfig,
maybeScanExtraGatewayServices,
} from "./doctor-gateway-services.js";
import {
@@ -157,6 +158,12 @@ export async function doctorCommand(
prompter,
);
await maybeScanExtraGatewayServices(options);
await maybeRepairGatewayServiceConfig(
cfg,
resolveMode(cfg),
runtime,
prompter,
);
await noteSecurityWarnings(cfg);

View File

@@ -137,7 +137,7 @@ describe("sendCommand", () => {
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"123",
"hi",
expect.objectContaining({ accountId: "default", verbose: false }),
expect.objectContaining({ accountId: undefined, verbose: false }),
);
expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled();
});
@@ -158,7 +158,7 @@ describe("sendCommand", () => {
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"123",
"hi",
expect.objectContaining({ accountId: "default", verbose: false }),
expect.objectContaining({ accountId: undefined, verbose: false }),
);
});
@@ -212,7 +212,7 @@ describe("sendCommand", () => {
expect(deps.sendMessageSlack).toHaveBeenCalledWith(
"channel:C123",
"hi",
expect.objectContaining({ accountId: "default" }),
expect.objectContaining({ accountId: undefined }),
);
expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled();
});

View File

@@ -716,4 +716,20 @@ describe("parseTelegramTarget", () => {
topicId: undefined,
});
});
it("strips internal telegram prefix", () => {
expect(parseTelegramTarget("telegram:123")).toEqual({
chatId: "123",
topicId: undefined,
});
});
it("strips internal telegram + group prefixes before parsing topic", () => {
expect(
parseTelegramTarget("telegram:group:-1001234567890:topic:456"),
).toEqual({
chatId: "-1001234567890",
topicId: 456,
});
});
});

View File

@@ -61,7 +61,16 @@ export function parseTelegramTarget(to: string): {
chatId: string;
topicId: number | undefined;
} {
const trimmed = to.trim();
let trimmed = to.trim();
// Cron "lastTo" values can include internal prefixes like `telegram:...` or
// `telegram:group:...` (see normalizeChatId in telegram/send.ts).
// Strip these before parsing `:topic:` / `:<topicId>` suffixes.
while (true) {
const next = trimmed.replace(/^(telegram|tg|group):/i, "").trim();
if (next === trimmed) break;
trimmed = next;
}
// Try format: chatId:topic:topicId
const topicMatch = /^(.+?):topic:(\d+)$/.exec(trimmed);

165
src/daemon/service-audit.ts Normal file
View File

@@ -0,0 +1,165 @@
import fs from "node:fs/promises";
import { resolveLaunchAgentPlistPath } from "./launchd.js";
import { resolveSystemdUserUnitPath } from "./systemd.js";
export type GatewayServiceCommand = {
programArguments: string[];
workingDirectory?: string;
environment?: Record<string, string>;
sourcePath?: string;
} | null;
export type ServiceConfigIssue = {
code: string;
message: string;
detail?: string;
};
export type ServiceConfigAudit = {
ok: boolean;
issues: ServiceConfigIssue[];
};
function hasGatewaySubcommand(programArguments?: string[]): boolean {
return Boolean(programArguments?.some((arg) => arg === "gateway"));
}
function parseSystemdUnit(content: string): {
after: Set<string>;
wants: Set<string>;
restartSec?: string;
} {
const after = new Set<string>();
const wants = new Set<string>();
let restartSec: string | undefined;
for (const rawLine of content.split(/\r?\n/)) {
const line = rawLine.trim();
if (!line) continue;
if (line.startsWith("#") || line.startsWith(";")) continue;
if (line.startsWith("[")) continue;
const idx = line.indexOf("=");
if (idx <= 0) continue;
const key = line.slice(0, idx).trim();
const value = line.slice(idx + 1).trim();
if (!value) continue;
if (key === "After") {
for (const entry of value.split(/\s+/)) {
if (entry) after.add(entry);
}
} else if (key === "Wants") {
for (const entry of value.split(/\s+/)) {
if (entry) wants.add(entry);
}
} else if (key === "RestartSec") {
restartSec = value;
}
}
return { after, wants, restartSec };
}
function isRestartSecPreferred(value: string | undefined): boolean {
if (!value) return false;
const parsed = Number.parseFloat(value);
if (!Number.isFinite(parsed)) return false;
return Math.abs(parsed - 5) < 0.01;
}
async function auditSystemdUnit(
env: Record<string, string | undefined>,
issues: ServiceConfigIssue[],
) {
const unitPath = resolveSystemdUserUnitPath(env);
let content = "";
try {
content = await fs.readFile(unitPath, "utf8");
} catch {
return;
}
const parsed = parseSystemdUnit(content);
if (!parsed.after.has("network-online.target")) {
issues.push({
code: "systemd-after-network-online",
message: "Missing systemd After=network-online.target",
detail: unitPath,
});
}
if (!parsed.wants.has("network-online.target")) {
issues.push({
code: "systemd-wants-network-online",
message: "Missing systemd Wants=network-online.target",
detail: unitPath,
});
}
if (!isRestartSecPreferred(parsed.restartSec)) {
issues.push({
code: "systemd-restart-sec",
message: "RestartSec does not match the recommended 5s",
detail: unitPath,
});
}
}
async function auditLaunchdPlist(
env: Record<string, string | undefined>,
issues: ServiceConfigIssue[],
) {
const plistPath = resolveLaunchAgentPlistPath(env);
let content = "";
try {
content = await fs.readFile(plistPath, "utf8");
} catch {
return;
}
const hasRunAtLoad = /<key>RunAtLoad<\/key>\s*<true\s*\/>/i.test(content);
const hasKeepAlive = /<key>KeepAlive<\/key>\s*<true\s*\/>/i.test(content);
if (!hasRunAtLoad) {
issues.push({
code: "launchd-run-at-load",
message: "LaunchAgent is missing RunAtLoad=true",
detail: plistPath,
});
}
if (!hasKeepAlive) {
issues.push({
code: "launchd-keep-alive",
message: "LaunchAgent is missing KeepAlive=true",
detail: plistPath,
});
}
}
function auditGatewayCommand(
programArguments: string[] | undefined,
issues: ServiceConfigIssue[],
) {
if (!programArguments || programArguments.length === 0) return;
if (!hasGatewaySubcommand(programArguments)) {
issues.push({
code: "gateway-command-missing",
message: "Service command does not include the gateway subcommand",
});
}
}
export async function auditGatewayServiceConfig(params: {
env: Record<string, string | undefined>;
command: GatewayServiceCommand;
platform?: NodeJS.Platform;
}): Promise<ServiceConfigAudit> {
const issues: ServiceConfigIssue[] = [];
const platform = params.platform ?? process.platform;
auditGatewayCommand(params.command?.programArguments, issues);
if (platform === "linux") {
await auditSystemdUnit(params.env, issues);
} else if (platform === "darwin") {
await auditLaunchdPlist(params.env, issues);
}
return { ok: issues.length === 0, issues };
}

View File

@@ -33,6 +33,12 @@ function resolveSystemdUnitPath(
return resolveSystemdUnitPathForName(env, GATEWAY_SYSTEMD_SERVICE_NAME);
}
export function resolveSystemdUserUnitPath(
env: Record<string, string | undefined>,
): string {
return resolveSystemdUnitPath(env);
}
function resolveLoginctlUser(
env: Record<string, string | undefined>,
): string | null {
@@ -141,10 +147,13 @@ function buildSystemdUnit({
return [
"[Unit]",
"Description=Clawdbot Gateway",
"After=network-online.target",
"Wants=network-online.target",
"",
"[Service]",
`ExecStart=${execStart}`,
"Restart=always",
"RestartSec=5",
workingDirLine,
...envLines,
"",

View File

@@ -0,0 +1,61 @@
import { EventEmitter } from "node:events";
import { describe, expect, it, vi } from "vitest";
import { waitForDiscordGatewayStop } from "./monitor.gateway.js";
describe("waitForDiscordGatewayStop", () => {
it("resolves on abort and disconnects gateway", async () => {
const emitter = new EventEmitter();
const disconnect = vi.fn();
const abort = new AbortController();
const promise = waitForDiscordGatewayStop({
gateway: { emitter, disconnect },
abortSignal: abort.signal,
});
expect(emitter.listenerCount("error")).toBe(1);
abort.abort();
await expect(promise).resolves.toBeUndefined();
expect(disconnect).toHaveBeenCalledTimes(1);
expect(emitter.listenerCount("error")).toBe(0);
});
it("rejects on gateway error and disconnects", async () => {
const emitter = new EventEmitter();
const disconnect = vi.fn();
const onGatewayError = vi.fn();
const abort = new AbortController();
const err = new Error("boom");
const promise = waitForDiscordGatewayStop({
gateway: { emitter, disconnect },
abortSignal: abort.signal,
onGatewayError,
});
emitter.emit("error", err);
await expect(promise).rejects.toThrow("boom");
expect(onGatewayError).toHaveBeenCalledWith(err);
expect(disconnect).toHaveBeenCalledTimes(1);
expect(emitter.listenerCount("error")).toBe(0);
abort.abort();
expect(disconnect).toHaveBeenCalledTimes(1);
});
it("resolves on abort without a gateway", async () => {
const abort = new AbortController();
const promise = waitForDiscordGatewayStop({
abortSignal: abort.signal,
});
abort.abort();
await expect(promise).resolves.toBeUndefined();
});
});

View File

@@ -0,0 +1,63 @@
import type { EventEmitter } from "node:events";
export type DiscordGatewayHandle = {
emitter?: Pick<EventEmitter, "on" | "removeListener">;
disconnect?: () => void;
};
export function getDiscordGatewayEmitter(
gateway?: unknown,
): EventEmitter | undefined {
return (gateway as { emitter?: EventEmitter } | undefined)?.emitter;
}
export async function waitForDiscordGatewayStop(params: {
gateway?: DiscordGatewayHandle;
abortSignal?: AbortSignal;
onGatewayError?: (err: unknown) => void;
}): Promise<void> {
const { gateway, abortSignal, onGatewayError } = params;
const emitter = gateway?.emitter;
return await new Promise<void>((resolve, reject) => {
let settled = false;
const cleanup = () => {
abortSignal?.removeEventListener("abort", onAbort);
emitter?.removeListener("error", onGatewayErrorEvent);
};
const finishResolve = () => {
if (settled) return;
settled = true;
cleanup();
try {
gateway?.disconnect?.();
} finally {
resolve();
}
};
const finishReject = (err: unknown) => {
if (settled) return;
settled = true;
cleanup();
try {
gateway?.disconnect?.();
} finally {
reject(err);
}
};
const onAbort = () => {
finishResolve();
};
const onGatewayErrorEvent = (err: unknown) => {
onGatewayError?.(err);
finishReject(err);
};
if (abortSignal?.aborted) {
onAbort();
return;
}
abortSignal?.addEventListener("abort", onAbort, { once: true });
emitter?.on("error", onGatewayErrorEvent);
});
}

View File

@@ -61,6 +61,10 @@ import type { RuntimeEnv } from "../runtime.js";
import { loadWebMedia } from "../web/media.js";
import { resolveDiscordAccount } from "./accounts.js";
import { chunkDiscordText } from "./chunk.js";
import {
getDiscordGatewayEmitter,
waitForDiscordGatewayStop,
} from "./monitor.gateway.js";
import { fetchDiscordApplicationId } from "./probe.js";
import { reactMessageDiscord, sendMessageDiscord } from "./send.js";
import { normalizeDiscordToken } from "./token.js";
@@ -402,18 +406,19 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`);
await new Promise<void>((resolve) => {
const onAbort = async () => {
try {
const gateway = client.getPlugin<GatewayPlugin>("gateway");
gateway?.disconnect();
} finally {
resolve();
}
};
opts.abortSignal?.addEventListener("abort", () => {
void onAbort();
});
const gateway = client.getPlugin<GatewayPlugin>("gateway");
const gatewayEmitter = getDiscordGatewayEmitter(gateway);
await waitForDiscordGatewayStop({
gateway: gateway
? {
emitter: gatewayEmitter,
disconnect: () => gateway.disconnect(),
}
: undefined,
abortSignal: opts.abortSignal,
onGatewayError: (err) => {
runtime.error?.(danger(`discord gateway error: ${String(err)}`));
},
});
}

View File

@@ -368,7 +368,7 @@ describe("runHeartbeatOnce", () => {
}
});
it("passes telegram token from config to sendTelegram", async () => {
it("passes through accountId for telegram heartbeats", async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-"));
const storePath = path.join(tmpDir, "sessions.json");
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
@@ -418,7 +418,74 @@ describe("runHeartbeatOnce", () => {
expect(sendTelegram).toHaveBeenCalledWith(
"123456",
"Hello from heartbeat",
expect.objectContaining({ accountId: "default", verbose: false }),
expect.objectContaining({ accountId: undefined, verbose: false }),
);
} finally {
replySpy.mockRestore();
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
await fs.rm(tmpDir, { recursive: true, force: true });
}
});
it("does not pre-resolve telegram accountId (allows config-only account tokens)", async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-"));
const storePath = path.join(tmpDir, "sessions.json");
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "";
try {
await fs.writeFile(
storePath,
JSON.stringify(
{
main: {
sessionId: "sid",
updatedAt: Date.now(),
lastProvider: "telegram",
lastTo: "123456",
},
},
null,
2,
),
);
const cfg: ClawdbotConfig = {
agent: {
heartbeat: { every: "5m", target: "telegram", to: "123456" },
},
telegram: {
accounts: {
work: { botToken: "test-bot-token-123" },
},
},
session: { store: storePath },
};
replySpy.mockResolvedValue({ text: "Hello from heartbeat" });
const sendTelegram = vi.fn().mockResolvedValue({
messageId: "m1",
chatId: "123456",
});
await runHeartbeatOnce({
cfg,
deps: {
sendTelegram,
getQueueSize: () => 0,
nowMs: () => 0,
},
});
expect(sendTelegram).toHaveBeenCalledTimes(1);
expect(sendTelegram).toHaveBeenCalledWith(
"123456",
"Hello from heartbeat",
expect.objectContaining({ accountId: undefined, verbose: false }),
);
} finally {
replySpy.mockRestore();

View File

@@ -7,7 +7,7 @@ import {
} from "./deliver.js";
describe("deliverOutboundPayloads", () => {
it("chunks telegram markdown and passes account id", async () => {
it("chunks telegram markdown and passes through accountId", async () => {
const sendTelegram = vi
.fn()
.mockResolvedValue({ messageId: "m1", chatId: "c1" });
@@ -28,7 +28,7 @@ describe("deliverOutboundPayloads", () => {
expect(sendTelegram).toHaveBeenCalledTimes(2);
for (const call of sendTelegram.mock.calls) {
expect(call[2]).toEqual(
expect.objectContaining({ accountId: "default", verbose: false }),
expect.objectContaining({ accountId: undefined, verbose: false }),
);
}
expect(results).toHaveLength(2);
@@ -42,6 +42,30 @@ describe("deliverOutboundPayloads", () => {
}
});
it("passes explicit accountId to sendTelegram", async () => {
const sendTelegram = vi
.fn()
.mockResolvedValue({ messageId: "m1", chatId: "c1" });
const cfg: ClawdbotConfig = {
telegram: { botToken: "tok-1", textChunkLimit: 2 },
};
await deliverOutboundPayloads({
cfg,
provider: "telegram",
to: "123",
accountId: "default",
payloads: [{ text: "hi" }],
deps: { sendTelegram },
});
expect(sendTelegram).toHaveBeenCalledWith(
"123",
"hi",
expect.objectContaining({ accountId: "default", verbose: false }),
);
});
it("uses signal media maxBytes from config", async () => {
const sendSignal = vi
.fn()

View File

@@ -86,7 +86,8 @@ function createProviderHandler(params: {
deps: Required<OutboundSendDeps>;
}): ProviderHandler {
const { cfg, to, deps } = params;
const accountId = normalizeAccountId(params.accountId);
const rawAccountId = params.accountId;
const accountId = normalizeAccountId(rawAccountId);
const signalMaxBytes =
params.provider === "signal"
? resolveMediaMaxBytes(cfg, "signal", accountId)
@@ -103,7 +104,7 @@ function createProviderHandler(params: {
provider: "whatsapp",
...(await deps.sendWhatsApp(to, text, {
verbose: false,
accountId,
accountId: rawAccountId,
})),
}),
sendMedia: async (caption, mediaUrl) => ({
@@ -111,7 +112,7 @@ function createProviderHandler(params: {
...(await deps.sendWhatsApp(to, caption, {
verbose: false,
mediaUrl,
accountId,
accountId: rawAccountId,
})),
}),
},
@@ -121,7 +122,7 @@ function createProviderHandler(params: {
provider: "telegram",
...(await deps.sendTelegram(to, text, {
verbose: false,
accountId,
accountId: rawAccountId,
})),
}),
sendMedia: async (caption, mediaUrl) => ({
@@ -129,7 +130,7 @@ function createProviderHandler(params: {
...(await deps.sendTelegram(to, caption, {
verbose: false,
mediaUrl,
accountId,
accountId: rawAccountId,
})),
}),
},
@@ -139,7 +140,7 @@ function createProviderHandler(params: {
provider: "discord",
...(await deps.sendDiscord(to, text, {
verbose: false,
accountId,
accountId: rawAccountId,
})),
}),
sendMedia: async (caption, mediaUrl) => ({
@@ -147,7 +148,7 @@ function createProviderHandler(params: {
...(await deps.sendDiscord(to, caption, {
verbose: false,
mediaUrl,
accountId,
accountId: rawAccountId,
})),
}),
},
@@ -156,14 +157,14 @@ function createProviderHandler(params: {
sendText: async (text) => ({
provider: "slack",
...(await deps.sendSlack(to, text, {
accountId,
accountId: rawAccountId,
})),
}),
sendMedia: async (caption, mediaUrl) => ({
provider: "slack",
...(await deps.sendSlack(to, caption, {
mediaUrl,
accountId,
accountId: rawAccountId,
})),
}),
},
@@ -173,7 +174,7 @@ function createProviderHandler(params: {
provider: "signal",
...(await deps.sendSignal(to, text, {
maxBytes: signalMaxBytes,
accountId,
accountId: rawAccountId,
})),
}),
sendMedia: async (caption, mediaUrl) => ({
@@ -181,7 +182,7 @@ function createProviderHandler(params: {
...(await deps.sendSignal(to, caption, {
mediaUrl,
maxBytes: signalMaxBytes,
accountId,
accountId: rawAccountId,
})),
}),
},
@@ -191,7 +192,7 @@ function createProviderHandler(params: {
provider: "imessage",
...(await deps.sendIMessage(to, text, {
maxBytes: imessageMaxBytes,
accountId,
accountId: rawAccountId,
})),
}),
sendMedia: async (caption, mediaUrl) => ({
@@ -199,7 +200,7 @@ function createProviderHandler(params: {
...(await deps.sendIMessage(to, caption, {
mediaUrl,
maxBytes: imessageMaxBytes,
accountId,
accountId: rawAccountId,
})),
}),
},
@@ -220,7 +221,7 @@ export async function deliverOutboundPayloads(params: {
onPayload?: (payload: NormalizedOutboundPayload) => void;
}): Promise<OutboundDeliveryResult[]> {
const { cfg, provider, to, payloads } = params;
const accountId = normalizeAccountId(params.accountId);
const accountId = params.accountId;
const deps = {
sendWhatsApp: params.deps?.sendWhatsApp ?? sendMessageWhatsApp,
sendTelegram: params.deps?.sendTelegram ?? sendMessageTelegram,

View File

@@ -0,0 +1,69 @@
import { describe, expect, it } from "vitest";
import type { ClawdbotConfig } from "../config/config.js";
import { resolveTelegramAccount } from "./accounts.js";
describe("resolveTelegramAccount", () => {
it("falls back to the first configured account when accountId is omitted", () => {
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "";
try {
const cfg: ClawdbotConfig = {
telegram: { accounts: { work: { botToken: "tok-work" } } },
};
const account = resolveTelegramAccount({ cfg });
expect(account.accountId).toBe("work");
expect(account.token).toBe("tok-work");
expect(account.tokenSource).toBe("config");
} finally {
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
}
});
it("prefers TELEGRAM_BOT_TOKEN when accountId is omitted", () => {
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "tok-env";
try {
const cfg: ClawdbotConfig = {
telegram: { accounts: { work: { botToken: "tok-work" } } },
};
const account = resolveTelegramAccount({ cfg });
expect(account.accountId).toBe("default");
expect(account.token).toBe("tok-env");
expect(account.tokenSource).toBe("env");
} finally {
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
}
});
it("does not fall back when accountId is explicitly provided", () => {
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "";
try {
const cfg: ClawdbotConfig = {
telegram: { accounts: { work: { botToken: "tok-work" } } },
};
const account = resolveTelegramAccount({ cfg, accountId: "default" });
expect(account.accountId).toBe("default");
expect(account.tokenSource).toBe("none");
expect(account.token).toBe("");
} finally {
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
}
});
});

View File

@@ -56,20 +56,37 @@ export function resolveTelegramAccount(params: {
cfg: ClawdbotConfig;
accountId?: string | null;
}): ResolvedTelegramAccount {
const accountId = normalizeAccountId(params.accountId);
const hasExplicitAccountId = Boolean(params.accountId?.trim());
const baseEnabled = params.cfg.telegram?.enabled !== false;
const merged = mergeTelegramAccountConfig(params.cfg, accountId);
const accountEnabled = merged.enabled !== false;
const enabled = baseEnabled && accountEnabled;
const tokenResolution = resolveTelegramToken(params.cfg, { accountId });
return {
accountId,
enabled,
name: merged.name?.trim() || undefined,
token: tokenResolution.token,
tokenSource: tokenResolution.source,
config: merged,
const resolve = (accountId: string) => {
const merged = mergeTelegramAccountConfig(params.cfg, accountId);
const accountEnabled = merged.enabled !== false;
const enabled = baseEnabled && accountEnabled;
const tokenResolution = resolveTelegramToken(params.cfg, { accountId });
return {
accountId,
enabled,
name: merged.name?.trim() || undefined,
token: tokenResolution.token,
tokenSource: tokenResolution.source,
config: merged,
} satisfies ResolvedTelegramAccount;
};
const normalized = normalizeAccountId(params.accountId);
const primary = resolve(normalized);
if (hasExplicitAccountId) return primary;
if (primary.tokenSource !== "none") return primary;
// If accountId is omitted, prefer a configured account token over failing on
// the implicit "default" account. This keeps env-based setups working (env
// still wins) while making config-only tokens work for things like heartbeats.
const fallbackId = resolveDefaultTelegramAccountId(params.cfg);
if (fallbackId === primary.accountId) return primary;
const fallback = resolve(fallbackId);
if (fallback.tokenSource === "none") return primary;
return fallback;
}
export function listEnabledTelegramAccounts(

View File

@@ -1,4 +1,5 @@
import type { ReactionType, ReactionTypeEmoji } from "@grammyjs/types";
import type { ApiClientOptions } from "grammy";
import { Bot, InputFile } from "grammy";
import { loadConfig } from "../config/config.js";
import { formatErrorMessage } from "../infra/errors.js";
@@ -113,10 +114,10 @@ export async function sendMessageTelegram(
// Use provided api or create a new Bot instance. The nullish coalescing
// operator ensures api is always defined (Bot.api is always non-null).
const fetchImpl = resolveTelegramFetch();
const api =
opts.api ??
new Bot(token, fetchImpl ? { client: { fetch: fetchImpl } } : undefined)
.api;
const client: ApiClientOptions | undefined = fetchImpl
? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] }
: undefined;
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
const mediaUrl = opts.mediaUrl?.trim();
// Build optional params for forum topics and reply threading.
@@ -271,10 +272,10 @@ export async function reactMessageTelegram(
const chatId = normalizeChatId(String(chatIdInput));
const messageId = normalizeMessageId(messageIdInput);
const fetchImpl = resolveTelegramFetch();
const api =
opts.api ??
new Bot(token, fetchImpl ? { client: { fetch: fetchImpl } } : undefined)
.api;
const client: ApiClientOptions | undefined = fetchImpl
? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] }
: undefined;
const api = opts.api ?? new Bot(token, client ? { client } : undefined).api;
const request = createTelegramRetryRunner({
retry: opts.retry,
configRetry: account.config.retry,

View File

@@ -1,3 +1,4 @@
import type { ApiClientOptions } from "grammy";
import { Bot } from "grammy";
import { resolveTelegramFetch } from "./fetch.js";
@@ -8,10 +9,10 @@ export async function setTelegramWebhook(opts: {
dropPendingUpdates?: boolean;
}) {
const fetchImpl = resolveTelegramFetch();
const bot = new Bot(
opts.token,
fetchImpl ? { client: { fetch: fetchImpl } } : undefined,
);
const client: ApiClientOptions | undefined = fetchImpl
? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] }
: undefined;
const bot = new Bot(opts.token, client ? { client } : undefined);
await bot.api.setWebhook(opts.url, {
secret_token: opts.secret,
drop_pending_updates: opts.dropPendingUpdates ?? false,
@@ -20,9 +21,9 @@ export async function setTelegramWebhook(opts: {
export async function deleteTelegramWebhook(opts: { token: string }) {
const fetchImpl = resolveTelegramFetch();
const bot = new Bot(
opts.token,
fetchImpl ? { client: { fetch: fetchImpl } } : undefined,
);
const client: ApiClientOptions | undefined = fetchImpl
? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] }
: undefined;
const bot = new Bot(opts.token, client ? { client } : undefined);
await bot.api.deleteWebhook();
}