From 596770942adbce15483cf6f6feac498edf1c04c2 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 1 Jan 2026 15:43:15 +0100 Subject: [PATCH] feat: add Signal provider support --- CHANGELOG.md | 1 + README.md | 2 +- docs/setup.md | 116 ++++++++++ docs/signal.md | 46 ++++ src/cli/cron-cli.ts | 4 +- src/cli/deps.ts | 3 + src/cli/program.ts | 14 +- src/commands/agent.ts | 42 ++++ src/commands/send.test.ts | 18 ++ src/commands/send.ts | 26 +++ src/config/config.ts | 49 ++++- src/config/sessions.ts | 2 +- src/gateway/server.test.ts | 63 ++++++ src/gateway/server.ts | 171 ++++++++++++++- src/infra/provider-summary.ts | 16 ++ src/signal/client.ts | 187 +++++++++++++++++ src/signal/daemon.ts | 68 ++++++ src/signal/index.ts | 3 + src/signal/monitor.ts | 384 ++++++++++++++++++++++++++++++++++ src/signal/probe.ts | 47 +++++ src/signal/send.ts | 125 +++++++++++ 21 files changed, 1368 insertions(+), 19 deletions(-) create mode 100644 docs/setup.md create mode 100644 docs/signal.md create mode 100644 src/signal/client.ts create mode 100644 src/signal/daemon.ts create mode 100644 src/signal/index.ts create mode 100644 src/signal/monitor.ts create mode 100644 src/signal/probe.ts create mode 100644 src/signal/send.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index e9cd78247..9c5329d27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - UI: add optional `ui.seamColor` accent to tint the Talk Mode side bubble (macOS/iOS/Android). - Nix mode: opt-in declarative config + read-only settings UI when `CLAWDIS_NIX_MODE=1` (thanks @joshp123 for the persistence — earned my trust; I'll merge these going forward). - Agent runtime: accept legacy `Z_AI_API_KEY` for Z.AI provider auth (maps to `ZAI_API_KEY`). +- Signal: add `signal-cli` JSON-RPC support for send/receive via the Signal provider. - Tests: add a Z.AI live test gate for smoke validation when keys are present. - macOS Debug: add app log verbosity and rolling file log toggle for swift-log-backed app logs. diff --git a/README.md b/README.md index 73960f343..3f0cc3af6 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ Your surfaces ## What Clawdis does - **Personal assistant** — one user, one identity, one memory surface. -- **Multi-surface inbox** — WhatsApp, Telegram, Discord, WebChat, macOS, iOS. +- **Multi-surface inbox** — WhatsApp, Telegram, Discord, WebChat, macOS, iOS. Signal support via `signal-cli` (see `docs/signal.md`). - **Voice wake + push-to-talk** — local speech recognition on macOS/iOS. - **Canvas** — a live visual workspace you can drive from the agent. - **Automation-ready** — browser control, media handling, and tool streaming. diff --git a/docs/setup.md b/docs/setup.md new file mode 100644 index 000000000..c2a95f70a --- /dev/null +++ b/docs/setup.md @@ -0,0 +1,116 @@ +--- +summary: "Setup guide: keep your Clawdis setup tailored while staying up-to-date" +read_when: + - Setting up a new machine + - You want “latest + greatest” without breaking your personal setup +--- + +# Setup + +Last updated: 2026-01-01 + +## TL;DR +- **Tailoring lives outside the repo:** `~/clawd` (workspace) + `~/.clawdis/clawdis.json` (config). +- **Stable workflow:** install the macOS app; let it run the bundled Gateway. +- **Bleeding edge workflow:** run the Gateway yourself via `pnpm gateway:watch`, then point the macOS app at it using **Debug Settings → Gateway → Attach only**. + +## Prereqs (from source) +- Node `>=22` +- `pnpm` + +## Tailoring strategy (so updates don’t hurt) + +If you want “100% tailored to me” *and* easy updates, keep your customization in: + +- **Config:** `~/.clawdis/clawdis.json` (JSON/JSON5-ish) +- **Workspace:** `~/clawd` (skills, prompts, memories; make it a private git repo) + +Bootstrap once: + +```bash +clawdis setup +``` + +From inside this repo, use the local CLI entry: + +```bash +pnpm clawdis setup +``` + +## Stable workflow (macOS app first) + +1) Install + launch **Clawdis.app** (menu bar). +2) Complete the onboarding/permissions checklist (TCC prompts). +3) Ensure Gateway is **Local** and running (the app manages it). +4) Link surfaces (example: WhatsApp): + +```bash +clawdis login +``` + +5) Sanity check: + +```bash +clawdis health +``` + +If onboarding is still WIP/broken on your build: +- Run `clawdis setup`, then `clawdis login`, then start the Gateway manually (`clawdis gateway`). + +## Bleeding edge workflow (Gateway in a terminal) + +Goal: work on the TypeScript Gateway, get hot reload, keep the macOS app UI attached. + +### 0) (Optional) Run the macOS app from source too + +If you also want the macOS app on the bleeding edge: + +```bash +./scripts/restart-mac.sh +``` + +### 1) Start the dev Gateway + +```bash +pnpm install +pnpm gateway:watch +``` + +`gateway:watch` runs `src/index.ts gateway --force` and reloads on `src/**/*.ts` changes. + +### 2) Point the macOS app at your running Gateway + +In **Clawdis.app**: + +- Connection Mode: **Local** +- Settings → **Debug Settings** → **Gateway** → enable **Attach only** + +This makes the app **only connect to an already-running gateway** and **never spawn** its own. + +### 3) Verify + +- In-app Gateway status should read **“Using existing gateway …”** +- Or via CLI: + +```bash +pnpm clawdis health +``` + +### Common footguns +- **Attach only enabled, but nothing is running:** app shows “Attach-only enabled; no gateway to attach”. +- **Wrong port:** Gateway WS defaults to `ws://127.0.0.1:18789`; keep app + CLI on the same port. +- **Where state lives:** + - Credentials: `~/.clawdis/credentials/` + - Sessions/logs: `~/.clawdis/sessions/` + +## Updating (without wrecking your setup) + +- Keep `~/clawd` and `~/.clawdis/` as “your stuff”; don’t put personal prompts/config into the `clawdis` repo. +- Updating source: `git pull` + `pnpm install` (when lockfile changed) + keep using `pnpm gateway:watch`. + +## Related docs + +- `docs/gateway.md` (Gateway runbook; flags, supervision, ports) +- `docs/configuration.md` (config schema + examples) +- `docs/clawd.md` (personal assistant setup) +- `docs/clawdis-mac.md` (macOS app behavior; gateway lifecycle + “Attach only”) diff --git a/docs/signal.md b/docs/signal.md new file mode 100644 index 000000000..243881667 --- /dev/null +++ b/docs/signal.md @@ -0,0 +1,46 @@ +# Signal (signal-cli) + +Status: external CLI integration only. No libsignal embedding. + +## Why +- Signal OSS stack is GPL/AGPL; not compatible with Clawdis MIT if bundled. +- signal-cli is unofficial; must stay up to date (Signal server churn). + +## Model +- Run `signal-cli` as separate process (user-installed). +- Prefer `daemon --http=127.0.0.1:PORT` for JSON-RPC + SSE. +- Alternative: `jsonRpc` mode over stdin/stdout. + +## Endpoints (daemon --http) +- `POST /api/v1/rpc` JSON-RPC request (single or batch). +- `GET /api/v1/events` SSE stream of `receive` notifications. +- `GET /api/v1/check` health probe (200 = up). + +## Multi-account +- Start daemon without `-a`. +- Include `params.account` (E164) on JSON-RPC calls. +- SSE `?account=+E164` filters events; no param = all accounts. + +## Minimal RPC surface +- `send` (recipient/groupId/username, message, attachments). +- `listGroups` (map group IDs). +- `subscribeReceive` / `unsubscribeReceive` (if manual receive). +- `startLink` / `finishLink` (optional device link flow). + +## Process plan (Clawdis adapter) +1) Detect `signal-cli` binary; refuse if missing. +2) Launch daemon (HTTP preferred), store PID. +3) Poll `/api/v1/check` until ready. +4) Open SSE stream; parse `event: receive`. +5) Translate receive payload into Clawdis surface model. +6) On SSE disconnect, backoff + reconnect. + +## Storage +- signal-cli data lives in `$XDG_DATA_HOME/signal-cli/data` or + `$HOME/.local/share/signal-cli/data`. + +## References (local) +- `~/Projects/oss/signal-cli/README.md` +- `~/Projects/oss/signal-cli/man/signal-cli-jsonrpc.5.adoc` +- `~/Projects/oss/signal-cli/src/main/java/org/asamk/signal/http/HttpServerHandler.java` +- `~/Projects/oss/signal-cli/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java` diff --git a/src/cli/cron-cli.ts b/src/cli/cron-cli.ts index 980c672d8..903f43b00 100644 --- a/src/cli/cron-cli.ts +++ b/src/cli/cron-cli.ts @@ -155,7 +155,7 @@ export function registerCronCli(program: Command) { .option("--deliver", "Deliver agent output", false) .option( "--channel ", - "Delivery channel (last|whatsapp|telegram|discord)", + "Delivery channel (last|whatsapp|telegram|discord|signal)", "last", ) .option( @@ -414,7 +414,7 @@ export function registerCronCli(program: Command) { .option("--deliver", "Deliver agent output", false) .option( "--channel ", - "Delivery channel (last|whatsapp|telegram|discord)", + "Delivery channel (last|whatsapp|telegram|discord|signal)", ) .option( "--to ", diff --git a/src/cli/deps.ts b/src/cli/deps.ts index 76f309c87..b2bd241f5 100644 --- a/src/cli/deps.ts +++ b/src/cli/deps.ts @@ -1,11 +1,13 @@ import { sendMessageDiscord } from "../discord/send.js"; import { logWebSelfId, sendMessageWhatsApp } from "../providers/web/index.js"; +import { sendMessageSignal } from "../signal/send.js"; import { sendMessageTelegram } from "../telegram/send.js"; export type CliDeps = { sendMessageWhatsApp: typeof sendMessageWhatsApp; sendMessageTelegram: typeof sendMessageTelegram; sendMessageDiscord: typeof sendMessageDiscord; + sendMessageSignal: typeof sendMessageSignal; }; export function createDefaultDeps(): CliDeps { @@ -13,6 +15,7 @@ export function createDefaultDeps(): CliDeps { sendMessageWhatsApp, sendMessageTelegram, sendMessageDiscord, + sendMessageSignal, }; } diff --git a/src/cli/program.ts b/src/cli/program.ts index 4c39ec22b..8109104d0 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -149,10 +149,10 @@ export function buildProgram() { program .command("send") - .description("Send a message (WhatsApp Web, Telegram bot, or Discord)") + .description("Send a message (WhatsApp Web, Telegram bot, Discord, Signal)") .requiredOption( "-t, --to ", - "Recipient: E.164 for WhatsApp, Telegram chat id/@username, or Discord channel/user", + "Recipient: E.164 for WhatsApp/Signal, Telegram chat id/@username, or Discord channel/user", ) .requiredOption("-m, --message ", "Message body") .option( @@ -161,7 +161,7 @@ export function buildProgram() { ) .option( "--provider ", - "Delivery provider: whatsapp|telegram|discord (default: whatsapp)", + "Delivery provider: whatsapp|telegram|discord|signal (default: whatsapp)", ) .option("--dry-run", "Print payload and skip sending", false) .option("--json", "Output result as JSON", false) @@ -189,7 +189,7 @@ Examples: program .command("agent") .description( - "Talk directly to the configured agent (no chat send; optional WhatsApp delivery)", + "Talk directly to the configured agent (no chat send; optional delivery)", ) .requiredOption("-m, --message ", "Message body for the agent") .option( @@ -204,7 +204,7 @@ Examples: .option("--verbose ", "Persist agent verbose level for the session") .option( "--provider ", - "Delivery provider: whatsapp|telegram|discord (default: whatsapp)", + "Delivery provider: whatsapp|telegram|discord|signal (default: whatsapp)", ) .option( "--deliver", @@ -253,7 +253,7 @@ Examples: .option("--json", "Output JSON instead of text", false) .option( "--deep", - "Probe providers (WhatsApp Web + Telegram + Discord)", + "Probe providers (WhatsApp Web + Telegram + Discord + Signal)", false, ) .option("--timeout ", "Probe timeout in milliseconds", "10000") @@ -264,7 +264,7 @@ Examples: Examples: clawdis status # show linked account + session store summary clawdis status --json # machine-readable output - clawdis status --deep # run provider probes (WA + Telegram + Discord) + clawdis status --deep # run provider probes (WA + Telegram + Discord + Signal) clawdis status --deep --timeout 5000 # tighten probe timeout`, ) .action(async (opts) => { diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 1be671cda..a256b7ef9 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -415,6 +415,7 @@ export async function agentCommand( const whatsappTarget = opts.to ? normalizeE164(opts.to) : allowFrom[0]; const telegramTarget = opts.to?.trim() || undefined; const discordTarget = opts.to?.trim() || undefined; + const signalTarget = opts.to?.trim() || undefined; const logDeliveryError = (err: unknown) => { const deliveryTarget = @@ -424,6 +425,8 @@ export async function agentCommand( ? whatsappTarget : deliveryProvider === "discord" ? discordTarget + : deliveryProvider === "signal" + ? signalTarget : undefined; const message = `Delivery failed (${deliveryProvider}${deliveryTarget ? ` to ${deliveryTarget}` : ""}): ${String(err)}`; runtime.error?.(message); @@ -450,6 +453,13 @@ export async function agentCommand( if (!bestEffortDeliver) throw err; logDeliveryError(err); } + if (deliveryProvider === "signal" && !signalTarget) { + const err = new Error( + "Delivering to Signal requires --to ", + ); + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } if (deliveryProvider === "webchat") { const err = new Error( "Delivering to WebChat is not supported via `clawdis agent`; use WhatsApp/Telegram or run with --deliver=false.", @@ -461,6 +471,7 @@ export async function agentCommand( deliveryProvider !== "whatsapp" && deliveryProvider !== "telegram" && deliveryProvider !== "discord" && + deliveryProvider !== "signal" && deliveryProvider !== "webchat" ) { const err = new Error(`Unknown provider: ${deliveryProvider}`); @@ -574,5 +585,36 @@ export async function agentCommand( logDeliveryError(err); } } + + if (deliveryProvider === "signal" && signalTarget) { + try { + if (media.length === 0) { + await deps.sendMessageSignal(signalTarget, text, { + maxBytes: cfg.signal?.mediaMaxMb + ? cfg.signal.mediaMaxMb * 1024 * 1024 + : cfg.agent?.mediaMaxMb + ? cfg.agent.mediaMaxMb * 1024 * 1024 + : undefined, + }); + } else { + let first = true; + for (const url of media) { + const caption = first ? text : ""; + first = false; + await deps.sendMessageSignal(signalTarget, caption, { + mediaUrl: url, + maxBytes: cfg.signal?.mediaMaxMb + ? cfg.signal.mediaMaxMb * 1024 * 1024 + : cfg.agent?.mediaMaxMb + ? cfg.agent.mediaMaxMb * 1024 * 1024 + : undefined, + }); + } + } + } catch (err) { + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } + } } } diff --git a/src/commands/send.test.ts b/src/commands/send.test.ts index 413b0c1bc..ee12be9c2 100644 --- a/src/commands/send.test.ts +++ b/src/commands/send.test.ts @@ -35,6 +35,7 @@ const makeDeps = (overrides: Partial = {}): CliDeps => ({ sendMessageWhatsApp: vi.fn(), sendMessageTelegram: vi.fn(), sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), ...overrides, }); @@ -106,6 +107,23 @@ describe("sendCommand", () => { expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); }); + it("routes to signal provider", async () => { + const deps = makeDeps({ + sendMessageSignal: vi.fn().mockResolvedValue({ messageId: "s1" }), + }); + await sendCommand( + { to: "+15551234567", message: "hi", provider: "signal" }, + deps, + runtime, + ); + expect(deps.sendMessageSignal).toHaveBeenCalledWith( + "+15551234567", + "hi", + expect.objectContaining({ mediaUrl: undefined }), + ); + expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); + }); + it("emits json output", async () => { callGatewayMock.mockResolvedValueOnce({ messageId: "direct2" }); const deps = makeDeps(); diff --git a/src/commands/send.ts b/src/commands/send.ts index 35e259746..3f46edd44 100644 --- a/src/commands/send.ts +++ b/src/commands/send.ts @@ -82,6 +82,31 @@ export async function sendCommand( return; } + if (provider === "signal") { + const result = await deps.sendMessageSignal(opts.to, opts.message, { + mediaUrl: opts.media, + }); + runtime.log( + success(`✅ Sent via signal. Message ID: ${result.messageId}`), + ); + if (opts.json) { + runtime.log( + JSON.stringify( + { + provider: "signal", + via: "direct", + to: opts.to, + messageId: result.messageId, + mediaUrl: opts.media ?? null, + }, + null, + 2, + ), + ); + } + return; + } + // Always send via gateway over WS to avoid multi-session corruption. const sendViaGateway = async () => callGateway<{ @@ -93,6 +118,7 @@ export async function sendCommand( to: opts.to, message: opts.message, mediaUrl: opts.media, + provider, idempotencyKey: randomIdempotencyKey(), }, timeoutMs: 10_000, diff --git a/src/config/config.ts b/src/config/config.ts index fe898c89d..e2806234a 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -165,12 +165,36 @@ export type DiscordConfig = { mediaMaxMb?: number; }; +export type SignalConfig = { + /** If false, do not start the Signal provider. Default: true. */ + enabled?: boolean; + /** Optional explicit E.164 account for signal-cli. */ + account?: string; + /** Optional full base URL for signal-cli HTTP daemon. */ + httpUrl?: string; + /** HTTP host for signal-cli daemon (default 127.0.0.1). */ + httpHost?: string; + /** HTTP port for signal-cli daemon (default 8080). */ + httpPort?: number; + /** signal-cli binary path (default: signal-cli). */ + cliPath?: string; + /** Auto-start signal-cli daemon (default: true if httpUrl not set). */ + autoStart?: boolean; + receiveMode?: "on-start" | "manual"; + ignoreAttachments?: boolean; + ignoreStories?: boolean; + sendReadReceipts?: boolean; + allowFrom?: Array; + mediaMaxMb?: number; +}; + export type QueueMode = "queue" | "interrupt"; export type QueueModeBySurface = { whatsapp?: QueueMode; telegram?: QueueMode; discord?: QueueMode; + signal?: QueueMode; webchat?: QueueMode; }; @@ -399,8 +423,8 @@ export type ClawdisConfig = { every?: string; /** Heartbeat model override (provider/model). */ model?: string; - /** Delivery target (last|whatsapp|telegram|discord|none). */ - target?: "last" | "whatsapp" | "telegram" | "discord" | "none"; + /** Delivery target (last|whatsapp|telegram|discord|signal|none). */ + target?: "last" | "whatsapp" | "telegram" | "discord" | "signal" | "none"; /** Optional delivery override (E.164 for WhatsApp, chat id for Telegram). */ to?: string; /** Override the heartbeat prompt body (default: "HEARTBEAT"). */ @@ -424,6 +448,7 @@ export type ClawdisConfig = { web?: WebConfig; telegram?: TelegramConfig; discord?: DiscordConfig; + signal?: SignalConfig; cron?: CronConfig; hooks?: HooksConfig; bridge?: BridgeConfig; @@ -518,6 +543,7 @@ const QueueModeBySurfaceSchema = z whatsapp: QueueModeSchema.optional(), telegram: QueueModeSchema.optional(), discord: QueueModeSchema.optional(), + signal: QueueModeSchema.optional(), webchat: QueueModeSchema.optional(), }) .optional(); @@ -563,6 +589,7 @@ const HeartbeatSchema = z z.literal("whatsapp"), z.literal("telegram"), z.literal("discord"), + z.literal("signal"), z.literal("none"), ]) .optional(), @@ -621,6 +648,7 @@ const HookMappingSchema = z z.literal("whatsapp"), z.literal("telegram"), z.literal("discord"), + z.literal("signal"), ]) .optional(), to: z.string().optional(), @@ -814,6 +842,23 @@ const ClawdisSchema = z.object({ mediaMaxMb: z.number().positive().optional(), }) .optional(), + signal: z + .object({ + enabled: z.boolean().optional(), + account: z.string().optional(), + httpUrl: z.string().optional(), + httpHost: z.string().optional(), + httpPort: z.number().int().positive().optional(), + cliPath: z.string().optional(), + autoStart: z.boolean().optional(), + receiveMode: z.union([z.literal("on-start"), z.literal("manual")]).optional(), + ignoreAttachments: z.boolean().optional(), + ignoreStories: z.boolean().optional(), + sendReadReceipts: z.boolean().optional(), + allowFrom: z.array(z.union([z.string(), z.number()])).optional(), + mediaMaxMb: z.number().positive().optional(), + }) + .optional(), bridge: z .object({ enabled: z.boolean().optional(), diff --git a/src/config/sessions.ts b/src/config/sessions.ts index fc586c320..1f653d74b 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -27,7 +27,7 @@ export type SessionEntry = { totalTokens?: number; model?: string; contextTokens?: number; - lastChannel?: "whatsapp" | "telegram" | "discord" | "webchat"; + lastChannel?: "whatsapp" | "telegram" | "discord" | "signal" | "webchat"; lastTo?: string; skillsSnapshot?: SessionSkillSnapshot; }; diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index bffd10021..6557d126b 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -1871,6 +1871,61 @@ describe("gateway server", () => { await server.close(); }); + test("agent routes main last-channel signal", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-signal", + updatedAt: Date.now(), + lastChannel: "signal", + lastTo: "+15551234567", + }, + }, + null, + 2, + ), + "utf-8", + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + ws.send( + JSON.stringify({ + type: "req", + id: "agent-last-signal", + method: "agent", + params: { + message: "hi", + sessionKey: "main", + channel: "last", + deliver: true, + idempotencyKey: "idem-agent-last-signal", + }, + }), + ); + await onceMessage( + ws, + (o) => o.type === "res" && o.id === "agent-last-signal", + ); + + const spy = vi.mocked(agentCommand); + expect(spy).toHaveBeenCalled(); + const call = spy.mock.calls.at(-1)?.[0] as Record; + expect(call.provider).toBe("signal"); + expect(call.to).toBe("+15551234567"); + expect(call.deliver).toBe(true); + expect(call.bestEffortDeliver).toBe(true); + expect(call.sessionId).toBe("sess-signal"); + + ws.close(); + await server.close(); + }); + test("agent ignores webchat last-channel for routing", async () => { testAllowFrom = ["+1555"]; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); @@ -2134,6 +2189,11 @@ describe("gateway server", () => { probe?: unknown; lastProbeAt?: unknown; }; + signal?: { + configured?: boolean; + probe?: unknown; + lastProbeAt?: unknown; + }; }>(ws, "providers.status", { probe: false, timeoutMs: 2000 }); expect(res.ok).toBe(true); expect(res.payload?.whatsapp).toBeTruthy(); @@ -2141,6 +2201,9 @@ describe("gateway server", () => { expect(res.payload?.telegram?.tokenSource).toBe("none"); expect(res.payload?.telegram?.probe).toBeUndefined(); expect(res.payload?.telegram?.lastProbeAt).toBeNull(); + expect(res.payload?.signal?.configured).toBe(false); + expect(res.payload?.signal?.probe).toBeUndefined(); + expect(res.payload?.signal?.lastProbeAt).toBeNull(); ws.close(); await server.close(); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index c0cc55de2..4375ca065 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -74,6 +74,8 @@ import { sendMessageDiscord, } from "../discord/index.js"; import { type DiscordProbe, probeDiscord } from "../discord/probe.js"; +import { monitorSignalProvider, sendMessageSignal } from "../signal/index.js"; +import { type SignalProbe, probeSignal } from "../signal/probe.js"; import { isVerbose } from "../globals.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js"; @@ -283,10 +285,12 @@ const logWsControl = log.child("ws"); const logWhatsApp = logProviders.child("whatsapp"); const logTelegram = logProviders.child("telegram"); const logDiscord = logProviders.child("discord"); +const logSignal = logProviders.child("signal"); const canvasRuntime = runtimeForLogger(logCanvas); const whatsappRuntimeEnv = runtimeForLogger(logWhatsApp); const telegramRuntimeEnv = runtimeForLogger(logTelegram); const discordRuntimeEnv = runtimeForLogger(logDiscord); +const signalRuntimeEnv = runtimeForLogger(logSignal); function loadTelegramToken( config: ClawdisConfig, @@ -1367,7 +1371,7 @@ export async function startGatewayServer( wakeMode: "now" | "next-heartbeat"; sessionKey: string; deliver: boolean; - channel: "last" | "whatsapp" | "telegram" | "discord"; + channel: "last" | "whatsapp" | "telegram" | "discord" | "signal"; to?: string; thinking?: string; timeoutSeconds?: number; @@ -1392,6 +1396,7 @@ export async function startGatewayServer( channelRaw === "whatsapp" || channelRaw === "telegram" || channelRaw === "discord" || + channelRaw === "signal" || channelRaw === "last" ? channelRaw : channelRaw === undefined @@ -1400,7 +1405,7 @@ export async function startGatewayServer( if (channel === null) { return { ok: false, - error: "channel must be last|whatsapp|telegram|discord", + error: "channel must be last|whatsapp|telegram|discord|signal", }; } const toRaw = payload.to; @@ -1451,7 +1456,7 @@ export async function startGatewayServer( wakeMode: "now" | "next-heartbeat"; sessionKey: string; deliver: boolean; - channel: "last" | "whatsapp" | "telegram" | "discord"; + channel: "last" | "whatsapp" | "telegram" | "discord" | "signal"; to?: string; thinking?: string; timeoutSeconds?: number; @@ -1734,9 +1739,11 @@ export async function startGatewayServer( let whatsappAbort: AbortController | null = null; let telegramAbort: AbortController | null = null; let discordAbort: AbortController | null = null; + let signalAbort: AbortController | null = null; let whatsappTask: Promise | null = null; let telegramTask: Promise | null = null; let discordTask: Promise | null = null; + let signalTask: Promise | null = null; let whatsappRuntime: WebProviderStatus = { running: false, connected: false, @@ -1771,6 +1778,19 @@ export async function startGatewayServer( lastStopAt: null, lastError: null, }; + let signalRuntime: { + running: boolean; + lastStartAt?: number | null; + lastStopAt?: number | null; + lastError?: string | null; + baseUrl?: string | null; + } = { + running: false, + lastStartAt: null, + lastStopAt: null, + lastError: null, + baseUrl: null, + }; const clients = new Set(); let seq = 0; // Track per-run sequence to detect out-of-order/lost agent events. @@ -2102,10 +2122,96 @@ export async function startGatewayServer( }; }; + const startSignalProvider = async () => { + if (signalTask) return; + const cfg = loadConfig(); + if (!cfg.signal) { + signalRuntime = { + ...signalRuntime, + running: false, + lastError: "not configured", + }; + logSignal.info("skipping provider start (signal not configured)"); + return; + } + if (cfg.signal?.enabled === false) { + signalRuntime = { + ...signalRuntime, + running: false, + lastError: "disabled", + }; + logSignal.info("skipping provider start (signal.enabled=false)"); + return; + } + const host = cfg.signal?.httpHost?.trim() || "127.0.0.1"; + const port = cfg.signal?.httpPort ?? 8080; + const baseUrl = cfg.signal?.httpUrl?.trim() || `http://${host}:${port}`; + logSignal.info(`starting provider (${baseUrl})`); + signalAbort = new AbortController(); + signalRuntime = { + ...signalRuntime, + running: true, + lastStartAt: Date.now(), + lastError: null, + baseUrl, + }; + const task = monitorSignalProvider({ + baseUrl, + account: cfg.signal?.account, + cliPath: cfg.signal?.cliPath, + httpHost: cfg.signal?.httpHost, + httpPort: cfg.signal?.httpPort, + autoStart: cfg.signal?.autoStart, + receiveMode: cfg.signal?.receiveMode, + ignoreAttachments: cfg.signal?.ignoreAttachments, + ignoreStories: cfg.signal?.ignoreStories, + sendReadReceipts: cfg.signal?.sendReadReceipts, + allowFrom: cfg.signal?.allowFrom, + mediaMaxMb: cfg.signal?.mediaMaxMb, + runtime: signalRuntimeEnv, + abortSignal: signalAbort.signal, + }) + .catch((err) => { + signalRuntime = { + ...signalRuntime, + lastError: formatError(err), + }; + logSignal.error(`provider exited: ${formatError(err)}`); + }) + .finally(() => { + signalAbort = null; + signalTask = null; + signalRuntime = { + ...signalRuntime, + running: false, + lastStopAt: Date.now(), + }; + }); + signalTask = task; + }; + + const stopSignalProvider = async () => { + if (!signalAbort && !signalTask) return; + signalAbort?.abort(); + try { + await signalTask; + } catch { + // ignore + } + signalAbort = null; + signalTask = null; + signalRuntime = { + ...signalRuntime, + running: false, + lastStopAt: Date.now(), + }; + }; + const startProviders = async () => { await startWhatsAppProvider(); await startDiscordProvider(); await startTelegramProvider(); + await startSignalProvider(); }; const broadcast = ( @@ -3156,7 +3262,9 @@ export async function startGatewayServer( typeof link?.channel === "string" ? link.channel.trim() : ""; const channel = channelRaw.toLowerCase(); const provider = - channel === "whatsapp" || channel === "telegram" + channel === "whatsapp" || + channel === "telegram" || + channel === "signal" ? channel : undefined; const to = @@ -3984,6 +4092,20 @@ export async function startGatewayServer( discordLastProbeAt = Date.now(); } + const signalCfg = cfg.signal; + const signalEnabled = signalCfg?.enabled !== false; + const signalHost = signalCfg?.httpHost?.trim() || "127.0.0.1"; + const signalPort = signalCfg?.httpPort ?? 8080; + const signalBaseUrl = + signalCfg?.httpUrl?.trim() || `http://${signalHost}:${signalPort}`; + const signalConfigured = Boolean(signalCfg) && signalEnabled; + let signalProbe: SignalProbe | undefined; + let signalLastProbeAt: number | null = null; + if (probe && signalConfigured) { + signalProbe = await probeSignal(signalBaseUrl, timeoutMs); + signalLastProbeAt = Date.now(); + } + const linked = await webAuthExists(); const authAgeMs = getWebAuthAgeMs(); const self = readWebSelfId(); @@ -4027,6 +4149,16 @@ export async function startGatewayServer( probe: discordProbe, lastProbeAt: discordLastProbeAt, }, + signal: { + configured: signalConfigured, + baseUrl: signalBaseUrl, + running: signalRuntime.running, + lastStartAt: signalRuntime.lastStartAt ?? null, + lastStopAt: signalRuntime.lastStopAt ?? null, + lastError: signalRuntime.lastError ?? null, + probe: signalProbe, + lastProbeAt: signalLastProbeAt, + }, }, undefined, ); @@ -5925,6 +6057,28 @@ export async function startGatewayServer( payload, }); respond(true, payload, undefined, { provider }); + } else if (provider === "signal") { + const cfg = loadConfig(); + const host = cfg.signal?.httpHost?.trim() || "127.0.0.1"; + const port = cfg.signal?.httpPort ?? 8080; + const baseUrl = + cfg.signal?.httpUrl?.trim() || `http://${host}:${port}`; + const result = await sendMessageSignal(to, message, { + mediaUrl: params.mediaUrl, + baseUrl, + account: cfg.signal?.account, + }); + const payload = { + runId: idem, + messageId: result.messageId, + provider, + }; + dedupe.set(`send:${idem}`, { + ts: Date.now(), + ok: true, + payload, + }); + respond(true, payload, undefined, { provider }); } else { const result = await sendMessageWhatsApp(to, message, { mediaUrl: params.mediaUrl, @@ -6061,6 +6215,7 @@ export async function startGatewayServer( requestedChannel === "whatsapp" || requestedChannel === "telegram" || requestedChannel === "discord" || + requestedChannel === "signal" || requestedChannel === "webchat" ) { return requestedChannel; @@ -6079,7 +6234,8 @@ export async function startGatewayServer( if ( resolvedChannel === "whatsapp" || resolvedChannel === "telegram" || - resolvedChannel === "discord" + resolvedChannel === "discord" || + resolvedChannel === "signal" ) { return lastTo || undefined; } @@ -6324,6 +6480,7 @@ export async function startGatewayServer( await stopWhatsAppProvider(); await stopTelegramProvider(); await stopDiscordProvider(); + await stopSignalProvider(); cron.stop(); heartbeatRunner.stop(); broadcast("shutdown", { @@ -6361,7 +6518,9 @@ export async function startGatewayServer( await stopBrowserControlServerIfStarted().catch(() => {}); } await Promise.allSettled( - [whatsappTask, telegramTask].filter(Boolean) as Array>, + [whatsappTask, telegramTask, signalTask].filter( + Boolean, + ) as Array>, ); await new Promise((resolve) => wss.close(() => resolve())); await new Promise((resolve, reject) => diff --git a/src/infra/provider-summary.ts b/src/infra/provider-summary.ts index 6f84fbe1f..440d11f43 100644 --- a/src/infra/provider-summary.ts +++ b/src/infra/provider-summary.ts @@ -48,6 +48,22 @@ export async function buildProviderSummary( ); } + const signalEnabled = effective.signal?.enabled !== false; + if (!signalEnabled) { + lines.push(chalk.cyan("Signal: disabled")); + } else { + const signalConfigured = Boolean( + effective.signal?.httpUrl || + effective.signal?.cliPath || + effective.signal?.account, + ); + lines.push( + signalConfigured + ? chalk.green("Signal: configured") + : chalk.cyan("Signal: not configured"), + ); + } + const allowFrom = effective.routing?.allowFrom?.length ? effective.routing.allowFrom.map(normalizeE164).filter(Boolean) : []; diff --git a/src/signal/client.ts b/src/signal/client.ts new file mode 100644 index 000000000..fde97e97c --- /dev/null +++ b/src/signal/client.ts @@ -0,0 +1,187 @@ +import { randomUUID } from "node:crypto"; + +export type SignalRpcOptions = { + baseUrl: string; + timeoutMs?: number; +}; + +export type SignalRpcError = { + code?: number; + message?: string; + data?: unknown; +}; + +export type SignalRpcResponse = { + jsonrpc?: string; + result?: T; + error?: SignalRpcError; + id?: string | number | null; +}; + +export type SignalSseEvent = { + event?: string; + data?: string; + id?: string; +}; + +const DEFAULT_TIMEOUT_MS = 10_000; + +function normalizeBaseUrl(url: string): string { + const trimmed = url.trim(); + if (!trimmed) { + throw new Error("Signal base URL is required"); + } + if (/^https?:\/\//i.test(trimmed)) return trimmed.replace(/\/+$/, ""); + return `http://${trimmed}`.replace(/\/+$/, ""); +} + +async function fetchWithTimeout( + url: string, + init: RequestInit, + timeoutMs: number, +) { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + try { + return await fetch(url, { ...init, signal: controller.signal }); + } finally { + clearTimeout(timer); + } +} + +export async function signalRpcRequest( + method: string, + params: Record | undefined, + opts: SignalRpcOptions, +): Promise { + const baseUrl = normalizeBaseUrl(opts.baseUrl); + const id = randomUUID(); + const body = JSON.stringify({ + jsonrpc: "2.0", + method, + params, + id, + }); + const res = await fetchWithTimeout( + `${baseUrl}/api/v1/rpc`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body, + }, + opts.timeoutMs ?? DEFAULT_TIMEOUT_MS, + ); + if (res.status === 201) { + return undefined as T; + } + const text = await res.text(); + if (!text) { + throw new Error(`Signal RPC empty response (status ${res.status})`); + } + const parsed = JSON.parse(text) as SignalRpcResponse; + if (parsed.error) { + const code = parsed.error.code ?? "unknown"; + const msg = parsed.error.message ?? "Signal RPC error"; + throw new Error(`Signal RPC ${code}: ${msg}`); + } + return parsed.result as T; +} + +export async function signalCheck( + baseUrl: string, + timeoutMs = DEFAULT_TIMEOUT_MS, +): Promise<{ ok: boolean; status?: number | null; error?: string | null }> { + const normalized = normalizeBaseUrl(baseUrl); + try { + const res = await fetchWithTimeout( + `${normalized}/api/v1/check`, + { method: "GET" }, + timeoutMs, + ); + if (!res.ok) { + return { ok: false, status: res.status, error: `HTTP ${res.status}` }; + } + return { ok: true, status: res.status, error: null }; + } catch (err) { + return { + ok: false, + status: null, + error: err instanceof Error ? err.message : String(err), + }; + } +} + +export async function streamSignalEvents(params: { + baseUrl: string; + account?: string; + abortSignal?: AbortSignal; + onEvent: (event: SignalSseEvent) => void; +}): Promise { + const baseUrl = normalizeBaseUrl(params.baseUrl); + const url = new URL(`${baseUrl}/api/v1/events`); + if (params.account) url.searchParams.set("account", params.account); + + const res = await fetch(url, { + method: "GET", + headers: { Accept: "text/event-stream" }, + signal: params.abortSignal, + }); + if (!res.ok || !res.body) { + throw new Error( + `Signal SSE failed (${res.status} ${res.statusText || "error"})`, + ); + } + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let currentEvent: SignalSseEvent = {}; + + const flushEvent = () => { + if (!currentEvent.data && !currentEvent.event && !currentEvent.id) return; + params.onEvent({ + event: currentEvent.event, + data: currentEvent.data, + id: currentEvent.id, + }); + currentEvent = {}; + }; + + while (true) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + let lineEnd = buffer.indexOf("\n"); + while (lineEnd !== -1) { + let line = buffer.slice(0, lineEnd); + buffer = buffer.slice(lineEnd + 1); + if (line.endsWith("\r")) line = line.slice(0, -1); + + if (line === "") { + flushEvent(); + lineEnd = buffer.indexOf("\n"); + continue; + } + if (line.startsWith(":")) { + lineEnd = buffer.indexOf("\n"); + continue; + } + const [rawField, ...rest] = line.split(":"); + const field = rawField.trim(); + const rawValue = rest.join(":"); + const value = rawValue.startsWith(" ") ? rawValue.slice(1) : rawValue; + if (field === "event") { + currentEvent.event = value; + } else if (field === "data") { + currentEvent.data = currentEvent.data + ? `${currentEvent.data}\n${value}` + : value; + } else if (field === "id") { + currentEvent.id = value; + } + lineEnd = buffer.indexOf("\n"); + } + } + + flushEvent(); +} diff --git a/src/signal/daemon.ts b/src/signal/daemon.ts new file mode 100644 index 000000000..9bbc61189 --- /dev/null +++ b/src/signal/daemon.ts @@ -0,0 +1,68 @@ +import { spawn } from "node:child_process"; +import type { RuntimeEnv } from "../runtime.js"; + +export type SignalDaemonOpts = { + cliPath: string; + account?: string; + httpHost: string; + httpPort: number; + receiveMode?: "on-start" | "manual"; + ignoreAttachments?: boolean; + ignoreStories?: boolean; + sendReadReceipts?: boolean; + runtime?: RuntimeEnv; +}; + +export type SignalDaemonHandle = { + pid?: number; + stop: () => void; +}; + +function buildDaemonArgs(opts: SignalDaemonOpts): string[] { + const args: string[] = []; + if (opts.account) { + args.push("-a", opts.account); + } + args.push("daemon"); + args.push("--http", `${opts.httpHost}:${opts.httpPort}`); + args.push("--no-receive-stdout"); + + if (opts.receiveMode) { + args.push("--receive-mode", opts.receiveMode); + } + if (opts.ignoreAttachments) args.push("--ignore-attachments"); + if (opts.ignoreStories) args.push("--ignore-stories"); + if (opts.sendReadReceipts) args.push("--send-read-receipts"); + + return args; +} + +export function spawnSignalDaemon(opts: SignalDaemonOpts): SignalDaemonHandle { + const args = buildDaemonArgs(opts); + const child = spawn(opts.cliPath, args, { + stdio: ["ignore", "pipe", "pipe"], + }); + const log = opts.runtime?.log ?? (() => {}); + const error = opts.runtime?.error ?? (() => {}); + + child.stdout?.on("data", (data) => { + const text = data.toString().trim(); + if (text) log(`signal-cli: ${text}`); + }); + child.stderr?.on("data", (data) => { + const text = data.toString().trim(); + if (text) error(`signal-cli: ${text}`); + }); + child.on("error", (err) => { + error(`signal-cli spawn error: ${String(err)}`); + }); + + return { + pid: child.pid ?? undefined, + stop: () => { + if (!child.killed) { + child.kill("SIGTERM"); + } + }, + }; +} diff --git a/src/signal/index.ts b/src/signal/index.ts new file mode 100644 index 000000000..60e88ab2b --- /dev/null +++ b/src/signal/index.ts @@ -0,0 +1,3 @@ +export { monitorSignalProvider } from "./monitor.js"; +export { probeSignal } from "./probe.js"; +export { sendMessageSignal } from "./send.js"; diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts new file mode 100644 index 000000000..80b3c3300 --- /dev/null +++ b/src/signal/monitor.ts @@ -0,0 +1,384 @@ +import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { getReplyFromConfig } from "../auto-reply/reply.js"; +import type { ReplyPayload } from "../auto-reply/types.js"; +import { loadConfig } from "../config/config.js"; +import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; +import { chunkText } from "../auto-reply/chunk.js"; +import { danger, isVerbose, logVerbose } from "../globals.js"; +import { mediaKindFromMime } from "../media/constants.js"; +import { saveMediaBuffer } from "../media/store.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { normalizeE164 } from "../utils.js"; +import { signalRpcRequest, streamSignalEvents } from "./client.js"; +import { spawnSignalDaemon } from "./daemon.js"; +import { sendMessageSignal } from "./send.js"; + +type SignalEnvelope = { + sourceNumber?: string | null; + sourceName?: string | null; + timestamp?: number | null; + dataMessage?: SignalDataMessage | null; + editMessage?: { dataMessage?: SignalDataMessage | null } | null; + syncMessage?: unknown | null; +}; + +type SignalDataMessage = { + timestamp?: number; + message?: string | null; + attachments?: Array; + groupInfo?: { + groupId?: string | null; + groupName?: string | null; + } | null; + quote?: { text?: string | null } | null; +}; + +type SignalAttachment = { + id?: string | null; + contentType?: string | null; + filename?: string | null; + size?: number | null; +}; + +export type MonitorSignalOpts = { + runtime?: RuntimeEnv; + abortSignal?: AbortSignal; + account?: string; + baseUrl?: string; + autoStart?: boolean; + cliPath?: string; + httpHost?: string; + httpPort?: number; + receiveMode?: "on-start" | "manual"; + ignoreAttachments?: boolean; + ignoreStories?: boolean; + sendReadReceipts?: boolean; + allowFrom?: Array; + mediaMaxMb?: number; +}; + +type SignalReceivePayload = { + account?: string; + envelope?: SignalEnvelope | null; + exception?: { message?: string } | null; +}; + +function resolveRuntime(opts: MonitorSignalOpts): RuntimeEnv { + return ( + opts.runtime ?? { + log: console.log, + error: console.error, + exit: (code: number): never => { + throw new Error(`exit ${code}`); + }, + } + ); +} + +function resolveBaseUrl(opts: MonitorSignalOpts): string { + const cfg = loadConfig(); + const signalCfg = cfg.signal; + if (opts.baseUrl?.trim()) return opts.baseUrl.trim(); + if (signalCfg?.httpUrl?.trim()) return signalCfg.httpUrl.trim(); + const host = opts.httpHost ?? signalCfg?.httpHost ?? "127.0.0.1"; + const port = opts.httpPort ?? signalCfg?.httpPort ?? 8080; + return `http://${host}:${port}`; +} + +function resolveAccount(opts: MonitorSignalOpts): string | undefined { + const cfg = loadConfig(); + return opts.account?.trim() || cfg.signal?.account?.trim() || undefined; +} + +function resolveAllowFrom(opts: MonitorSignalOpts): string[] { + const cfg = loadConfig(); + const raw = + opts.allowFrom ?? + cfg.signal?.allowFrom ?? + cfg.routing?.allowFrom ?? + []; + return raw.map((entry) => String(entry).trim()).filter(Boolean); +} + +function isAllowedSender(sender: string, allowFrom: string[]): boolean { + if (allowFrom.length === 0) return true; + if (allowFrom.includes("*")) return true; + const normalizedAllow = allowFrom + .map((entry) => entry.replace(/^signal:/i, "")) + .map((entry) => normalizeE164(entry)); + const normalizedSender = normalizeE164(sender); + return normalizedAllow.includes(normalizedSender); +} + +async function fetchAttachment(params: { + baseUrl: string; + account?: string; + attachment: SignalAttachment; + sender?: string; + groupId?: string; + maxBytes: number; +}): Promise<{ path: string; contentType?: string } | null> { + const { attachment } = params; + if (!attachment?.id) return null; + if (attachment.size && attachment.size > params.maxBytes) { + throw new Error( + `Signal attachment ${attachment.id} exceeds ${(params.maxBytes / (1024 * 1024)).toFixed(0)}MB limit`, + ); + } + const rpcParams: Record = { + id: attachment.id, + }; + if (params.account) rpcParams.account = params.account; + if (params.groupId) rpcParams.groupId = params.groupId; + else if (params.sender) rpcParams.recipient = params.sender; + else return null; + + const result = await signalRpcRequest<{ data?: string }>( + "getAttachment", + rpcParams, + { baseUrl: params.baseUrl }, + ); + if (!result?.data) return null; + const buffer = Buffer.from(result.data, "base64"); + const saved = await saveMediaBuffer( + buffer, + attachment.contentType ?? undefined, + "inbound", + params.maxBytes, + ); + return { path: saved.path, contentType: saved.contentType }; +} + +async function deliverReplies(params: { + replies: ReplyPayload[]; + target: string; + baseUrl: string; + account?: string; + runtime: RuntimeEnv; + maxBytes: number; +}) { + const { replies, target, baseUrl, account, runtime, maxBytes } = params; + for (const payload of replies) { + const mediaList = + payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); + const text = payload.text ?? ""; + if (!text && mediaList.length === 0) continue; + if (mediaList.length === 0) { + for (const chunk of chunkText(text, 4000)) { + await sendMessageSignal(target, chunk, { + baseUrl, + account, + maxBytes, + }); + } + } else { + let first = true; + for (const url of mediaList) { + const caption = first ? text : ""; + first = false; + await sendMessageSignal(target, caption, { + baseUrl, + account, + mediaUrl: url, + maxBytes, + }); + } + } + runtime.log?.(`signal: delivered reply to ${target}`); + } +} + +export async function monitorSignalProvider( + opts: MonitorSignalOpts = {}, +): Promise { + const runtime = resolveRuntime(opts); + const cfg = loadConfig(); + const baseUrl = resolveBaseUrl(opts); + const account = resolveAccount(opts); + const allowFrom = resolveAllowFrom(opts); + const mediaMaxBytes = + (opts.mediaMaxMb ?? cfg.signal?.mediaMaxMb ?? 8) * 1024 * 1024; + const ignoreAttachments = + opts.ignoreAttachments ?? cfg.signal?.ignoreAttachments ?? false; + + const autoStart = + opts.autoStart ?? + cfg.signal?.autoStart ?? + (cfg.signal?.httpUrl ? false : true); + let daemonHandle: ReturnType | null = null; + + if (autoStart) { + const cliPath = opts.cliPath ?? cfg.signal?.cliPath ?? "signal-cli"; + const httpHost = opts.httpHost ?? cfg.signal?.httpHost ?? "127.0.0.1"; + const httpPort = opts.httpPort ?? cfg.signal?.httpPort ?? 8080; + daemonHandle = spawnSignalDaemon({ + cliPath, + account, + httpHost, + httpPort, + receiveMode: opts.receiveMode ?? cfg.signal?.receiveMode, + ignoreAttachments: + opts.ignoreAttachments ?? cfg.signal?.ignoreAttachments, + ignoreStories: opts.ignoreStories ?? cfg.signal?.ignoreStories, + sendReadReceipts: + opts.sendReadReceipts ?? cfg.signal?.sendReadReceipts, + runtime, + }); + } + + const onAbort = () => { + daemonHandle?.stop(); + }; + opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); + + try { + const handleEvent = async (event: { event?: string; data?: string }) => { + if (event.event !== "receive" || !event.data) return; + let payload: SignalReceivePayload | null = null; + try { + payload = JSON.parse(event.data) as SignalReceivePayload; + } catch (err) { + runtime.error?.(`signal: failed to parse event: ${String(err)}`); + return; + } + if (payload?.exception?.message) { + runtime.error?.(`signal: receive exception: ${payload.exception.message}`); + } + const envelope = payload?.envelope; + if (!envelope) return; + if (envelope.syncMessage) return; + const dataMessage = + envelope.dataMessage ?? envelope.editMessage?.dataMessage; + if (!dataMessage) return; + + const sender = envelope.sourceNumber?.trim(); + if (!sender) return; + if (account && normalizeE164(sender) === normalizeE164(account)) { + return; + } + if (!isAllowedSender(sender, allowFrom)) { + logVerbose(`Blocked signal sender ${sender} (not in allowFrom)`); + return; + } + + const groupId = dataMessage.groupInfo?.groupId ?? undefined; + const groupName = dataMessage.groupInfo?.groupName ?? undefined; + const isGroup = Boolean(groupId); + const messageText = (dataMessage.message ?? "").trim(); + + let mediaPath: string | undefined; + let mediaType: string | undefined; + let placeholder = ""; + const firstAttachment = dataMessage.attachments?.[0]; + if (firstAttachment?.id && !ignoreAttachments) { + try { + const fetched = await fetchAttachment({ + baseUrl, + account, + attachment: firstAttachment, + sender, + groupId, + maxBytes: mediaMaxBytes, + }); + if (fetched) { + mediaPath = fetched.path; + mediaType = fetched.contentType ?? firstAttachment.contentType ?? undefined; + } + } catch (err) { + runtime.error?.( + danger(`signal: attachment fetch failed: ${String(err)}`), + ); + } + } + + const kind = mediaKindFromMime(mediaType ?? undefined); + if (kind) { + placeholder = ``; + } else if (dataMessage.attachments?.length) { + placeholder = ""; + } + + const bodyText = + messageText || placeholder || dataMessage.quote?.text?.trim() || ""; + if (!bodyText) return; + + const fromLabel = isGroup + ? `${groupName ?? "Signal Group"} id:${groupId}` + : `${envelope.sourceName ?? sender} id:${sender}`; + const body = formatAgentEnvelope({ + surface: "Signal", + from: fromLabel, + timestamp: envelope.timestamp ?? undefined, + body: bodyText, + }); + + const ctxPayload = { + Body: body, + From: isGroup ? `group:${groupId}` : `signal:${sender}`, + To: isGroup ? `group:${groupId}` : `signal:${sender}`, + ChatType: isGroup ? "group" : "direct", + GroupSubject: isGroup ? groupName ?? undefined : undefined, + SenderName: envelope.sourceName ?? sender, + Surface: "signal" as const, + MessageSid: envelope.timestamp ? String(envelope.timestamp) : undefined, + Timestamp: envelope.timestamp ?? undefined, + MediaPath: mediaPath, + MediaType: mediaType, + MediaUrl: mediaPath, + }; + + if (!isGroup) { + const sessionCfg = cfg.session; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const storePath = resolveStorePath(sessionCfg?.store); + await updateLastRoute({ + storePath, + sessionKey: mainKey, + channel: "signal", + to: normalizeE164(sender), + }); + } + + if (isVerbose()) { + const preview = body.slice(0, 200).replace(/\n/g, "\\n"); + logVerbose( + `signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`, + ); + } + + const replyResult = await getReplyFromConfig(ctxPayload, undefined, cfg); + const replies = replyResult + ? Array.isArray(replyResult) + ? replyResult + : [replyResult] + : []; + if (replies.length === 0) return; + + await deliverReplies({ + replies, + target: ctxPayload.To, + baseUrl, + account, + runtime, + maxBytes: mediaMaxBytes, + }); + }; + + await streamSignalEvents({ + baseUrl, + account, + abortSignal: opts.abortSignal, + onEvent: (event) => { + void handleEvent(event).catch((err) => { + runtime.error?.(`signal: event handler failed: ${String(err)}`); + }); + }, + }); + } catch (err) { + if (opts.abortSignal?.aborted) return; + throw err; + } finally { + opts.abortSignal?.removeEventListener("abort", onAbort); + daemonHandle?.stop(); + } +} diff --git a/src/signal/probe.ts b/src/signal/probe.ts new file mode 100644 index 000000000..302c0be1c --- /dev/null +++ b/src/signal/probe.ts @@ -0,0 +1,47 @@ +import { signalCheck, signalRpcRequest } from "./client.js"; + +export type SignalProbe = { + ok: boolean; + status?: number | null; + error?: string | null; + elapsedMs: number; + version?: string | null; +}; + +export async function probeSignal( + baseUrl: string, + timeoutMs: number, +): Promise { + const started = Date.now(); + const result: SignalProbe = { + ok: false, + status: null, + error: null, + elapsedMs: 0, + version: null, + }; + const check = await signalCheck(baseUrl, timeoutMs); + if (!check.ok) { + return { + ...result, + status: check.status ?? null, + error: check.error ?? "unreachable", + elapsedMs: Date.now() - started, + }; + } + try { + const version = await signalRpcRequest("version", undefined, { + baseUrl, + timeoutMs, + }); + result.version = typeof version === "string" ? version : null; + } catch (err) { + result.error = err instanceof Error ? err.message : String(err); + } + return { + ...result, + ok: true, + status: check.status ?? null, + elapsedMs: Date.now() - started, + }; +} diff --git a/src/signal/send.ts b/src/signal/send.ts new file mode 100644 index 000000000..f4e9b8bff --- /dev/null +++ b/src/signal/send.ts @@ -0,0 +1,125 @@ +import { loadConfig } from "../config/config.js"; +import { mediaKindFromMime } from "../media/constants.js"; +import { saveMediaBuffer } from "../media/store.js"; +import { loadWebMedia } from "../web/media.js"; +import { signalRpcRequest } from "./client.js"; + +export type SignalSendOpts = { + baseUrl?: string; + account?: string; + mediaUrl?: string; + maxBytes?: number; + timeoutMs?: number; +}; + +export type SignalSendResult = { + messageId: string; + timestamp?: number; +}; + +type SignalTarget = + | { type: "recipient"; recipient: string } + | { type: "group"; groupId: string } + | { type: "username"; username: string }; + +function resolveBaseUrl(explicit?: string): string { + const cfg = loadConfig(); + const signalCfg = cfg.signal; + if (explicit?.trim()) return explicit.trim(); + if (signalCfg?.httpUrl?.trim()) return signalCfg.httpUrl.trim(); + const host = signalCfg?.httpHost?.trim() || "127.0.0.1"; + const port = signalCfg?.httpPort ?? 8080; + return `http://${host}:${port}`; +} + +function resolveAccount(explicit?: string): string | undefined { + const cfg = loadConfig(); + const signalCfg = cfg.signal; + const account = explicit?.trim() || signalCfg?.account?.trim(); + return account || undefined; +} + +function parseTarget(raw: string): SignalTarget { + let value = raw.trim(); + if (!value) throw new Error("Signal recipient is required"); + const lower = value.toLowerCase(); + if (lower.startsWith("group:")) { + return { type: "group", groupId: value.slice("group:".length).trim() }; + } + if (lower.startsWith("signal:")) { + value = value.slice("signal:".length).trim(); + } + if (lower.startsWith("username:")) { + return { type: "username", username: value.slice("username:".length).trim() }; + } + if (lower.startsWith("u:")) { + return { type: "username", username: value.trim() }; + } + return { type: "recipient", recipient: value }; +} + +async function resolveAttachment( + mediaUrl: string, + maxBytes: number, +): Promise<{ path: string; contentType?: string }> { + const media = await loadWebMedia(mediaUrl, maxBytes); + const saved = await saveMediaBuffer( + media.buffer, + media.contentType ?? undefined, + "outbound", + maxBytes, + ); + return { path: saved.path, contentType: saved.contentType }; +} + +export async function sendMessageSignal( + to: string, + text: string, + opts: SignalSendOpts = {}, +): Promise { + const baseUrl = resolveBaseUrl(opts.baseUrl); + const account = resolveAccount(opts.account); + const target = parseTarget(to); + let message = text ?? ""; + const maxBytes = opts.maxBytes ?? 8 * 1024 * 1024; + + let attachments: string[] | undefined; + if (opts.mediaUrl?.trim()) { + const resolved = await resolveAttachment(opts.mediaUrl.trim(), maxBytes); + attachments = [resolved.path]; + const kind = mediaKindFromMime(resolved.contentType ?? undefined); + if (!message && kind) { + // Avoid sending an empty body when only attachments exist. + message = kind === "image" ? "" : ``; + } + } + + if (!message.trim() && (!attachments || attachments.length === 0)) { + throw new Error("Signal send requires text or media"); + } + + const params: Record = { message }; + if (account) params.account = account; + if (attachments && attachments.length > 0) { + params.attachments = attachments; + } + + if (target.type === "recipient") { + params.recipient = [target.recipient]; + } else if (target.type === "group") { + params.groupId = target.groupId; + } else if (target.type === "username") { + params.username = [target.username]; + } + + const result = await signalRpcRequest<{ timestamp?: number }>( + "send", + params, + { baseUrl, timeoutMs: opts.timeoutMs }, + ); + const timestamp = result?.timestamp; + return { + messageId: timestamp ? String(timestamp) : "unknown", + timestamp, + }; +}