diff --git a/docs/refactor/send-refactor.md b/docs/refactor/send-refactor.md new file mode 100644 index 000000000..6f92a4659 --- /dev/null +++ b/docs/refactor/send-refactor.md @@ -0,0 +1,7 @@ +# send-refactor scratchpad + +- [ ] Commit + push current outbound refactor changes +- [ ] Step 1: centralize outbound target validation +- [ ] Step 2: normalize payloads + single delivery call +- [ ] Step 3: unify outbound JSON/result formatting +- [ ] Cleanup: delete scratchpad, final lint + tests, commit + push diff --git a/src/commands/agent.ts b/src/commands/agent.ts index bd263c3ad..3be991d56 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -22,11 +22,6 @@ import { DEFAULT_AGENT_WORKSPACE_DIR, ensureAgentWorkspace, } from "../agents/workspace.js"; -import { - chunkMarkdownText, - chunkText, - resolveTextChunkLimit, -} from "../auto-reply/chunk.js"; import type { MsgContext } from "../auto-reply/templating.js"; import { normalizeThinkLevel, @@ -49,9 +44,9 @@ import { emitAgentEvent, registerAgentRunContext, } from "../infra/agent-events.js"; +import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { resolveSendPolicy } from "../sessions/send-policy.js"; -import { resolveTelegramToken } from "../telegram/token.js"; import { normalizeE164 } from "../utils.js"; type AgentCommandOpts = { @@ -260,8 +255,6 @@ export async function agentCommand( ? buildWorkspaceSkillSnapshot(workspaceDir, { config: cfg }) : sessionEntry?.skillsSnapshot; - const { token: telegramToken } = resolveTelegramToken(cfg); - if (skillsSnapshot && sessionStore && sessionKey && needsSkillsSnapshot) { const current = sessionEntry ?? { sessionId, @@ -621,16 +614,6 @@ export async function agentCommand( return { payloads: [], meta: result.meta }; } - const deliveryTextLimit = - deliveryProvider === "whatsapp" || - deliveryProvider === "telegram" || - deliveryProvider === "discord" || - deliveryProvider === "slack" || - deliveryProvider === "signal" || - deliveryProvider === "imessage" - ? resolveTextChunkLimit(cfg, deliveryProvider) - : resolveTextChunkLimit(cfg, "whatsapp"); - for (const payload of payloads) { const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); @@ -648,154 +631,42 @@ export async function agentCommand( const media = mediaList; if (!text && media.length === 0) continue; - if (deliveryProvider === "whatsapp" && whatsappTarget) { + if ( + deliveryProvider === "whatsapp" || + deliveryProvider === "telegram" || + deliveryProvider === "discord" || + deliveryProvider === "slack" || + deliveryProvider === "signal" || + deliveryProvider === "imessage" + ) { + const target = + deliveryProvider === "whatsapp" + ? whatsappTarget + : deliveryProvider === "telegram" + ? telegramTarget + : deliveryProvider === "discord" + ? discordTarget + : deliveryProvider === "slack" + ? slackTarget + : deliveryProvider === "signal" + ? signalTarget + : imessageTarget; + if (!target) continue; try { - const primaryMedia = media[0]; - await deps.sendMessageWhatsApp(whatsappTarget, text, { - verbose: false, - mediaUrl: primaryMedia, + await deliverOutboundPayloads({ + cfg, + provider: deliveryProvider, + to: target, + payloads: [payload], + deps: { + sendWhatsApp: deps.sendMessageWhatsApp, + sendTelegram: deps.sendMessageTelegram, + sendDiscord: deps.sendMessageDiscord, + sendSlack: deps.sendMessageSlack, + sendSignal: deps.sendMessageSignal, + sendIMessage: deps.sendMessageIMessage, + }, }); - for (const extra of media.slice(1)) { - await deps.sendMessageWhatsApp(whatsappTarget, "", { - verbose: false, - mediaUrl: extra, - }); - } - } catch (err) { - if (!bestEffortDeliver) throw err; - logDeliveryError(err); - } - continue; - } - - if (deliveryProvider === "telegram" && telegramTarget) { - try { - if (media.length === 0) { - for (const chunk of chunkMarkdownText(text, deliveryTextLimit)) { - await deps.sendMessageTelegram(telegramTarget, chunk, { - verbose: false, - token: telegramToken || undefined, - }); - } - } else { - let first = true; - for (const url of media) { - const caption = first ? text : ""; - first = false; - await deps.sendMessageTelegram(telegramTarget, caption, { - verbose: false, - mediaUrl: url, - token: telegramToken || undefined, - }); - } - } - } catch (err) { - if (!bestEffortDeliver) throw err; - logDeliveryError(err); - } - } - - if (deliveryProvider === "discord" && discordTarget) { - try { - if (media.length === 0) { - await deps.sendMessageDiscord(discordTarget, text, { - token: process.env.DISCORD_BOT_TOKEN, - }); - } else { - let first = true; - for (const url of media) { - const caption = first ? text : ""; - first = false; - await deps.sendMessageDiscord(discordTarget, caption, { - token: process.env.DISCORD_BOT_TOKEN, - mediaUrl: url, - }); - } - } - } catch (err) { - if (!bestEffortDeliver) throw err; - logDeliveryError(err); - } - } - - if (deliveryProvider === "slack" && slackTarget) { - try { - if (media.length === 0) { - await deps.sendMessageSlack(slackTarget, text); - } else { - let first = true; - for (const url of media) { - const caption = first ? text : ""; - first = false; - await deps.sendMessageSlack(slackTarget, caption, { - mediaUrl: url, - }); - } - } - } catch (err) { - if (!bestEffortDeliver) throw err; - logDeliveryError(err); - } - } - - if (deliveryProvider === "signal" && signalTarget) { - try { - if (media.length === 0) { - await deps.sendMessageSignal(signalTarget, text, { - maxBytes: cfg.signal?.mediaMaxMb - ? cfg.signal.mediaMaxMb * 1024 * 1024 - : cfg.agent?.mediaMaxMb - ? cfg.agent.mediaMaxMb * 1024 * 1024 - : undefined, - }); - } else { - let first = true; - for (const url of media) { - const caption = first ? text : ""; - first = false; - await deps.sendMessageSignal(signalTarget, caption, { - mediaUrl: url, - maxBytes: cfg.signal?.mediaMaxMb - ? cfg.signal.mediaMaxMb * 1024 * 1024 - : cfg.agent?.mediaMaxMb - ? cfg.agent.mediaMaxMb * 1024 * 1024 - : undefined, - }); - } - } - } catch (err) { - if (!bestEffortDeliver) throw err; - logDeliveryError(err); - } - } - - if (deliveryProvider === "imessage" && imessageTarget) { - try { - if (media.length === 0) { - for (const chunk of chunkText(text, deliveryTextLimit)) { - await deps.sendMessageIMessage(imessageTarget, chunk, { - maxBytes: cfg.imessage?.mediaMaxMb - ? cfg.imessage.mediaMaxMb * 1024 * 1024 - : cfg.agent?.mediaMaxMb - ? cfg.agent.mediaMaxMb * 1024 * 1024 - : undefined, - }); - } - } else { - let first = true; - for (const url of media) { - const caption = first ? text : ""; - first = false; - await deps.sendMessageIMessage(imessageTarget, caption, { - mediaUrl: url, - maxBytes: cfg.imessage?.mediaMaxMb - ? cfg.imessage.mediaMaxMb * 1024 * 1024 - : cfg.agent?.mediaMaxMb - ? cfg.agent.mediaMaxMb * 1024 * 1024 - : undefined, - }); - } - } } catch (err) { if (!bestEffortDeliver) throw err; logDeliveryError(err); diff --git a/src/commands/send.test.ts b/src/commands/send.test.ts index d42557b87..698fddd6b 100644 --- a/src/commands/send.test.ts +++ b/src/commands/send.test.ts @@ -137,7 +137,7 @@ describe("sendCommand", () => { expect(deps.sendMessageTelegram).toHaveBeenCalledWith( "123", "hi", - expect.objectContaining({ token: "token-abc" }), + expect.objectContaining({ token: "token-abc", verbose: false }), ); expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); }); @@ -158,7 +158,7 @@ describe("sendCommand", () => { expect(deps.sendMessageTelegram).toHaveBeenCalledWith( "123", "hi", - expect.objectContaining({ token: "cfg-token" }), + expect.objectContaining({ token: "cfg-token", verbose: false }), ); }); @@ -176,7 +176,7 @@ describe("sendCommand", () => { expect(deps.sendMessageDiscord).toHaveBeenCalledWith( "channel:chan", "hi", - expect.objectContaining({ token: "token-discord" }), + expect.objectContaining({ verbose: false }), ); expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); }); @@ -193,7 +193,7 @@ describe("sendCommand", () => { expect(deps.sendMessageSignal).toHaveBeenCalledWith( "+15551234567", "hi", - expect.objectContaining({ mediaUrl: undefined }), + expect.objectContaining({ maxBytes: undefined }), ); expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); }); @@ -209,11 +209,7 @@ describe("sendCommand", () => { deps, runtime, ); - expect(deps.sendMessageSlack).toHaveBeenCalledWith( - "channel:C123", - "hi", - expect.objectContaining({ mediaUrl: undefined }), - ); + expect(deps.sendMessageSlack).toHaveBeenCalledWith("channel:C123", "hi"); expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); }); @@ -229,7 +225,7 @@ describe("sendCommand", () => { expect(deps.sendMessageIMessage).toHaveBeenCalledWith( "chat_id:42", "hi", - expect.objectContaining({ mediaUrl: undefined }), + expect.objectContaining({ maxBytes: undefined }), ); expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled(); }); diff --git a/src/commands/send.ts b/src/commands/send.ts index 8d2e4e7c8..804f5263d 100644 --- a/src/commands/send.ts +++ b/src/commands/send.ts @@ -2,8 +2,9 @@ import type { CliDeps } from "../cli/deps.js"; import { loadConfig } from "../config/config.js"; import { callGateway, randomIdempotencyKey } from "../gateway/call.js"; import { success } from "../globals.js"; +import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; +import type { OutboundDeliveryResult } from "../infra/outbound/deliver.js"; import type { RuntimeEnv } from "../runtime.js"; -import { resolveTelegramToken } from "../telegram/token.js"; export async function sendCommand( opts: { @@ -19,7 +20,8 @@ export async function sendCommand( deps: CliDeps, runtime: RuntimeEnv, ) { - const provider = (opts.provider ?? "whatsapp").toLowerCase(); + const providerRaw = (opts.provider ?? "whatsapp").toLowerCase(); + const provider = providerRaw === "imsg" ? "imessage" : providerRaw; if (opts.dryRun) { runtime.log( @@ -28,131 +30,41 @@ export async function sendCommand( return; } - if (provider === "telegram") { - const { token } = resolveTelegramToken(loadConfig()); - const result = await deps.sendMessageTelegram(opts.to, opts.message, { - token: token || undefined, - mediaUrl: opts.media, + if ( + provider === "telegram" || + provider === "discord" || + provider === "slack" || + provider === "signal" || + provider === "imessage" + ) { + const results = await deliverOutboundPayloads({ + cfg: loadConfig(), + provider, + to: opts.to, + payloads: [{ text: opts.message, mediaUrl: opts.media }], + deps: { + sendWhatsApp: deps.sendMessageWhatsApp, + sendTelegram: deps.sendMessageTelegram, + sendDiscord: deps.sendMessageDiscord, + sendSlack: deps.sendMessageSlack, + sendSignal: deps.sendMessageSignal, + sendIMessage: deps.sendMessageIMessage, + }, }); - runtime.log( - success( - `✅ Sent via telegram. Message ID: ${result.messageId} (chat ${result.chatId})`, - ), - ); + const last = results.at(-1); + const summary = formatDirectSendSummary(provider, last); + runtime.log(success(summary)); if (opts.json) { runtime.log( JSON.stringify( { - provider: "telegram", + provider, via: "direct", to: opts.to, - chatId: result.chatId, - messageId: result.messageId, - mediaUrl: opts.media ?? null, - }, - null, - 2, - ), - ); - } - return; - } - - if (provider === "discord") { - const result = await deps.sendMessageDiscord(opts.to, opts.message, { - token: process.env.DISCORD_BOT_TOKEN, - mediaUrl: opts.media, - }); - runtime.log( - success( - `✅ Sent via discord. Message ID: ${result.messageId} (channel ${result.channelId})`, - ), - ); - if (opts.json) { - runtime.log( - JSON.stringify( - { - provider: "discord", - via: "direct", - to: opts.to, - channelId: result.channelId, - messageId: result.messageId, - mediaUrl: opts.media ?? null, - }, - null, - 2, - ), - ); - } - return; - } - - if (provider === "slack") { - const result = await deps.sendMessageSlack(opts.to, opts.message, { - mediaUrl: opts.media, - }); - runtime.log( - success( - `✅ Sent via slack. Message ID: ${result.messageId} (channel ${result.channelId})`, - ), - ); - if (opts.json) { - runtime.log( - JSON.stringify( - { - provider: "slack", - via: "direct", - to: opts.to, - channelId: result.channelId, - messageId: result.messageId, - mediaUrl: opts.media ?? null, - }, - null, - 2, - ), - ); - } - return; - } - - if (provider === "signal") { - const result = await deps.sendMessageSignal(opts.to, opts.message, { - mediaUrl: opts.media, - }); - runtime.log(success(`✅ Sent via signal. Message ID: ${result.messageId}`)); - if (opts.json) { - runtime.log( - JSON.stringify( - { - provider: "signal", - via: "direct", - to: opts.to, - messageId: result.messageId, - mediaUrl: opts.media ?? null, - }, - null, - 2, - ), - ); - } - return; - } - - if (provider === "imessage" || provider === "imsg") { - const result = await deps.sendMessageIMessage(opts.to, opts.message, { - mediaUrl: opts.media, - }); - runtime.log( - success(`✅ Sent via iMessage. Message ID: ${result.messageId}`), - ); - if (opts.json) { - runtime.log( - JSON.stringify( - { - provider: "imessage", - via: "direct", - to: opts.to, - messageId: result.messageId, + messageId: last?.messageId ?? "unknown", + ...(last && "chatId" in last ? { chatId: last.chatId } : {}), + ...(last && "channelId" in last ? { channelId: last.channelId } : {}), + ...(last && "timestamp" in last ? { timestamp: last.timestamp } : {}), mediaUrl: opts.media ?? null, }, null, @@ -206,3 +118,28 @@ export async function sendCommand( ); } } + +function formatDirectSendSummary( + provider: string, + result: OutboundDeliveryResult | undefined, +): string { + if (!result) { + return `✅ Sent via ${provider}. Message ID: unknown`; + } + if (result.provider === "telegram") { + return `✅ Sent via telegram. Message ID: ${result.messageId} (chat ${result.chatId})`; + } + if (result.provider === "discord") { + return `✅ Sent via discord. Message ID: ${result.messageId} (channel ${result.channelId})`; + } + if (result.provider === "slack") { + return `✅ Sent via slack. Message ID: ${result.messageId} (channel ${result.channelId})`; + } + if (result.provider === "signal") { + return `✅ Sent via signal. Message ID: ${result.messageId}`; + } + if (result.provider === "imessage") { + return `✅ Sent via iMessage. Message ID: ${result.messageId}`; + } + return `✅ Sent via ${provider}. Message ID: ${result.messageId}`; +} diff --git a/src/infra/heartbeat-runner.test.ts b/src/infra/heartbeat-runner.test.ts index 1a74d49aa..087249bba 100644 --- a/src/infra/heartbeat-runner.test.ts +++ b/src/infra/heartbeat-runner.test.ts @@ -6,11 +6,11 @@ import { HEARTBEAT_PROMPT } from "../auto-reply/heartbeat.js"; import * as replyModule from "../auto-reply/reply.js"; import type { ClawdbotConfig } from "../config/config.js"; import { - resolveHeartbeatDeliveryTarget, resolveHeartbeatIntervalMs, resolveHeartbeatPrompt, runHeartbeatOnce, } from "./heartbeat-runner.js"; +import { resolveHeartbeatDeliveryTarget } from "./outbound/targets.js"; describe("resolveHeartbeatIntervalMs", () => { it("returns default when unset", () => { @@ -296,4 +296,67 @@ describe("runHeartbeatOnce", () => { await fs.rm(tmpDir, { recursive: true, force: true }); } }); + + it("passes telegram token from config to sendTelegram", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-")); + const storePath = path.join(tmpDir, "sessions.json"); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; + process.env.TELEGRAM_BOT_TOKEN = ""; + try { + await fs.writeFile( + storePath, + JSON.stringify( + { + main: { + sessionId: "sid", + updatedAt: Date.now(), + lastProvider: "telegram", + lastTo: "123456", + }, + }, + null, + 2, + ), + ); + + const cfg: ClawdbotConfig = { + agent: { + heartbeat: { every: "5m", target: "telegram", to: "123456" }, + }, + telegram: { botToken: "test-bot-token-123" }, + session: { store: storePath }, + }; + + replySpy.mockResolvedValue({ text: "Hello from heartbeat" }); + const sendTelegram = vi.fn().mockResolvedValue({ + messageId: "m1", + chatId: "123456", + }); + + await runHeartbeatOnce({ + cfg, + deps: { + sendTelegram, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(sendTelegram).toHaveBeenCalledTimes(1); + expect(sendTelegram).toHaveBeenCalledWith( + "123456", + "Hello from heartbeat", + expect.objectContaining({ token: "test-bot-token-123" }), + ); + } finally { + replySpy.mockRestore(); + if (prevTelegramToken === undefined) { + delete process.env.TELEGRAM_BOT_TOKEN; + } else { + process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken; + } + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 1fa7065bc..88dd33b2e 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -1,4 +1,3 @@ -import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS, DEFAULT_HEARTBEAT_EVERY, @@ -16,57 +15,24 @@ import { type SessionEntry, saveSessionStore, } from "../config/sessions.js"; -import { sendMessageDiscord } from "../discord/send.js"; -import { sendMessageIMessage } from "../imessage/send.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging.js"; import { getQueueSize } from "../process/command-queue.js"; import { webAuthExists } from "../providers/web/index.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; -import { sendMessageSignal } from "../signal/send.js"; -import { sendMessageSlack } from "../slack/send.js"; -import { sendMessageTelegram } from "../telegram/send.js"; -import { normalizeE164 } from "../utils.js"; import { getActiveWebListener } from "../web/active-listener.js"; -import { sendMessageWhatsApp } from "../web/outbound.js"; import { emitHeartbeatEvent } from "./heartbeat-events.js"; import { type HeartbeatRunResult, requestHeartbeatNow, setHeartbeatWakeHandler, } from "./heartbeat-wake.js"; +import { deliverOutboundPayloads } from "./outbound/deliver.js"; +import type { OutboundSendDeps } from "./outbound/deliver.js"; +import { resolveHeartbeatDeliveryTarget } from "./outbound/targets.js"; -export type HeartbeatTarget = - | "last" - | "whatsapp" - | "telegram" - | "discord" - | "slack" - | "signal" - | "imessage" - | "none"; - -export type HeartbeatDeliveryTarget = { - provider: - | "whatsapp" - | "telegram" - | "discord" - | "slack" - | "signal" - | "imessage" - | "none"; - to?: string; - reason?: string; -}; - -type HeartbeatDeps = { +type HeartbeatDeps = OutboundSendDeps & { runtime?: RuntimeEnv; - sendWhatsApp?: typeof sendMessageWhatsApp; - sendTelegram?: typeof sendMessageTelegram; - sendDiscord?: typeof sendMessageDiscord; - sendSlack?: typeof sendMessageSlack; - sendSignal?: typeof sendMessageSignal; - sendIMessage?: typeof sendMessageIMessage; getQueueSize?: (lane?: string) => number; nowMs?: () => number; webAuthExists?: () => Promise; @@ -191,83 +157,6 @@ async function resolveWhatsAppReadiness( return { ok: true, reason: "ok" }; } -export function resolveHeartbeatDeliveryTarget(params: { - cfg: ClawdbotConfig; - entry?: SessionEntry; -}): HeartbeatDeliveryTarget { - const { cfg, entry } = params; - const rawTarget = cfg.agent?.heartbeat?.target; - const target: HeartbeatTarget = - rawTarget === "whatsapp" || - rawTarget === "telegram" || - rawTarget === "discord" || - rawTarget === "slack" || - rawTarget === "signal" || - rawTarget === "imessage" || - rawTarget === "none" || - rawTarget === "last" - ? rawTarget - : "last"; - if (target === "none") { - return { provider: "none", reason: "target-none" }; - } - - const explicitTo = - typeof cfg.agent?.heartbeat?.to === "string" && - cfg.agent.heartbeat.to.trim() - ? cfg.agent.heartbeat.to.trim() - : undefined; - - const lastProvider = - entry?.lastProvider && entry.lastProvider !== "webchat" - ? entry.lastProvider - : undefined; - const lastTo = typeof entry?.lastTo === "string" ? entry.lastTo.trim() : ""; - - const provider: - | "whatsapp" - | "telegram" - | "discord" - | "slack" - | "signal" - | "imessage" - | undefined = - target === "last" - ? lastProvider - : target === "whatsapp" || - target === "telegram" || - target === "discord" || - target === "slack" || - target === "signal" || - target === "imessage" - ? target - : undefined; - - const to = - explicitTo || - (provider && lastProvider === provider ? lastTo : undefined) || - (target === "last" ? lastTo : undefined); - - if (!provider || !to) { - return { provider: "none", reason: "no-target" }; - } - - if (provider !== "whatsapp") { - return { provider, to }; - } - - const rawAllow = cfg.whatsapp?.allowFrom ?? []; - if (rawAllow.includes("*")) return { provider, to }; - const allowFrom = rawAllow - .map((val) => normalizeE164(val)) - .filter((val) => val.length > 1); - if (allowFrom.length === 0) return { provider, to }; - - const normalized = normalizeE164(to); - if (allowFrom.includes(normalized)) return { provider, to: normalized }; - return { provider, to: allowFrom[0], reason: "allowFrom-fallback" }; -} - async function restoreHeartbeatUpdatedAt(params: { storePath: string; sessionKey: string; @@ -309,123 +198,6 @@ function normalizeHeartbeatReply( return { shouldSkip: false, text: finalText, hasMedia }; } -async function deliverHeartbeatReply(params: { - provider: - | "whatsapp" - | "telegram" - | "discord" - | "slack" - | "signal" - | "imessage"; - to: string; - text: string; - mediaUrls: string[]; - textLimit: number; - deps: Required< - Pick< - HeartbeatDeps, - | "sendWhatsApp" - | "sendTelegram" - | "sendDiscord" - | "sendSlack" - | "sendSignal" - | "sendIMessage" - > - >; -}) { - const { provider, to, text, mediaUrls, deps, textLimit } = params; - if (provider === "whatsapp") { - if (mediaUrls.length === 0) { - for (const chunk of chunkText(text, textLimit)) { - await deps.sendWhatsApp(to, chunk, { verbose: false }); - } - return; - } - let first = true; - for (const url of mediaUrls) { - const caption = first ? text : ""; - first = false; - await deps.sendWhatsApp(to, caption, { verbose: false, mediaUrl: url }); - } - return; - } - - if (provider === "signal") { - if (mediaUrls.length === 0) { - for (const chunk of chunkText(text, textLimit)) { - await deps.sendSignal(to, chunk); - } - return; - } - let first = true; - for (const url of mediaUrls) { - const caption = first ? text : ""; - first = false; - await deps.sendSignal(to, caption, { mediaUrl: url }); - } - return; - } - - if (provider === "imessage") { - if (mediaUrls.length === 0) { - for (const chunk of chunkText(text, textLimit)) { - await deps.sendIMessage(to, chunk); - } - return; - } - let first = true; - for (const url of mediaUrls) { - const caption = first ? text : ""; - first = false; - await deps.sendIMessage(to, caption, { mediaUrl: url }); - } - return; - } - - if (provider === "telegram") { - if (mediaUrls.length === 0) { - for (const chunk of chunkText(text, textLimit)) { - await deps.sendTelegram(to, chunk, { verbose: false }); - } - return; - } - let first = true; - for (const url of mediaUrls) { - const caption = first ? text : ""; - first = false; - await deps.sendTelegram(to, caption, { verbose: false, mediaUrl: url }); - } - return; - } - - if (provider === "slack") { - if (mediaUrls.length === 0) { - for (const chunk of chunkText(text, textLimit)) { - await deps.sendSlack(to, chunk); - } - return; - } - let first = true; - for (const url of mediaUrls) { - const caption = first ? text : ""; - first = false; - await deps.sendSlack(to, caption, { mediaUrl: url }); - } - return; - } - // provider is "discord" here - if (mediaUrls.length === 0) { - await deps.sendDiscord(to, text, { verbose: false }); - return; - } - let first = true; - for (const url of mediaUrls) { - const caption = first ? text : ""; - first = false; - await deps.sendDiscord(to, caption, { verbose: false, mediaUrl: url }); - } -} - export async function runHeartbeatOnce(opts: { cfg?: ClawdbotConfig; reason?: string; @@ -541,22 +313,17 @@ export async function runHeartbeatOnce(opts: { } } - const deps = { - sendWhatsApp: opts.deps?.sendWhatsApp ?? sendMessageWhatsApp, - sendTelegram: opts.deps?.sendTelegram ?? sendMessageTelegram, - sendDiscord: opts.deps?.sendDiscord ?? sendMessageDiscord, - sendSlack: opts.deps?.sendSlack ?? sendMessageSlack, - sendSignal: opts.deps?.sendSignal ?? sendMessageSignal, - sendIMessage: opts.deps?.sendIMessage ?? sendMessageIMessage, - }; - const textLimit = resolveTextChunkLimit(cfg, delivery.provider); - await deliverHeartbeatReply({ + await deliverOutboundPayloads({ + cfg, provider: delivery.provider, to: delivery.to, - text: normalized.text, - mediaUrls, - textLimit, - deps, + payloads: [ + { + text: normalized.text, + mediaUrls, + }, + ], + deps: opts.deps, }); emitHeartbeatEvent({ diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts new file mode 100644 index 000000000..85b429dc2 --- /dev/null +++ b/src/infra/outbound/deliver.test.ts @@ -0,0 +1,108 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { ClawdbotConfig } from "../../config/config.js"; +import { deliverOutboundPayloads } from "./deliver.js"; + +describe("deliverOutboundPayloads", () => { + it("chunks telegram markdown and passes config token", async () => { + const sendTelegram = vi + .fn() + .mockResolvedValue({ messageId: "m1", chatId: "c1" }); + const cfg: ClawdbotConfig = { + telegram: { botToken: "tok-1", textChunkLimit: 2 }, + }; + const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; + process.env.TELEGRAM_BOT_TOKEN = ""; + try { + const results = await deliverOutboundPayloads({ + cfg, + provider: "telegram", + to: "123", + payloads: [{ text: "abcd" }], + deps: { sendTelegram }, + }); + + expect(sendTelegram).toHaveBeenCalledTimes(2); + for (const call of sendTelegram.mock.calls) { + expect(call[2]).toEqual( + expect.objectContaining({ token: "tok-1", verbose: false }), + ); + } + expect(results).toHaveLength(2); + expect(results[0]).toMatchObject({ provider: "telegram", chatId: "c1" }); + } finally { + if (prevTelegramToken === undefined) { + delete process.env.TELEGRAM_BOT_TOKEN; + } else { + process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken; + } + } + }); + + it("uses signal media maxBytes from config", async () => { + const sendSignal = vi + .fn() + .mockResolvedValue({ messageId: "s1", timestamp: 123 }); + const cfg: ClawdbotConfig = { signal: { mediaMaxMb: 2 } }; + + const results = await deliverOutboundPayloads({ + cfg, + provider: "signal", + to: "+1555", + payloads: [{ text: "hi", mediaUrl: "https://x.test/a.jpg" }], + deps: { sendSignal }, + }); + + expect(sendSignal).toHaveBeenCalledWith( + "+1555", + "hi", + expect.objectContaining({ + mediaUrl: "https://x.test/a.jpg", + maxBytes: 2 * 1024 * 1024, + }), + ); + expect(results[0]).toMatchObject({ provider: "signal", messageId: "s1" }); + }); + + it("chunks WhatsApp text and returns all results", async () => { + const sendWhatsApp = vi + .fn() + .mockResolvedValueOnce({ messageId: "w1", toJid: "jid" }) + .mockResolvedValueOnce({ messageId: "w2", toJid: "jid" }); + const cfg: ClawdbotConfig = { + whatsapp: { textChunkLimit: 2 }, + }; + + const results = await deliverOutboundPayloads({ + cfg, + provider: "whatsapp", + to: "+1555", + payloads: [{ text: "abcd" }], + deps: { sendWhatsApp }, + }); + + expect(sendWhatsApp).toHaveBeenCalledTimes(2); + expect(results.map((r) => r.messageId)).toEqual(["w1", "w2"]); + }); + + it("uses iMessage media maxBytes from agent fallback", async () => { + const sendIMessage = vi + .fn() + .mockResolvedValue({ messageId: "i1" }); + const cfg: ClawdbotConfig = { agent: { mediaMaxMb: 3 } }; + + await deliverOutboundPayloads({ + cfg, + provider: "imessage", + to: "chat_id:42", + payloads: [{ text: "hello" }], + deps: { sendIMessage }, + }); + + expect(sendIMessage).toHaveBeenCalledWith( + "chat_id:42", + "hello", + expect.objectContaining({ maxBytes: 3 * 1024 * 1024 }), + ); + }); +}); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts new file mode 100644 index 000000000..60195fd3a --- /dev/null +++ b/src/infra/outbound/deliver.ts @@ -0,0 +1,200 @@ +import { + chunkMarkdownText, + chunkText, + resolveTextChunkLimit, +} from "../../auto-reply/chunk.js"; +import type { ReplyPayload } from "../../auto-reply/types.js"; +import type { ClawdbotConfig } from "../../config/config.js"; +import { sendMessageDiscord } from "../../discord/send.js"; +import { sendMessageIMessage } from "../../imessage/send.js"; +import { sendMessageSignal } from "../../signal/send.js"; +import { sendMessageSlack } from "../../slack/send.js"; +import { sendMessageTelegram } from "../../telegram/send.js"; +import { resolveTelegramToken } from "../../telegram/token.js"; +import { sendMessageWhatsApp } from "../../web/outbound.js"; +import type { OutboundProvider } from "./targets.js"; + +const MB = 1024 * 1024; + +export type OutboundSendDeps = { + sendWhatsApp?: typeof sendMessageWhatsApp; + sendTelegram?: typeof sendMessageTelegram; + sendDiscord?: typeof sendMessageDiscord; + sendSlack?: typeof sendMessageSlack; + sendSignal?: typeof sendMessageSignal; + sendIMessage?: typeof sendMessageIMessage; +}; + +export type OutboundDeliveryResult = + | { provider: "whatsapp"; messageId: string; toJid: string } + | { provider: "telegram"; messageId: string; chatId: string } + | { provider: "discord"; messageId: string; channelId: string } + | { provider: "slack"; messageId: string; channelId: string } + | { provider: "signal"; messageId: string; timestamp?: number } + | { provider: "imessage"; messageId: string }; + +type Chunker = (text: string, limit: number) => string[]; + +function resolveChunker(provider: OutboundProvider): Chunker | null { + if (provider === "telegram") return chunkMarkdownText; + if (provider === "whatsapp") return chunkText; + if (provider === "signal") return chunkText; + if (provider === "imessage") return chunkText; + return null; +} + +function resolveSignalMaxBytes(cfg: ClawdbotConfig): number | undefined { + if (cfg.signal?.mediaMaxMb) return cfg.signal.mediaMaxMb * MB; + if (cfg.agent?.mediaMaxMb) return cfg.agent.mediaMaxMb * MB; + return undefined; +} + +function resolveIMessageMaxBytes(cfg: ClawdbotConfig): number | undefined { + if (cfg.imessage?.mediaMaxMb) return cfg.imessage.mediaMaxMb * MB; + if (cfg.agent?.mediaMaxMb) return cfg.agent.mediaMaxMb * MB; + return undefined; +} + +function normalizeMediaUrls(payload: ReplyPayload): string[] { + return payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); +} + +export async function deliverOutboundPayloads(params: { + cfg: ClawdbotConfig; + provider: Exclude; + to: string; + payloads: ReplyPayload[]; + deps?: OutboundSendDeps; +}): Promise { + const { cfg, provider, to, payloads } = params; + const deps = { + sendWhatsApp: params.deps?.sendWhatsApp ?? sendMessageWhatsApp, + sendTelegram: params.deps?.sendTelegram ?? sendMessageTelegram, + sendDiscord: params.deps?.sendDiscord ?? sendMessageDiscord, + sendSlack: params.deps?.sendSlack ?? sendMessageSlack, + sendSignal: params.deps?.sendSignal ?? sendMessageSignal, + sendIMessage: params.deps?.sendIMessage ?? sendMessageIMessage, + }; + const results: OutboundDeliveryResult[] = []; + + const chunker = resolveChunker(provider); + const textLimit = chunker ? resolveTextChunkLimit(cfg, provider) : undefined; + const telegramToken = + provider === "telegram" + ? resolveTelegramToken(cfg).token || undefined + : undefined; + const signalMaxBytes = + provider === "signal" ? resolveSignalMaxBytes(cfg) : undefined; + const imessageMaxBytes = + provider === "imessage" ? resolveIMessageMaxBytes(cfg) : undefined; + + const sendTextChunks = async (text: string) => { + if (!chunker || textLimit === undefined) { + await sendText(text); + return; + } + for (const chunk of chunker(text, textLimit)) { + await sendText(chunk); + } + }; + + const sendText = async (text: string) => { + if (provider === "whatsapp") { + const res = await deps.sendWhatsApp(to, text, { verbose: false }); + results.push({ provider: "whatsapp", ...res }); + return; + } + if (provider === "telegram") { + const res = await deps.sendTelegram(to, text, { + verbose: false, + token: telegramToken, + }); + results.push({ provider: "telegram", ...res }); + return; + } + if (provider === "signal") { + const res = await deps.sendSignal(to, text, { maxBytes: signalMaxBytes }); + results.push({ provider: "signal", ...res }); + return; + } + if (provider === "imessage") { + const res = await deps.sendIMessage(to, text, { + maxBytes: imessageMaxBytes, + }); + results.push({ provider: "imessage", ...res }); + return; + } + if (provider === "slack") { + const res = await deps.sendSlack(to, text); + results.push({ provider: "slack", ...res }); + return; + } + const res = await deps.sendDiscord(to, text, { verbose: false }); + results.push({ provider: "discord", ...res }); + }; + + const sendMedia = async (caption: string, mediaUrl: string) => { + if (provider === "whatsapp") { + const res = await deps.sendWhatsApp(to, caption, { + verbose: false, + mediaUrl, + }); + results.push({ provider: "whatsapp", ...res }); + return; + } + if (provider === "telegram") { + const res = await deps.sendTelegram(to, caption, { + verbose: false, + mediaUrl, + token: telegramToken, + }); + results.push({ provider: "telegram", ...res }); + return; + } + if (provider === "signal") { + const res = await deps.sendSignal(to, caption, { + mediaUrl, + maxBytes: signalMaxBytes, + }); + results.push({ provider: "signal", ...res }); + return; + } + if (provider === "imessage") { + const res = await deps.sendIMessage(to, caption, { + mediaUrl, + maxBytes: imessageMaxBytes, + }); + results.push({ provider: "imessage", ...res }); + return; + } + if (provider === "slack") { + const res = await deps.sendSlack(to, caption, { mediaUrl }); + results.push({ provider: "slack", ...res }); + return; + } + const res = await deps.sendDiscord(to, caption, { + verbose: false, + mediaUrl, + }); + results.push({ provider: "discord", ...res }); + }; + + for (const payload of payloads) { + const text = payload.text ?? ""; + const mediaUrls = normalizeMediaUrls(payload); + if (!text && mediaUrls.length === 0) continue; + + if (mediaUrls.length === 0) { + await sendTextChunks(text); + continue; + } + + let first = true; + for (const url of mediaUrls) { + const caption = first ? text : ""; + first = false; + await sendMedia(caption, url); + } + } + return results; +} diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts new file mode 100644 index 000000000..a551a7dc9 --- /dev/null +++ b/src/infra/outbound/targets.ts @@ -0,0 +1,97 @@ +import type { ClawdbotConfig } from "../../config/config.js"; +import type { SessionEntry } from "../../config/sessions.js"; +import { normalizeE164 } from "../../utils.js"; + +export type OutboundProvider = + | "whatsapp" + | "telegram" + | "discord" + | "slack" + | "signal" + | "imessage" + | "none"; + +export type HeartbeatTarget = OutboundProvider | "last"; + +export type OutboundTarget = { + provider: OutboundProvider; + to?: string; + reason?: string; +}; + +export function resolveHeartbeatDeliveryTarget(params: { + cfg: ClawdbotConfig; + entry?: SessionEntry; +}): OutboundTarget { + const { cfg, entry } = params; + const rawTarget = cfg.agent?.heartbeat?.target; + const target: HeartbeatTarget = + rawTarget === "whatsapp" || + rawTarget === "telegram" || + rawTarget === "discord" || + rawTarget === "slack" || + rawTarget === "signal" || + rawTarget === "imessage" || + rawTarget === "none" || + rawTarget === "last" + ? rawTarget + : "last"; + if (target === "none") { + return { provider: "none", reason: "target-none" }; + } + + const explicitTo = + typeof cfg.agent?.heartbeat?.to === "string" && + cfg.agent.heartbeat.to.trim() + ? cfg.agent.heartbeat.to.trim() + : undefined; + + const lastProvider = + entry?.lastProvider && entry.lastProvider !== "webchat" + ? entry.lastProvider + : undefined; + const lastTo = typeof entry?.lastTo === "string" ? entry.lastTo.trim() : ""; + + const provider: + | "whatsapp" + | "telegram" + | "discord" + | "slack" + | "signal" + | "imessage" + | undefined = + target === "last" + ? lastProvider + : target === "whatsapp" || + target === "telegram" || + target === "discord" || + target === "slack" || + target === "signal" || + target === "imessage" + ? target + : undefined; + + const to = + explicitTo || + (provider && lastProvider === provider ? lastTo : undefined) || + (target === "last" ? lastTo : undefined); + + if (!provider || !to) { + return { provider: "none", reason: "no-target" }; + } + + if (provider !== "whatsapp") { + return { provider, to }; + } + + const rawAllow = cfg.whatsapp?.allowFrom ?? []; + if (rawAllow.includes("*")) return { provider, to }; + const allowFrom = rawAllow + .map((val) => normalizeE164(val)) + .filter((val) => val.length > 1); + if (allowFrom.length === 0) return { provider, to }; + + const normalized = normalizeE164(to); + if (allowFrom.includes(normalized)) return { provider, to: normalized }; + return { provider, to: allowFrom[0], reason: "allowFrom-fallback" }; +}