feat: add Signal provider support

This commit is contained in:
Peter Steinberger
2026-01-01 15:43:15 +01:00
parent 0a4c2f91f5
commit 596770942a
21 changed files with 1368 additions and 19 deletions

View File

@@ -1871,6 +1871,61 @@ describe("gateway server", () => {
await server.close();
});
test("agent routes main last-channel signal", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
testSessionStorePath = path.join(dir, "sessions.json");
await fs.writeFile(
testSessionStorePath,
JSON.stringify(
{
main: {
sessionId: "sess-signal",
updatedAt: Date.now(),
lastChannel: "signal",
lastTo: "+15551234567",
},
},
null,
2,
),
"utf-8",
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
ws.send(
JSON.stringify({
type: "req",
id: "agent-last-signal",
method: "agent",
params: {
message: "hi",
sessionKey: "main",
channel: "last",
deliver: true,
idempotencyKey: "idem-agent-last-signal",
},
}),
);
await onceMessage(
ws,
(o) => o.type === "res" && o.id === "agent-last-signal",
);
const spy = vi.mocked(agentCommand);
expect(spy).toHaveBeenCalled();
const call = spy.mock.calls.at(-1)?.[0] as Record<string, unknown>;
expect(call.provider).toBe("signal");
expect(call.to).toBe("+15551234567");
expect(call.deliver).toBe(true);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-signal");
ws.close();
await server.close();
});
test("agent ignores webchat last-channel for routing", async () => {
testAllowFrom = ["+1555"];
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
@@ -2134,6 +2189,11 @@ describe("gateway server", () => {
probe?: unknown;
lastProbeAt?: unknown;
};
signal?: {
configured?: boolean;
probe?: unknown;
lastProbeAt?: unknown;
};
}>(ws, "providers.status", { probe: false, timeoutMs: 2000 });
expect(res.ok).toBe(true);
expect(res.payload?.whatsapp).toBeTruthy();
@@ -2141,6 +2201,9 @@ describe("gateway server", () => {
expect(res.payload?.telegram?.tokenSource).toBe("none");
expect(res.payload?.telegram?.probe).toBeUndefined();
expect(res.payload?.telegram?.lastProbeAt).toBeNull();
expect(res.payload?.signal?.configured).toBe(false);
expect(res.payload?.signal?.probe).toBeUndefined();
expect(res.payload?.signal?.lastProbeAt).toBeNull();
ws.close();
await server.close();

View File

@@ -74,6 +74,8 @@ import {
sendMessageDiscord,
} from "../discord/index.js";
import { type DiscordProbe, probeDiscord } from "../discord/probe.js";
import { monitorSignalProvider, sendMessageSignal } from "../signal/index.js";
import { type SignalProbe, probeSignal } from "../signal/probe.js";
import { isVerbose } from "../globals.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js";
@@ -283,10 +285,12 @@ const logWsControl = log.child("ws");
const logWhatsApp = logProviders.child("whatsapp");
const logTelegram = logProviders.child("telegram");
const logDiscord = logProviders.child("discord");
const logSignal = logProviders.child("signal");
const canvasRuntime = runtimeForLogger(logCanvas);
const whatsappRuntimeEnv = runtimeForLogger(logWhatsApp);
const telegramRuntimeEnv = runtimeForLogger(logTelegram);
const discordRuntimeEnv = runtimeForLogger(logDiscord);
const signalRuntimeEnv = runtimeForLogger(logSignal);
function loadTelegramToken(
config: ClawdisConfig,
@@ -1367,7 +1371,7 @@ export async function startGatewayServer(
wakeMode: "now" | "next-heartbeat";
sessionKey: string;
deliver: boolean;
channel: "last" | "whatsapp" | "telegram" | "discord";
channel: "last" | "whatsapp" | "telegram" | "discord" | "signal";
to?: string;
thinking?: string;
timeoutSeconds?: number;
@@ -1392,6 +1396,7 @@ export async function startGatewayServer(
channelRaw === "whatsapp" ||
channelRaw === "telegram" ||
channelRaw === "discord" ||
channelRaw === "signal" ||
channelRaw === "last"
? channelRaw
: channelRaw === undefined
@@ -1400,7 +1405,7 @@ export async function startGatewayServer(
if (channel === null) {
return {
ok: false,
error: "channel must be last|whatsapp|telegram|discord",
error: "channel must be last|whatsapp|telegram|discord|signal",
};
}
const toRaw = payload.to;
@@ -1451,7 +1456,7 @@ export async function startGatewayServer(
wakeMode: "now" | "next-heartbeat";
sessionKey: string;
deliver: boolean;
channel: "last" | "whatsapp" | "telegram" | "discord";
channel: "last" | "whatsapp" | "telegram" | "discord" | "signal";
to?: string;
thinking?: string;
timeoutSeconds?: number;
@@ -1734,9 +1739,11 @@ export async function startGatewayServer(
let whatsappAbort: AbortController | null = null;
let telegramAbort: AbortController | null = null;
let discordAbort: AbortController | null = null;
let signalAbort: AbortController | null = null;
let whatsappTask: Promise<unknown> | null = null;
let telegramTask: Promise<unknown> | null = null;
let discordTask: Promise<unknown> | null = null;
let signalTask: Promise<unknown> | null = null;
let whatsappRuntime: WebProviderStatus = {
running: false,
connected: false,
@@ -1771,6 +1778,19 @@ export async function startGatewayServer(
lastStopAt: null,
lastError: null,
};
let signalRuntime: {
running: boolean;
lastStartAt?: number | null;
lastStopAt?: number | null;
lastError?: string | null;
baseUrl?: string | null;
} = {
running: false,
lastStartAt: null,
lastStopAt: null,
lastError: null,
baseUrl: null,
};
const clients = new Set<Client>();
let seq = 0;
// Track per-run sequence to detect out-of-order/lost agent events.
@@ -2102,10 +2122,96 @@ export async function startGatewayServer(
};
};
const startSignalProvider = async () => {
if (signalTask) return;
const cfg = loadConfig();
if (!cfg.signal) {
signalRuntime = {
...signalRuntime,
running: false,
lastError: "not configured",
};
logSignal.info("skipping provider start (signal not configured)");
return;
}
if (cfg.signal?.enabled === false) {
signalRuntime = {
...signalRuntime,
running: false,
lastError: "disabled",
};
logSignal.info("skipping provider start (signal.enabled=false)");
return;
}
const host = cfg.signal?.httpHost?.trim() || "127.0.0.1";
const port = cfg.signal?.httpPort ?? 8080;
const baseUrl = cfg.signal?.httpUrl?.trim() || `http://${host}:${port}`;
logSignal.info(`starting provider (${baseUrl})`);
signalAbort = new AbortController();
signalRuntime = {
...signalRuntime,
running: true,
lastStartAt: Date.now(),
lastError: null,
baseUrl,
};
const task = monitorSignalProvider({
baseUrl,
account: cfg.signal?.account,
cliPath: cfg.signal?.cliPath,
httpHost: cfg.signal?.httpHost,
httpPort: cfg.signal?.httpPort,
autoStart: cfg.signal?.autoStart,
receiveMode: cfg.signal?.receiveMode,
ignoreAttachments: cfg.signal?.ignoreAttachments,
ignoreStories: cfg.signal?.ignoreStories,
sendReadReceipts: cfg.signal?.sendReadReceipts,
allowFrom: cfg.signal?.allowFrom,
mediaMaxMb: cfg.signal?.mediaMaxMb,
runtime: signalRuntimeEnv,
abortSignal: signalAbort.signal,
})
.catch((err) => {
signalRuntime = {
...signalRuntime,
lastError: formatError(err),
};
logSignal.error(`provider exited: ${formatError(err)}`);
})
.finally(() => {
signalAbort = null;
signalTask = null;
signalRuntime = {
...signalRuntime,
running: false,
lastStopAt: Date.now(),
};
});
signalTask = task;
};
const stopSignalProvider = async () => {
if (!signalAbort && !signalTask) return;
signalAbort?.abort();
try {
await signalTask;
} catch {
// ignore
}
signalAbort = null;
signalTask = null;
signalRuntime = {
...signalRuntime,
running: false,
lastStopAt: Date.now(),
};
};
const startProviders = async () => {
await startWhatsAppProvider();
await startDiscordProvider();
await startTelegramProvider();
await startSignalProvider();
};
const broadcast = (
@@ -3156,7 +3262,9 @@ export async function startGatewayServer(
typeof link?.channel === "string" ? link.channel.trim() : "";
const channel = channelRaw.toLowerCase();
const provider =
channel === "whatsapp" || channel === "telegram"
channel === "whatsapp" ||
channel === "telegram" ||
channel === "signal"
? channel
: undefined;
const to =
@@ -3984,6 +4092,20 @@ export async function startGatewayServer(
discordLastProbeAt = Date.now();
}
const signalCfg = cfg.signal;
const signalEnabled = signalCfg?.enabled !== false;
const signalHost = signalCfg?.httpHost?.trim() || "127.0.0.1";
const signalPort = signalCfg?.httpPort ?? 8080;
const signalBaseUrl =
signalCfg?.httpUrl?.trim() || `http://${signalHost}:${signalPort}`;
const signalConfigured = Boolean(signalCfg) && signalEnabled;
let signalProbe: SignalProbe | undefined;
let signalLastProbeAt: number | null = null;
if (probe && signalConfigured) {
signalProbe = await probeSignal(signalBaseUrl, timeoutMs);
signalLastProbeAt = Date.now();
}
const linked = await webAuthExists();
const authAgeMs = getWebAuthAgeMs();
const self = readWebSelfId();
@@ -4027,6 +4149,16 @@ export async function startGatewayServer(
probe: discordProbe,
lastProbeAt: discordLastProbeAt,
},
signal: {
configured: signalConfigured,
baseUrl: signalBaseUrl,
running: signalRuntime.running,
lastStartAt: signalRuntime.lastStartAt ?? null,
lastStopAt: signalRuntime.lastStopAt ?? null,
lastError: signalRuntime.lastError ?? null,
probe: signalProbe,
lastProbeAt: signalLastProbeAt,
},
},
undefined,
);
@@ -5925,6 +6057,28 @@ export async function startGatewayServer(
payload,
});
respond(true, payload, undefined, { provider });
} else if (provider === "signal") {
const cfg = loadConfig();
const host = cfg.signal?.httpHost?.trim() || "127.0.0.1";
const port = cfg.signal?.httpPort ?? 8080;
const baseUrl =
cfg.signal?.httpUrl?.trim() || `http://${host}:${port}`;
const result = await sendMessageSignal(to, message, {
mediaUrl: params.mediaUrl,
baseUrl,
account: cfg.signal?.account,
});
const payload = {
runId: idem,
messageId: result.messageId,
provider,
};
dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { provider });
} else {
const result = await sendMessageWhatsApp(to, message, {
mediaUrl: params.mediaUrl,
@@ -6061,6 +6215,7 @@ export async function startGatewayServer(
requestedChannel === "whatsapp" ||
requestedChannel === "telegram" ||
requestedChannel === "discord" ||
requestedChannel === "signal" ||
requestedChannel === "webchat"
) {
return requestedChannel;
@@ -6079,7 +6234,8 @@ export async function startGatewayServer(
if (
resolvedChannel === "whatsapp" ||
resolvedChannel === "telegram" ||
resolvedChannel === "discord"
resolvedChannel === "discord" ||
resolvedChannel === "signal"
) {
return lastTo || undefined;
}
@@ -6324,6 +6480,7 @@ export async function startGatewayServer(
await stopWhatsAppProvider();
await stopTelegramProvider();
await stopDiscordProvider();
await stopSignalProvider();
cron.stop();
heartbeatRunner.stop();
broadcast("shutdown", {
@@ -6361,7 +6518,9 @@ export async function startGatewayServer(
await stopBrowserControlServerIfStarted().catch(() => {});
}
await Promise.allSettled(
[whatsappTask, telegramTask].filter(Boolean) as Array<Promise<unknown>>,
[whatsappTask, telegramTask, signalTask].filter(
Boolean,
) as Array<Promise<unknown>>,
);
await new Promise<void>((resolve) => wss.close(() => resolve()));
await new Promise<void>((resolve, reject) =>