diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 680e276d1..449d50917 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -19,6 +19,8 @@ vi.mock("../commands/agent.js", () => ({ agentCommand: vi.fn().mockResolvedValue(undefined), })); +process.env.CLAWDIS_SKIP_PROVIDERS = "1"; + async function getFreePort(): Promise { return await new Promise((resolve, reject) => { const server = createServer(); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index c3667e66e..48785c010 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -5,6 +5,8 @@ import { createDefaultDeps } from "../cli/deps.js"; import { agentCommand } from "../commands/agent.js"; import { getHealthSnapshot } from "../commands/health.js"; import { getStatusSummary } from "../commands/status.js"; +import { loadConfig } from "../config/config.js"; +import { isVerbose } from "../globals.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { @@ -13,9 +15,11 @@ import { } from "../infra/system-presence.js"; import { logError } from "../logger.js"; import { getResolvedLoggerSettings } from "../logging.js"; -import { isVerbose } from "../globals.js"; import { defaultRuntime } from "../runtime.js"; +import { monitorWebProvider, webAuthExists } from "../providers/web/index.js"; import { sendMessageWhatsApp } from "../web/outbound.js"; +import { monitorTelegramProvider } from "../telegram/monitor.js"; +import { sendMessageTelegram } from "../telegram/send.js"; import { ErrorCodes, type ErrorShape, @@ -94,8 +98,53 @@ export async function startGatewayServer(port = 18789): Promise { host: "127.0.0.1", maxPayload: MAX_PAYLOAD_BYTES, }); + const providerAbort = new AbortController(); + const providerTasks: Array> = []; const clients = new Set(); + const startProviders = async () => { + const cfg = loadConfig(); + const telegramToken = + process.env.TELEGRAM_BOT_TOKEN ?? cfg.telegram?.botToken ?? ""; + + if (await webAuthExists()) { + defaultRuntime.log("gateway: starting WhatsApp Web provider"); + providerTasks.push( + monitorWebProvider( + isVerbose(), + undefined, + true, + undefined, + defaultRuntime, + providerAbort.signal, + ).catch((err) => logError(`web provider exited: ${String(err)}`)), + ); + } else { + defaultRuntime.log( + "gateway: skipping WhatsApp Web provider (no linked session)", + ); + } + + if (telegramToken.trim().length > 0) { + defaultRuntime.log("gateway: starting Telegram provider"); + providerTasks.push( + monitorTelegramProvider({ + token: telegramToken.trim(), + runtime: defaultRuntime, + abortSignal: providerAbort.signal, + useWebhook: Boolean(cfg.telegram?.webhookUrl), + webhookUrl: cfg.telegram?.webhookUrl, + webhookSecret: cfg.telegram?.webhookSecret, + webhookPath: cfg.telegram?.webhookPath, + }).catch((err) => logError(`telegram provider exited: ${String(err)}`)), + ); + } else { + defaultRuntime.log( + "gateway: skipping Telegram provider (no TELEGRAM_BOT_TOKEN/config)", + ); + } + }; + const broadcast = ( event: string, payload: unknown, @@ -399,16 +448,32 @@ export async function startGatewayServer(port = 18789): Promise { } const to = params.to.trim(); const message = params.message.trim(); + const provider = (params.provider ?? "whatsapp").toLowerCase(); try { - const result = await sendMessageWhatsApp(to, message, { - mediaUrl: params.mediaUrl, - verbose: isVerbose(), - }); - const payload = { - runId: idem, - messageId: result.messageId, - toJid: result.toJid ?? `${to}@s.whatsapp.net`, - }; + const result = + provider === "telegram" + ? await sendMessageTelegram(to, message, { + mediaUrl: params.mediaUrl, + verbose: isVerbose(), + }) + : await sendMessageWhatsApp(to, message, { + mediaUrl: params.mediaUrl, + verbose: isVerbose(), + }); + const payload = + provider === "telegram" + ? { + runId: idem, + messageId: result.messageId, + chatId: result.chatId, + provider, + } + : { + runId: idem, + messageId: result.messageId, + toJid: result.toJid ?? `${to}@s.whatsapp.net`, + provider, + }; dedupe.set(`send:${idem}`, { ts: Date.now(), ok: true, payload }); respond(true, payload, undefined); } catch (err) { @@ -524,8 +589,17 @@ export async function startGatewayServer(port = 18789): Promise { ); defaultRuntime.log(`gateway log file: ${getResolvedLoggerSettings().file}`); + // Launch configured providers (WhatsApp Web, Telegram) so gateway replies via the + // surface the message came from. Tests can opt out via CLAWDIS_SKIP_PROVIDERS. + if (process.env.CLAWDIS_SKIP_PROVIDERS !== "1") { + void startProviders(); + } else { + defaultRuntime.log("gateway: skipping provider start (CLAWDIS_SKIP_PROVIDERS=1)"); + } + return { close: async () => { + providerAbort.abort(); broadcast("shutdown", { reason: "gateway stopping", restartExpectedMs: null, @@ -547,6 +621,7 @@ export async function startGatewayServer(port = 18789): Promise { } } clients.clear(); + await Promise.allSettled(providerTasks); await new Promise((resolve) => wss.close(() => resolve())); }, };