From c1ccbd58f57c5815b1c1f45befb3a09c006018e3 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 1 Jan 2026 15:31:41 +0000 Subject: [PATCH] fix(signal): stabilize daemon + add signal delivery --- CHANGELOG.md | 1 + src/commands/health.snapshot.test.ts | 2 + src/config/config.ts | 27 +++++++------ src/cron/isolated-agent.ts | 43 +++++++++++++++++++- src/cron/types.ts | 2 +- src/infra/heartbeat-runner.ts | 37 ++++++++++++++--- src/signal/monitor.ts | 60 +++++++++++++++++++++------- src/signal/probe.test.ts | 46 +++++++++++++++++++++ src/signal/probe.ts | 13 +++++- src/signal/send.ts | 5 ++- 10 files changed, 199 insertions(+), 37 deletions(-) create mode 100644 src/signal/probe.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c5329d27..6a460df6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ ### Fixes - Docs/agent tools: clarify that browser `wait` should be avoided by default and used only in exceptional cases. - Browser tools: `upload` supports auto-click refs, direct `inputRef`/`element` file inputs, and emits input/change after `setFiles` so JS-heavy sites pick up attachments. +- Signal: fix daemon startup race (wait for `/api/v1/check`) and normalize JSON-RPC `version` probe parsing. - macOS: Voice Wake now fully tears down the Speech pipeline when disabled (cancel pending restarts, drop stale callbacks) to avoid high CPU in the background. - macOS menu: add a Talk Mode action alongside the Open Dashboard/Chat/Canvas entries. - macOS Debug: hide “Restart Gateway” when the app won’t start a local gateway (remote mode / attach-only). diff --git a/src/commands/health.snapshot.test.ts b/src/commands/health.snapshot.test.ts index cdeb5a505..3a7b7fb4b 100644 --- a/src/commands/health.snapshot.test.ts +++ b/src/commands/health.snapshot.test.ts @@ -109,6 +109,8 @@ describe("getHealthSnapshot", () => { fs.writeFileSync(tokenFile, "t-file\n", "utf-8"); testConfig = { telegram: { tokenFile } }; testStore = {}; + vi.stubEnv("TELEGRAM_BOT_TOKEN", ""); + vi.stubEnv("DISCORD_BOT_TOKEN", ""); const calls: string[] = []; vi.stubGlobal( diff --git a/src/config/config.ts b/src/config/config.ts index e2806234a..689cfaef9 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -456,7 +456,6 @@ export type ClawdisConfig = { canvasHost?: CanvasHostConfig; talk?: TalkConfig; gateway?: GatewayConfig; - skills?: SkillsConfig; }; /** @@ -851,7 +850,9 @@ const ClawdisSchema = z.object({ httpPort: z.number().int().positive().optional(), cliPath: z.string().optional(), autoStart: z.boolean().optional(), - receiveMode: z.union([z.literal("on-start"), z.literal("manual")]).optional(), + receiveMode: z + .union([z.literal("on-start"), z.literal("manual")]) + .optional(), ignoreAttachments: z.boolean().optional(), ignoreStories: z.boolean().optional(), sendReadReceipts: z.boolean().optional(), @@ -948,16 +949,18 @@ const ClawdisSchema = z.object({ .optional(), }) .optional(), - entries: z.record( - z.string(), - z - .object({ - enabled: z.boolean().optional(), - apiKey: z.string().optional(), - env: z.record(z.string(), z.string()).optional(), - }) - .passthrough(), - ).optional(), + entries: z + .record( + z.string(), + z + .object({ + enabled: z.boolean().optional(), + apiKey: z.string().optional(), + env: z.record(z.string(), z.string()).optional(), + }) + .passthrough(), + ) + .optional(), }) .optional(), }); diff --git a/src/cron/isolated-agent.ts b/src/cron/isolated-agent.ts index 3e2fa0d6a..0c34be543 100644 --- a/src/cron/isolated-agent.ts +++ b/src/cron/isolated-agent.ts @@ -53,7 +53,7 @@ function pickSummaryFromPayloads( function resolveDeliveryTarget( cfg: ClawdisConfig, jobPayload: { - channel?: "last" | "whatsapp" | "telegram" | "discord"; + channel?: "last" | "whatsapp" | "telegram" | "discord" | "signal"; to?: string; }, ) { @@ -79,7 +79,8 @@ function resolveDeliveryTarget( if ( requestedChannel === "whatsapp" || requestedChannel === "telegram" || - requestedChannel === "discord" + requestedChannel === "discord" || + requestedChannel === "signal" ) { return requestedChannel; } @@ -414,6 +415,44 @@ export async function runCronIsolatedAgentTurn(params: { return { status: "error", summary, error: String(err) }; return { status: "ok", summary }; } + } else if (resolvedDelivery.channel === "signal") { + if (!resolvedDelivery.to) { + if (!bestEffortDeliver) + return { + status: "error", + summary, + error: "Cron delivery to Signal requires a recipient.", + }; + return { + status: "skipped", + summary: "Delivery skipped (no Signal recipient).", + }; + } + const to = resolvedDelivery.to; + try { + for (const payload of payloads) { + const mediaList = + payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); + if (mediaList.length === 0) { + for (const chunk of chunkText(payload.text ?? "", 4000)) { + await params.deps.sendMessageSignal(to, chunk); + } + } else { + let first = true; + for (const url of mediaList) { + const caption = first ? (payload.text ?? "") : ""; + first = false; + await params.deps.sendMessageSignal(to, caption, { + mediaUrl: url, + }); + } + } + } + } catch (err) { + if (!bestEffortDeliver) + return { status: "error", summary, error: String(err) }; + return { status: "ok", summary }; + } } } diff --git a/src/cron/types.ts b/src/cron/types.ts index c02d1d183..d3bfa44aa 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -14,7 +14,7 @@ export type CronPayload = thinking?: string; timeoutSeconds?: number; deliver?: boolean; - channel?: "last" | "whatsapp" | "telegram" | "discord"; + channel?: "last" | "whatsapp" | "telegram" | "discord" | "signal"; to?: string; bestEffortDeliver?: boolean; }; diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 9440df2e3..5473d92ed 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -20,6 +20,7 @@ import { createSubsystemLogger } from "../logging.js"; import { getQueueSize } from "../process/command-queue.js"; import { webAuthExists } from "../providers/web/index.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import { sendMessageSignal } from "../signal/send.js"; import { sendMessageTelegram } from "../telegram/send.js"; import { normalizeE164 } from "../utils.js"; import { getActiveWebListener } from "../web/active-listener.js"; @@ -36,10 +37,11 @@ export type HeartbeatTarget = | "whatsapp" | "telegram" | "discord" + | "signal" | "none"; export type HeartbeatDeliveryTarget = { - channel: "whatsapp" | "telegram" | "discord" | "none"; + channel: "whatsapp" | "telegram" | "discord" | "signal" | "none"; to?: string; reason?: string; }; @@ -49,6 +51,7 @@ type HeartbeatDeps = { sendWhatsApp?: typeof sendMessageWhatsApp; sendTelegram?: typeof sendMessageTelegram; sendDiscord?: typeof sendMessageDiscord; + sendSignal?: typeof sendMessageSignal; getQueueSize?: (lane?: string) => number; nowMs?: () => number; webAuthExists?: () => Promise; @@ -177,6 +180,7 @@ export function resolveHeartbeatDeliveryTarget(params: { rawTarget === "whatsapp" || rawTarget === "telegram" || rawTarget === "discord" || + rawTarget === "signal" || rawTarget === "none" || rawTarget === "last" ? rawTarget @@ -197,10 +201,13 @@ export function resolveHeartbeatDeliveryTarget(params: { : undefined; const lastTo = typeof entry?.lastTo === "string" ? entry.lastTo.trim() : ""; - const channel: "whatsapp" | "telegram" | "discord" | undefined = + const channel: "whatsapp" | "telegram" | "discord" | "signal" | undefined = target === "last" ? lastChannel - : target === "whatsapp" || target === "telegram" || target === "discord" + : target === "whatsapp" || + target === "telegram" || + target === "discord" || + target === "signal" ? target : undefined; @@ -267,12 +274,15 @@ function normalizeHeartbeatReply( } async function deliverHeartbeatReply(params: { - channel: "whatsapp" | "telegram" | "discord"; + channel: "whatsapp" | "telegram" | "discord" | "signal"; to: string; text: string; mediaUrls: string[]; deps: Required< - Pick + Pick< + HeartbeatDeps, + "sendWhatsApp" | "sendTelegram" | "sendDiscord" | "sendSignal" + > >; }) { const { channel, to, text, mediaUrls, deps } = params; @@ -292,6 +302,22 @@ async function deliverHeartbeatReply(params: { return; } + if (channel === "signal") { + if (mediaUrls.length === 0) { + for (const chunk of chunkText(text, 4000)) { + await deps.sendSignal(to, chunk); + } + return; + } + let first = true; + for (const url of mediaUrls) { + const caption = first ? text : ""; + first = false; + await deps.sendSignal(to, caption, { mediaUrl: url }); + } + return; + } + if (channel === "telegram") { if (mediaUrls.length === 0) { for (const chunk of chunkText(text, 4000)) { @@ -437,6 +463,7 @@ export async function runHeartbeatOnce(opts: { sendWhatsApp: opts.deps?.sendWhatsApp ?? sendMessageWhatsApp, sendTelegram: opts.deps?.sendTelegram ?? sendMessageTelegram, sendDiscord: opts.deps?.sendDiscord ?? sendMessageDiscord, + sendSignal: opts.deps?.sendSignal ?? sendMessageSignal, }; await deliverHeartbeatReply({ channel: delivery.channel, diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index 80b3c3300..a307f22eb 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -1,15 +1,15 @@ +import { chunkText } from "../auto-reply/chunk.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { loadConfig } from "../config/config.js"; import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; -import { chunkText } from "../auto-reply/chunk.js"; import { danger, isVerbose, logVerbose } from "../globals.js"; import { mediaKindFromMime } from "../media/constants.js"; import { saveMediaBuffer } from "../media/store.js"; import type { RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; -import { signalRpcRequest, streamSignalEvents } from "./client.js"; +import { signalCheck, signalRpcRequest, streamSignalEvents } from "./client.js"; import { spawnSignalDaemon } from "./daemon.js"; import { sendMessageSignal } from "./send.js"; @@ -93,10 +93,7 @@ function resolveAccount(opts: MonitorSignalOpts): string | undefined { function resolveAllowFrom(opts: MonitorSignalOpts): string[] { const cfg = loadConfig(); const raw = - opts.allowFrom ?? - cfg.signal?.allowFrom ?? - cfg.routing?.allowFrom ?? - []; + opts.allowFrom ?? cfg.signal?.allowFrom ?? cfg.routing?.allowFrom ?? []; return raw.map((entry) => String(entry).trim()).filter(Boolean); } @@ -110,6 +107,32 @@ function isAllowedSender(sender: string, allowFrom: string[]): boolean { return normalizedAllow.includes(normalizedSender); } +async function waitForSignalDaemonReady(params: { + baseUrl: string; + abortSignal?: AbortSignal; + timeoutMs: number; + runtime: RuntimeEnv; +}): Promise { + const started = Date.now(); + let lastError: string | null = null; + + while (Date.now() - started < params.timeoutMs) { + if (params.abortSignal?.aborted) return; + const res = await signalCheck(params.baseUrl, 1000); + if (res.ok) return; + lastError = + res.error ?? (res.status ? `HTTP ${res.status}` : "unreachable"); + await new Promise((r) => setTimeout(r, 150)); + } + + params.runtime.error?.( + danger( + `signal: daemon not ready after ${params.timeoutMs}ms (${lastError ?? "unknown error"})`, + ), + ); + throw new Error(`signal daemon not ready (${lastError ?? "unknown error"})`); +} + async function fetchAttachment(params: { baseUrl: string; account?: string; @@ -202,9 +225,7 @@ export async function monitorSignalProvider( opts.ignoreAttachments ?? cfg.signal?.ignoreAttachments ?? false; const autoStart = - opts.autoStart ?? - cfg.signal?.autoStart ?? - (cfg.signal?.httpUrl ? false : true); + opts.autoStart ?? cfg.signal?.autoStart ?? !cfg.signal?.httpUrl; let daemonHandle: ReturnType | null = null; if (autoStart) { @@ -220,8 +241,7 @@ export async function monitorSignalProvider( ignoreAttachments: opts.ignoreAttachments ?? cfg.signal?.ignoreAttachments, ignoreStories: opts.ignoreStories ?? cfg.signal?.ignoreStories, - sendReadReceipts: - opts.sendReadReceipts ?? cfg.signal?.sendReadReceipts, + sendReadReceipts: opts.sendReadReceipts ?? cfg.signal?.sendReadReceipts, runtime, }); } @@ -232,6 +252,15 @@ export async function monitorSignalProvider( opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); try { + if (daemonHandle) { + await waitForSignalDaemonReady({ + baseUrl, + abortSignal: opts.abortSignal, + timeoutMs: 10_000, + runtime, + }); + } + const handleEvent = async (event: { event?: string; data?: string }) => { if (event.event !== "receive" || !event.data) return; let payload: SignalReceivePayload | null = null; @@ -242,7 +271,9 @@ export async function monitorSignalProvider( return; } if (payload?.exception?.message) { - runtime.error?.(`signal: receive exception: ${payload.exception.message}`); + runtime.error?.( + `signal: receive exception: ${payload.exception.message}`, + ); } const envelope = payload?.envelope; if (!envelope) return; @@ -282,7 +313,8 @@ export async function monitorSignalProvider( }); if (fetched) { mediaPath = fetched.path; - mediaType = fetched.contentType ?? firstAttachment.contentType ?? undefined; + mediaType = + fetched.contentType ?? firstAttachment.contentType ?? undefined; } } catch (err) { runtime.error?.( @@ -317,7 +349,7 @@ export async function monitorSignalProvider( From: isGroup ? `group:${groupId}` : `signal:${sender}`, To: isGroup ? `group:${groupId}` : `signal:${sender}`, ChatType: isGroup ? "group" : "direct", - GroupSubject: isGroup ? groupName ?? undefined : undefined, + GroupSubject: isGroup ? (groupName ?? undefined) : undefined, SenderName: envelope.sourceName ?? sender, Surface: "signal" as const, MessageSid: envelope.timestamp ? String(envelope.timestamp) : undefined, diff --git a/src/signal/probe.test.ts b/src/signal/probe.test.ts new file mode 100644 index 000000000..5a0c699d0 --- /dev/null +++ b/src/signal/probe.test.ts @@ -0,0 +1,46 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { probeSignal } from "./probe.js"; + +const signalCheckMock = vi.fn(); +const signalRpcRequestMock = vi.fn(); + +vi.mock("./client.js", () => ({ + signalCheck: (...args: unknown[]) => signalCheckMock(...args), + signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args), +})); + +describe("probeSignal", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("extracts version from {version} result", async () => { + signalCheckMock.mockResolvedValueOnce({ + ok: true, + status: 200, + error: null, + }); + signalRpcRequestMock.mockResolvedValueOnce({ version: "0.13.22" }); + + const res = await probeSignal("http://127.0.0.1:8080", 1000); + + expect(res.ok).toBe(true); + expect(res.version).toBe("0.13.22"); + expect(res.status).toBe(200); + }); + + it("returns ok=false when /check fails", async () => { + signalCheckMock.mockResolvedValueOnce({ + ok: false, + status: 503, + error: "HTTP 503", + }); + + const res = await probeSignal("http://127.0.0.1:8080", 1000); + + expect(res.ok).toBe(false); + expect(res.status).toBe(503); + expect(res.version).toBe(null); + }); +}); diff --git a/src/signal/probe.ts b/src/signal/probe.ts index 302c0be1c..56ba17c4b 100644 --- a/src/signal/probe.ts +++ b/src/signal/probe.ts @@ -8,6 +8,15 @@ export type SignalProbe = { version?: string | null; }; +function parseSignalVersion(value: unknown): string | null { + if (typeof value === "string" && value.trim()) return value.trim(); + if (typeof value === "object" && value !== null) { + const version = (value as { version?: unknown }).version; + if (typeof version === "string" && version.trim()) return version.trim(); + } + return null; +} + export async function probeSignal( baseUrl: string, timeoutMs: number, @@ -30,11 +39,11 @@ export async function probeSignal( }; } try { - const version = await signalRpcRequest("version", undefined, { + const version = await signalRpcRequest("version", undefined, { baseUrl, timeoutMs, }); - result.version = typeof version === "string" ? version : null; + result.version = parseSignalVersion(version); } catch (err) { result.error = err instanceof Error ? err.message : String(err); } diff --git a/src/signal/send.ts b/src/signal/send.ts index f4e9b8bff..c82ae8ce0 100644 --- a/src/signal/send.ts +++ b/src/signal/send.ts @@ -50,7 +50,10 @@ function parseTarget(raw: string): SignalTarget { value = value.slice("signal:".length).trim(); } if (lower.startsWith("username:")) { - return { type: "username", username: value.slice("username:".length).trim() }; + return { + type: "username", + username: value.slice("username:".length).trim(), + }; } if (lower.startsWith("u:")) { return { type: "username", username: value.trim() };