From 131864b9404587745ed050274a02863444ec1750 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 9 Dec 2025 20:18:50 +0000 Subject: [PATCH] gateway: drop ipc and simplify cli --- apps/macos/Sources/Clawdis/HealthStore.swift | 15 +- .../ClawdisIPCTests/HealthDecodeTests.swift | 2 +- docs/gateway.md | 9 +- docs/refactor/new-arch.md | 2 +- src/cli/program.ts | 132 +++++---- src/commands/health.ts | 10 - src/commands/send.test.ts | 38 +-- src/commands/send.ts | 76 +++-- src/web/auto-reply.ts | 55 +--- src/web/ipc.test.ts | 65 ---- src/web/ipc.ts | 277 ------------------ 11 files changed, 133 insertions(+), 548 deletions(-) delete mode 100644 src/web/ipc.test.ts delete mode 100644 src/web/ipc.ts diff --git a/apps/macos/Sources/Clawdis/HealthStore.swift b/apps/macos/Sources/Clawdis/HealthStore.swift index 5aa770ffa..ad08571cf 100644 --- a/apps/macos/Sources/Clawdis/HealthStore.swift +++ b/apps/macos/Sources/Clawdis/HealthStore.swift @@ -4,6 +4,11 @@ import OSLog import SwiftUI struct HealthSnapshot: Codable, Sendable { + struct Ipc: Codable, Sendable { + let exists: Bool? + let path: String? + } + struct Web: Codable, Sendable { struct Connect: Codable, Sendable { let ok: Bool @@ -29,17 +34,12 @@ struct HealthSnapshot: Codable, Sendable { let recent: [SessionInfo] } - struct IPC: Codable, Sendable { - let path: String - let exists: Bool - } - let ts: Double let durationMs: Double let web: Web + let ipc: Ipc? let heartbeatSeconds: Int? let sessions: Sessions - let ipc: IPC } enum HealthState: Equatable { @@ -177,9 +177,6 @@ final class HealthStore: ObservableObject { let reason = connect.error ?? "connect failed" return "\(reason) (\(code), \(elapsed))" } - if !snap.ipc.exists { - return "IPC socket missing at \(snap.ipc.path)" - } if let fallback, !fallback.isEmpty { return fallback } diff --git a/apps/macos/Tests/ClawdisIPCTests/HealthDecodeTests.swift b/apps/macos/Tests/ClawdisIPCTests/HealthDecodeTests.swift index 4972a796a..bd68add44 100644 --- a/apps/macos/Tests/ClawdisIPCTests/HealthDecodeTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/HealthDecodeTests.swift @@ -5,7 +5,7 @@ import Testing @Suite struct HealthDecodeTests { private let sampleJSON: String = // minimal but complete payload """ - {"ts":1733622000,"durationMs":420,"web":{"linked":true,"authAgeMs":120000,"connect":{"ok":true,"status":200,"error":null,"elapsedMs":800}},"heartbeatSeconds":60,"sessions":{"path":"/tmp/sessions.json","count":1,"recent":[{"key":"abc","updatedAt":1733621900,"age":120000}]},"ipc":{"path":"/tmp/ipc.sock","exists":true}} + {"ts":1733622000,"durationMs":420,"web":{"linked":true,"authAgeMs":120000,"connect":{"ok":true,"status":200,"error":null,"elapsedMs":800}},"heartbeatSeconds":60,"sessions":{"path":"/tmp/sessions.json","count":1,"recent":[{"key":"abc","updatedAt":1733621900,"age":120000}]}} """ @Test func decodesCleanJSON() async throws { diff --git a/docs/gateway.md b/docs/gateway.md index b269b102c..ddfc1b221 100644 --- a/docs/gateway.md +++ b/docs/gateway.md @@ -127,10 +127,11 @@ Enable with `systemctl enable --now clawdis-gateway.service`. - Graceful shutdown: emit `shutdown` event before closing; clients must handle close + reconnect. ## CLI helpers -- `clawdis gw:health` / `gw:status` — request health/status over the Gateway WS. -- `clawdis gw:send --to --message "hi" [--media-url ...]` — send via Gateway (idempotent). -- `clawdis gw:agent --message "hi" [--to ...]` — run an agent turn (waits for final by default). -- `clawdis gw:call --params '{"k":"v"}'` — raw method invoker for debugging. +- `clawdis gateway health|status` — request health/status over the Gateway WS. +- `clawdis gateway send --to --message "hi" [--media-url ...]` — send via Gateway (idempotent). +- `clawdis gateway agent --message "hi" [--to ...]` — run an agent turn (waits for final by default). +- `clawdis gateway call --params '{"k":"v"}'` — raw method invoker for debugging. +- All gateway helpers accept `--spawn-gateway` to start a local gateway if none is listening on `--url`. ## Migration guidance - Retire uses of `clawdis gateway` and the legacy TCP control port. diff --git a/docs/refactor/new-arch.md b/docs/refactor/new-arch.md index 0979e3281..0c35d4384 100644 --- a/docs/refactor/new-arch.md +++ b/docs/refactor/new-arch.md @@ -103,7 +103,7 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, - Remove immediate `health/system-presence` fetch on connect. ✅ presence hydrated from snapshot; periodic refresh kept as fallback. - Handle `hello-error` and retry with backoff if version/token mismatched. ✅ macOS GatewayChannel reconnects with exponential backoff. - **CLI**: - - Add lightweight WS client helper for `status/health/send/agent` when Gateway is up. ✅ `gw:*` commands use the Gateway over WS. +- Add lightweight WS client helper for `status/health/send/agent` when Gateway is up. ✅ `gateway` subcommands use the Gateway over WS. - Consider a “local only” flag to avoid accidental remote connects. (optional; not needed with tunnel-first model.) - **WebChat backend**: - Single WS to Gateway; seed UI from snapshot; forward `presence/tick/agent` to browser. ✅ implemented via `GatewayClient` in `webchat/server.ts`. diff --git a/src/cli/program.ts b/src/cli/program.ts index 301de269b..866ab9813 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -122,7 +122,7 @@ export function buildProgram() { "clawdis gateway --force", "Kill anything bound to the default gateway port, then start it.", ], - ["clawdis gw:status", "Fetch Gateway status over WS."], + ["clawdis gateway ...", "Gateway control via WebSocket."], [ 'clawdis agent --to +15555550123 --message "Run summary" --deliver', "Talk directly to the agent using the Gateway; optionally send the WhatsApp reply.", @@ -188,6 +188,11 @@ export function buildProgram() { "--provider ", "Delivery provider: whatsapp|telegram (default: whatsapp)", ) + .option( + "--spawn-gateway", + "Start a local gateway on 127.0.0.1:18789 if none is running", + false, + ) .option("--dry-run", "Print payload and skip sending", false) .option("--json", "Output result as JSON", false) .option("--verbose", "Verbose logging", false) @@ -278,7 +283,7 @@ Examples: await runRpcLoop({ input: process.stdin, output: process.stdout }); await new Promise(() => {}); }); - program + const gateway = program .command("gateway") .description("Run the WebSocket Gateway") .option("--port ", "Port for the gateway WebSocket", "18789") @@ -343,12 +348,53 @@ Examples: .option("--url ", "Gateway WebSocket URL", "ws://127.0.0.1:18789") .option("--token ", "Gateway token (if required)") .option("--timeout ", "Timeout in ms", "10000") - .option("--expect-final", "Wait for final response (agent)", false); + .option("--expect-final", "Wait for final response (agent)", false) + .option( + "--spawn-gateway", + "Start a local gateway if none is listening on --url", + false, + ); + + const callWithSpawn = async ( + method: string, + opts: { + url?: string; + token?: string; + timeout?: string; + expectFinal?: boolean; + spawnGateway?: boolean; + }, + params?: unknown, + ) => { + const timeoutMs = Number(opts.timeout ?? 10_000); + const attempt = async () => + callGateway({ + url: opts.url, + token: opts.token, + method, + params, + expectFinal: Boolean(opts.expectFinal), + timeoutMs, + clientName: "cli", + mode: "cli", + }); + + try { + return await attempt(); + } catch (err) { + if (!opts.spawnGateway) throw err; + // Only spawn if there is clearly no listener. + const url = new URL(opts.url ?? "ws://127.0.0.1:18789"); + const port = Number(url.port || 18789); + await startGatewayServer(port); + return await attempt(); + } + }; gatewayCallOpts( - program - .command("gw:call") - .description("Call a Gateway method over WS and print JSON") + gateway + .command("call") + .description("Call a Gateway method and print JSON") .argument( "", "Method name (health/status/system-presence/send/agent)", @@ -357,16 +403,7 @@ Examples: .action(async (method, opts) => { try { const params = JSON.parse(String(opts.params ?? "{}")); - const result = await callGateway({ - url: opts.url, - token: opts.token, - method, - params, - expectFinal: Boolean(opts.expectFinal), - timeoutMs: Number(opts.timeout ?? 10000), - clientName: "cli", - mode: "cli", - }); + const result = await callWithSpawn(method, opts, params); defaultRuntime.log(JSON.stringify(result, null, 2)); } catch (err) { defaultRuntime.error(`Gateway call failed: ${String(err)}`); @@ -376,17 +413,12 @@ Examples: ); gatewayCallOpts( - program - .command("gw:health") - .description("Fetch Gateway health over WS") + gateway + .command("health") + .description("Fetch Gateway health") .action(async (opts) => { try { - const result = await callGateway({ - url: opts.url, - token: opts.token, - method: "health", - timeoutMs: Number(opts.timeout ?? 10000), - }); + const result = await callWithSpawn("health", opts); defaultRuntime.log(JSON.stringify(result, null, 2)); } catch (err) { defaultRuntime.error(String(err)); @@ -396,17 +428,12 @@ Examples: ); gatewayCallOpts( - program - .command("gw:status") - .description("Fetch Gateway status over WS") + gateway + .command("status") + .description("Fetch Gateway status") .action(async (opts) => { try { - const result = await callGateway({ - url: opts.url, - token: opts.token, - method: "status", - timeoutMs: Number(opts.timeout ?? 10000), - }); + const result = await callWithSpawn("status", opts); defaultRuntime.log(JSON.stringify(result, null, 2)); } catch (err) { defaultRuntime.error(String(err)); @@ -416,8 +443,8 @@ Examples: ); gatewayCallOpts( - program - .command("gw:send") + gateway + .command("send") .description("Send a message via the Gateway") .requiredOption("--to ", "Destination (E.164 or jid)") .requiredOption("--message ", "Message text") @@ -426,17 +453,11 @@ Examples: .action(async (opts) => { try { const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey(); - const result = await callGateway({ - url: opts.url, - token: opts.token, - method: "send", - params: { - to: opts.to, - message: opts.message, - mediaUrl: opts.mediaUrl, - idempotencyKey, - }, - timeoutMs: Number(opts.timeout ?? 10000), + const result = await callWithSpawn("send", opts, { + to: opts.to, + message: opts.message, + mediaUrl: opts.mediaUrl, + idempotencyKey, }); defaultRuntime.log(JSON.stringify(result, null, 2)); } catch (err) { @@ -447,8 +468,8 @@ Examples: ); gatewayCallOpts( - program - .command("gw:agent") + gateway + .command("agent") .description("Run an agent turn via the Gateway (waits for final)") .requiredOption("--message ", "User message") .option("--to ", "Destination") @@ -460,11 +481,10 @@ Examples: .action(async (opts) => { try { const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey(); - const result = await callGateway({ - url: opts.url, - token: opts.token, - method: "agent", - params: { + const result = await callWithSpawn( + "agent", + { ...opts, expectFinal: true }, + { message: opts.message, to: opts.to, sessionId: opts.sessionId, @@ -475,9 +495,7 @@ Examples: : undefined, idempotencyKey, }, - expectFinal: true, - timeoutMs: Number(opts.timeout ?? 10000), - }); + ); defaultRuntime.log(JSON.stringify(result, null, 2)); } catch (err) { defaultRuntime.error(String(err)); diff --git a/src/commands/health.ts b/src/commands/health.ts index 2687060e7..74c12f937 100644 --- a/src/commands/health.ts +++ b/src/commands/health.ts @@ -54,7 +54,6 @@ export type HealthSummary = { age: number | null; }>; }; - ipc: { path: string; exists: boolean }; }; const DEFAULT_TIMEOUT_MS = 10_000; @@ -209,9 +208,6 @@ export async function getHealthSnapshot( age: s.updatedAt ? Date.now() - s.updatedAt : null, })); - const ipcPath = path.join(process.env.HOME ?? "", ".clawdis", "clawdis.sock"); - const ipcExists = Boolean(ipcPath) && fs.existsSync(ipcPath); - const start = Date.now(); const cappedTimeout = Math.max(1000, timeoutMs ?? DEFAULT_TIMEOUT_MS); const connect = linked ? await probeWebConnect(cappedTimeout) : undefined; @@ -235,7 +231,6 @@ export async function getHealthSnapshot( count: sessions.length, recent, }, - ipc: { path: ipcPath, exists: ipcExists }, }; return summary; @@ -300,11 +295,6 @@ export async function healthCommand( ); } } - runtime.log( - info( - `IPC socket: ${summary.ipc.exists ? "present" : "missing"} (${summary.ipc.path})`, - ), - ); } if (fatal) { diff --git a/src/commands/send.test.ts b/src/commands/send.test.ts index e7bc3d266..eac42e05f 100644 --- a/src/commands/send.test.ts +++ b/src/commands/send.test.ts @@ -4,9 +4,10 @@ import type { CliDeps } from "../cli/deps.js"; import type { RuntimeEnv } from "../runtime.js"; import { sendCommand } from "./send.js"; -const sendViaIpcMock = vi.fn().mockResolvedValue(null); -vi.mock("../web/ipc.js", () => ({ - sendViaIpc: (...args: unknown[]) => sendViaIpcMock(...args), +const callGatewayMock = vi.fn(); +vi.mock("../gateway/call.js", () => ({ + callGateway: (...args: unknown[]) => callGatewayMock(...args), + randomIdempotencyKey: () => "idem-1", })); const originalTelegramToken = process.env.TELEGRAM_BOT_TOKEN; @@ -48,8 +49,8 @@ describe("sendCommand", () => { expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); }); - it("uses IPC when available", async () => { - sendViaIpcMock.mockResolvedValueOnce({ success: true, messageId: "ipc1" }); + it("sends via gateway", async () => { + callGatewayMock.mockResolvedValueOnce({ messageId: "g1" }); const deps = makeDeps(); await sendCommand( { @@ -59,25 +60,8 @@ describe("sendCommand", () => { deps, runtime, ); - expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); - expect(runtime.log).toHaveBeenCalledWith(expect.stringContaining("ipc1")); - }); - - it("falls back to direct send when IPC fails", async () => { - sendViaIpcMock.mockResolvedValueOnce({ success: false, error: "nope" }); - const deps = makeDeps({ - sendMessageWhatsApp: vi.fn().mockResolvedValue({ messageId: "direct1" }), - }); - await sendCommand( - { - to: "+1", - message: "hi", - media: "pic.jpg", - }, - deps, - runtime, - ); - expect(deps.sendMessageWhatsApp).toHaveBeenCalled(); + expect(callGatewayMock).toHaveBeenCalled(); + expect(runtime.log).toHaveBeenCalledWith(expect.stringContaining("g1")); }); it("routes to telegram provider", async () => { @@ -100,10 +84,8 @@ describe("sendCommand", () => { }); it("emits json output", async () => { - sendViaIpcMock.mockResolvedValueOnce(null); - const deps = makeDeps({ - sendMessageWhatsApp: vi.fn().mockResolvedValue({ messageId: "direct2" }), - }); + callGatewayMock.mockResolvedValueOnce({ messageId: "direct2" }); + const deps = makeDeps(); await sendCommand( { to: "+1", diff --git a/src/commands/send.ts b/src/commands/send.ts index 153db86ee..a85e89c73 100644 --- a/src/commands/send.ts +++ b/src/commands/send.ts @@ -1,7 +1,8 @@ import type { CliDeps } from "../cli/deps.js"; import { info, success } from "../globals.js"; import type { RuntimeEnv } from "../runtime.js"; -import { sendViaIpc } from "../web/ipc.js"; +import { callGateway, randomIdempotencyKey } from "../gateway/call.js"; +import { startGatewayServer } from "../gateway/server.js"; export async function sendCommand( opts: { @@ -11,6 +12,7 @@ export async function sendCommand( json?: boolean; dryRun?: boolean; media?: string; + spawnGateway?: boolean; }, deps: CliDeps, runtime: RuntimeEnv, @@ -53,56 +55,44 @@ export async function sendCommand( return; } - // Try to send via IPC to running gateway first (avoids Signal session corruption) - const ipcResult = await sendViaIpc(opts.to, opts.message, opts.media); - if (ipcResult) { - if (ipcResult.success) { - runtime.log( - success(`✅ Sent via gateway IPC. Message ID: ${ipcResult.messageId}`), - ); - if (opts.json) { - runtime.log( - JSON.stringify( - { - provider: "web", - via: "ipc", - to: opts.to, - messageId: ipcResult.messageId, - mediaUrl: opts.media ?? null, - }, - null, - 2, - ), - ); - } - return; - } - // IPC failed but gateway is running - warn and fall back - runtime.log( - info( - `IPC send failed (${ipcResult.error}), falling back to direct connection`, - ), - ); + // Always send via gateway over WS to avoid multi-session corruption. + const sendViaGateway = async () => + callGateway<{ + messageId: string; + }>({ + url: "ws://127.0.0.1:18789", + method: "send", + params: { + to: opts.to, + message: opts.message, + mediaUrl: opts.media, + idempotencyKey: randomIdempotencyKey(), + }, + timeoutMs: 10_000, + clientName: "cli", + mode: "cli", + }); + + let result: { messageId: string } | undefined; + try { + result = await sendViaGateway(); + } catch (err) { + if (!opts.spawnGateway) throw err; + await startGatewayServer(18789); + result = await sendViaGateway(); } - // Fall back to direct connection (creates new Baileys socket) - const res = await deps - .sendMessageWhatsApp(opts.to, opts.message, { - verbose: false, - mediaUrl: opts.media, - }) - .catch((err) => { - runtime.error(`❌ Web send failed: ${String(err)}`); - throw err; - }); + runtime.log( + success(`✅ Sent via gateway. Message ID: ${result.messageId ?? "unknown"}`), + ); if (opts.json) { runtime.log( JSON.stringify( { provider: "web", - via: "direct", + via: "gateway", to: opts.to, - messageId: res.messageId, + messageId: result.messageId, mediaUrl: opts.media ?? null, }, null, diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 404ed73a3..b3bd6e576 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -19,7 +19,6 @@ import { getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { jidToE164, normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; -import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js"; import { loadWebMedia } from "./media.js"; import { sendMessageWhatsApp } from "./outbound.js"; import { @@ -41,23 +40,13 @@ export function setHeartbeatsEnabled(enabled: boolean) { heartbeatsEnabled = enabled; } -/** - * Send a message via IPC if gateway is running, otherwise fall back to direct. - * This avoids Signal session corruption from multiple Baileys connections. - */ +// Send via the active gateway-backed listener. The monitor already owns the single +// Baileys session, so use its send API directly. async function sendWithIpcFallback( to: string, message: string, opts: { verbose: boolean; mediaUrl?: string }, ): Promise<{ messageId: string; toJid: string }> { - const ipcResult = await sendViaIpc(to, message, opts.mediaUrl); - if (ipcResult?.success && ipcResult.messageId) { - if (opts.verbose) { - console.log(info(`Sent via gateway IPC (avoiding session corruption)`)); - } - return { messageId: ipcResult.messageId, toJid: `${to}@s.whatsapp.net` }; - } - // Fall back to direct send return sendMessageWhatsApp(to, message, opts); } @@ -1027,47 +1016,7 @@ export async function monitorWebProvider( `WhatsApp gateway connected${selfE164 ? ` as ${selfE164}` : ""}.`, ); - // Start IPC server so `clawdis send` can use this connection - // instead of creating a new one (which would corrupt Signal session) - if ("sendMessage" in listener && "sendComposingTo" in listener) { - startIpcServer(async (to, message, mediaUrl) => { - let mediaBuffer: Buffer | undefined; - let mediaType: string | undefined; - if (mediaUrl) { - const media = await loadWebMedia(mediaUrl); - mediaBuffer = media.buffer; - mediaType = media.contentType; - } - const result = await listener.sendMessage( - to, - message, - mediaBuffer, - mediaType, - ); - // Add to echo detection so we don't process our own message - if (message) { - recentlySent.add(message); - if (recentlySent.size > MAX_RECENT_MESSAGES) { - const firstKey = recentlySent.values().next().value; - if (firstKey) recentlySent.delete(firstKey); - } - } - logInfo( - `📤 IPC send to ${to}: ${message.substring(0, 50)}...`, - runtime, - ); - // Show typing indicator after send so user knows more may be coming - try { - await listener.sendComposingTo(to); - } catch { - // Ignore typing indicator errors - not critical - } - return result; - }); - } - const closeListener = async () => { - stopIpcServer(); if (heartbeat) clearInterval(heartbeat); if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer); if (watchdogTimer) clearInterval(watchdogTimer); diff --git a/src/web/ipc.test.ts b/src/web/ipc.test.ts deleted file mode 100644 index 7409f2a17..000000000 --- a/src/web/ipc.test.ts +++ /dev/null @@ -1,65 +0,0 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; - -import { afterEach, describe, expect, it, vi } from "vitest"; - -vi.mock("../logging.js", () => ({ - getChildLogger: () => ({ - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }), -})); - -const originalHome = process.env.HOME; - -afterEach(() => { - process.env.HOME = originalHome; - vi.resetModules(); -}); - -describe("ipc hardening", () => { - it("creates private socket dir and socket with tight perms", async () => { - const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "clawdis-home-")); - const clawdisDir = path.join(tmpHome, ".clawdis"); - fs.mkdirSync(clawdisDir, { recursive: true }); - process.env.HOME = tmpHome; - vi.resetModules(); - - const ipc = await import("./ipc.js"); - - const sendHandler = vi.fn().mockResolvedValue({ messageId: "msg1" }); - ipc.startIpcServer(sendHandler); - - const dirStat = fs.lstatSync(path.join(tmpHome, ".clawdis", "ipc")); - expect(dirStat.mode & 0o777).toBe(0o700); - - expect(ipc.isRelayRunning()).toBe(true); - - const socketStat = fs.lstatSync(ipc.getSocketPath()); - expect(socketStat.isSocket()).toBe(true); - if (typeof process.getuid === "function") { - expect(socketStat.uid).toBe(process.getuid()); - } - - ipc.stopIpcServer(); - expect(ipc.isRelayRunning()).toBe(false); - }); - - it("refuses to start when IPC dir is a symlink", async () => { - const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "clawdis-home-")); - const clawdisDir = path.join(tmpHome, ".clawdis"); - fs.mkdirSync(clawdisDir, { recursive: true }); - fs.symlinkSync("/tmp", path.join(clawdisDir, "ipc")); - - process.env.HOME = tmpHome; - vi.resetModules(); - - const ipc = await import("./ipc.js"); - const sendHandler = vi.fn().mockResolvedValue({ messageId: "msg1" }); - - expect(() => ipc.startIpcServer(sendHandler)).toThrow(/symlink/i); - }); -}); diff --git a/src/web/ipc.ts b/src/web/ipc.ts deleted file mode 100644 index 7e95b4e70..000000000 --- a/src/web/ipc.ts +++ /dev/null @@ -1,277 +0,0 @@ -/** - * IPC server for clawdis gateway. - * - * When the gateway is running, it starts a Unix socket server that allows - * `clawdis send` and `clawdis heartbeat` to send messages through the - * existing WhatsApp connection instead of creating new ones. - * - * This prevents Signal session ratchet corruption from multiple connections. - */ - -import fs from "node:fs"; -import net from "node:net"; -import path from "node:path"; - -import { getChildLogger } from "../logging.js"; -import { CONFIG_DIR } from "../utils.js"; - -const SOCKET_DIR = path.join(CONFIG_DIR, "ipc"); -const SOCKET_PATH = path.join(SOCKET_DIR, "gateway.sock"); - -export interface IpcSendRequest { - type: "send"; - to: string; - message: string; - mediaUrl?: string; -} - -export interface IpcSendResponse { - success: boolean; - messageId?: string; - error?: string; -} - -type SendHandler = ( - to: string, - message: string, - mediaUrl?: string, -) => Promise<{ messageId: string }>; - -let server: net.Server | null = null; - -/** - * Start the IPC server. Called by the gateway when it starts. - */ -export function startIpcServer(sendHandler: SendHandler): void { - const logger = getChildLogger({ module: "ipc-server" }); - - ensureSocketDir(); - try { - assertSafeSocketPath(SOCKET_PATH); - } catch (err) { - logger.error({ error: String(err) }, "Refusing to start IPC server"); - throw err; - } - - // Clean up stale socket file (only if safe to do so) - try { - fs.unlinkSync(SOCKET_PATH); - } catch (err) { - if ((err as NodeJS.ErrnoException).code !== "ENOENT") { - throw err; - } - } - - server = net.createServer((conn) => { - let buffer = ""; - - conn.on("data", async (data) => { - buffer += data.toString(); - - // Try to parse complete JSON messages (newline-delimited) - const lines = buffer.split("\n"); - buffer = lines.pop() ?? ""; // Keep incomplete line in buffer - - for (const line of lines) { - if (!line.trim()) continue; - - try { - const request = JSON.parse(line) as IpcSendRequest; - - if (request.type === "send") { - try { - const result = await sendHandler( - request.to, - request.message, - request.mediaUrl, - ); - const response: IpcSendResponse = { - success: true, - messageId: result.messageId, - }; - conn.write(`${JSON.stringify(response)}\n`); - } catch (err) { - const response: IpcSendResponse = { - success: false, - error: String(err), - }; - conn.write(`${JSON.stringify(response)}\n`); - } - } - } catch (err) { - logger.warn({ error: String(err) }, "failed to parse IPC request"); - const response: IpcSendResponse = { - success: false, - error: "Invalid request format", - }; - conn.write(`${JSON.stringify(response)}\n`); - } - } - }); - - conn.on("error", (err) => { - logger.debug({ error: String(err) }, "IPC connection error"); - }); - }); - - server.listen(SOCKET_PATH, () => { - logger.info({ socketPath: SOCKET_PATH }, "IPC server started"); - // Make socket accessible - fs.chmodSync(SOCKET_PATH, 0o600); - }); - - server.on("error", (err) => { - logger.error({ error: String(err) }, "IPC server error"); - }); -} - -/** - * Stop the IPC server. Called when gateway shuts down. - */ -export function stopIpcServer(): void { - if (server) { - server.close(); - server = null; - } - try { - fs.unlinkSync(SOCKET_PATH); - } catch { - // Ignore - } -} - -/** - * Check if the gateway IPC server is running. - */ -export function isRelayRunning(): boolean { - try { - assertSafeSocketPath(SOCKET_PATH); - fs.accessSync(SOCKET_PATH); - return true; - } catch { - return false; - } -} - -/** - * Send a message through the running gateway's IPC. - * Returns null if gateway is not running. - */ -export async function sendViaIpc( - to: string, - message: string, - mediaUrl?: string, -): Promise { - if (!isRelayRunning()) { - return null; - } - - return new Promise((resolve) => { - const client = net.createConnection(SOCKET_PATH); - let buffer = ""; - let resolved = false; - - const timeout = setTimeout(() => { - if (!resolved) { - resolved = true; - client.destroy(); - resolve({ success: false, error: "IPC timeout" }); - } - }, 30000); // 30 second timeout - - client.on("connect", () => { - const request: IpcSendRequest = { - type: "send", - to, - message, - mediaUrl, - }; - client.write(`${JSON.stringify(request)}\n`); - }); - - client.on("data", (data) => { - buffer += data.toString(); - const lines = buffer.split("\n"); - - for (const line of lines) { - if (!line.trim()) continue; - try { - const response = JSON.parse(line) as IpcSendResponse; - if (!resolved) { - resolved = true; - clearTimeout(timeout); - client.end(); - resolve(response); - } - return; - } catch { - // Keep reading - } - } - }); - - client.on("error", (_err) => { - if (!resolved) { - resolved = true; - clearTimeout(timeout); - // Socket exists but can't connect - gateway might have crashed - resolve(null); - } - }); - - client.on("close", () => { - if (!resolved) { - resolved = true; - clearTimeout(timeout); - resolve({ success: false, error: "Connection closed" }); - } - }); - }); -} - -/** - * Get the IPC socket path for debugging/status. - */ -export function getSocketPath(): string { - return SOCKET_PATH; -} - -function ensureSocketDir(): void { - try { - const stat = fs.lstatSync(SOCKET_DIR); - if (stat.isSymbolicLink()) { - throw new Error(`IPC dir is a symlink: ${SOCKET_DIR}`); - } - if (!stat.isDirectory()) { - throw new Error(`IPC dir is not a directory: ${SOCKET_DIR}`); - } - // Enforce private permissions - fs.chmodSync(SOCKET_DIR, 0o700); - if (typeof process.getuid === "function" && stat.uid !== process.getuid()) { - throw new Error(`IPC dir owned by different user: ${SOCKET_DIR}`); - } - } catch (err) { - if ((err as NodeJS.ErrnoException).code === "ENOENT") { - fs.mkdirSync(SOCKET_DIR, { recursive: true, mode: 0o700 }); - return; - } - throw err; - } -} - -function assertSafeSocketPath(socketPath: string): void { - try { - const stat = fs.lstatSync(socketPath); - if (stat.isSymbolicLink()) { - throw new Error(`Refusing IPC socket symlink: ${socketPath}`); - } - if (typeof process.getuid === "function" && stat.uid !== process.getuid()) { - throw new Error(`IPC socket owned by different user: ${socketPath}`); - } - } catch (err) { - if ((err as NodeJS.ErrnoException).code === "ENOENT") { - return; // Missing is fine; creation will happen next. - } - throw err; - } -}