From 4bf5f37a44f6a01537d9eb74bffa15d69c18a0b5 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 7 Jan 2026 02:30:42 +0000 Subject: [PATCH] refactor: streamline outbound payload handling --- src/agents/sandbox.ts | 4 +- src/commands/agent.ts | 26 +-- src/commands/send.test.ts | 2 +- src/commands/send.ts | 8 +- src/infra/outbound/deliver.ts | 268 +++++++++++++++------------- src/infra/outbound/payloads.test.ts | 50 ++++++ src/infra/outbound/payloads.ts | 44 +++++ src/telegram/bot.ts | 2 +- 8 files changed, 254 insertions(+), 150 deletions(-) create mode 100644 src/infra/outbound/payloads.test.ts create mode 100644 src/infra/outbound/payloads.ts diff --git a/src/agents/sandbox.ts b/src/agents/sandbox.ts index e45882da5..bf2b41056 100644 --- a/src/agents/sandbox.ts +++ b/src/agents/sandbox.ts @@ -681,7 +681,9 @@ async function ensureSandboxBrowser(params: { if (!isToolAllowed(params.cfg.tools, "browser")) return null; const slug = - params.cfg.scope === "shared" ? "shared" : slugifySessionKey(params.scopeKey); + params.cfg.scope === "shared" + ? "shared" + : slugifySessionKey(params.scopeKey); const name = `${params.cfg.browser.containerPrefix}${slug}`; const containerName = name.slice(0, 63); const state = await dockerContainerState(containerName); diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 61d96efb3..d0ee57004 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -44,10 +44,13 @@ import { emitAgentEvent, registerAgentRunContext, } from "../infra/agent-events.js"; +import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { - deliverOutboundPayloads, + formatOutboundPayloadLog, + type NormalizedOutboundPayload, normalizeOutboundPayloads, -} from "../infra/outbound/deliver.js"; + normalizeOutboundPayloadsForJson, +} from "../infra/outbound/payloads.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { resolveSendPolicy } from "../sessions/send-policy.js"; @@ -541,12 +544,8 @@ export async function agentCommand( } } + const normalizedPayloads = normalizeOutboundPayloadsForJson(payloads); if (opts.json) { - const normalizedPayloads = payloads.map((p) => ({ - text: p.text ?? "", - mediaUrl: p.mediaUrl ?? null, - mediaUrls: p.mediaUrls ?? (p.mediaUrl ? [p.mediaUrl] : undefined), - })); runtime.log( JSON.stringify( { payloads: normalizedPayloads, meta: result.meta }, @@ -565,12 +564,10 @@ export async function agentCommand( } const deliveryPayloads = normalizeOutboundPayloads(payloads); - const logPayload = (payload: { text: string; mediaUrls: string[] }) => { + const logPayload = (payload: NormalizedOutboundPayload) => { if (opts.json) return; - const lines: string[] = []; - if (payload.text) lines.push(payload.text.trimEnd()); - for (const url of payload.mediaUrls) lines.push(`MEDIA:${url}`); - runtime.log(lines.join("\n")); + const output = formatOutboundPayloadLog(payload); + if (output) runtime.log(output); }; if (!deliver) { for (const payload of deliveryPayloads) { @@ -607,10 +604,5 @@ export async function agentCommand( } } - const normalizedPayloads = payloads.map((p) => ({ - text: p.text ?? "", - mediaUrl: p.mediaUrl ?? null, - mediaUrls: p.mediaUrls ?? (p.mediaUrl ? [p.mediaUrl] : undefined), - })); return { payloads: normalizedPayloads, meta: result.meta }; } diff --git a/src/commands/send.test.ts b/src/commands/send.test.ts index 698fddd6b..c741a12e2 100644 --- a/src/commands/send.test.ts +++ b/src/commands/send.test.ts @@ -243,7 +243,7 @@ describe("sendCommand", () => { runtime, ); expect(runtime.log).toHaveBeenCalledWith( - expect.stringContaining('"provider": "web"'), + expect.stringContaining('"provider": "whatsapp"'), ); }); }); diff --git a/src/commands/send.ts b/src/commands/send.ts index b7543df0a..bc7a6ceb0 100644 --- a/src/commands/send.ts +++ b/src/commands/send.ts @@ -114,13 +114,13 @@ export async function sendCommand( if (opts.json) { runtime.log( JSON.stringify( - { - provider: "web", + buildOutboundDeliveryJson({ + provider, via: "gateway", to: opts.to, - messageId: result.messageId, + result, mediaUrl: opts.media ?? null, - }, + }), null, 2, ), diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 087a66993..57690f471 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -12,10 +12,15 @@ import { sendMessageSlack } from "../../slack/send.js"; import { sendMessageTelegram } from "../../telegram/send.js"; import { resolveTelegramToken } from "../../telegram/token.js"; import { sendMessageWhatsApp } from "../../web/outbound.js"; +import type { NormalizedOutboundPayload } from "./payloads.js"; +import { normalizeOutboundPayloads } from "./payloads.js"; import type { OutboundProvider } from "./targets.js"; const MB = 1024 * 1024; +export type { NormalizedOutboundPayload } from "./payloads.js"; +export { normalizeOutboundPayloads } from "./payloads.js"; + export type OutboundSendDeps = { sendWhatsApp?: typeof sendMessageWhatsApp; sendTelegram?: typeof sendMessageTelegram; @@ -33,43 +38,137 @@ export type OutboundDeliveryResult = | { provider: "signal"; messageId: string; timestamp?: number } | { provider: "imessage"; messageId: string }; -export type NormalizedOutboundPayload = { - text: string; - mediaUrls: string[]; -}; - type Chunker = (text: string, limit: number) => string[]; -function resolveChunker(provider: OutboundProvider): Chunker | null { - if (provider === "telegram") return chunkMarkdownText; - if (provider === "whatsapp") return chunkText; - if (provider === "signal") return chunkText; - if (provider === "imessage") return chunkText; - return null; -} +type ProviderHandler = { + chunker: Chunker | null; + sendText: (text: string) => Promise; + sendMedia: ( + caption: string, + mediaUrl: string, + ) => Promise; +}; -function resolveSignalMaxBytes(cfg: ClawdbotConfig): number | undefined { - if (cfg.signal?.mediaMaxMb) return cfg.signal.mediaMaxMb * MB; +function resolveMediaMaxBytes( + cfg: ClawdbotConfig, + provider: "signal" | "imessage", +): number | undefined { + const providerLimit = + provider === "signal" ? cfg.signal?.mediaMaxMb : cfg.imessage?.mediaMaxMb; + if (providerLimit) return providerLimit * MB; if (cfg.agent?.mediaMaxMb) return cfg.agent.mediaMaxMb * MB; return undefined; } -function resolveIMessageMaxBytes(cfg: ClawdbotConfig): number | undefined { - if (cfg.imessage?.mediaMaxMb) return cfg.imessage.mediaMaxMb * MB; - if (cfg.agent?.mediaMaxMb) return cfg.agent.mediaMaxMb * MB; - return undefined; -} +function createProviderHandler(params: { + cfg: ClawdbotConfig; + provider: Exclude; + to: string; + deps: Required; +}): ProviderHandler { + const { cfg, to, deps } = params; + const telegramToken = + params.provider === "telegram" + ? resolveTelegramToken(cfg).token || undefined + : undefined; + const signalMaxBytes = + params.provider === "signal" + ? resolveMediaMaxBytes(cfg, "signal") + : undefined; + const imessageMaxBytes = + params.provider === "imessage" + ? resolveMediaMaxBytes(cfg, "imessage") + : undefined; -export function normalizeOutboundPayloads( - payloads: ReplyPayload[], -): NormalizedOutboundPayload[] { - return payloads - .map((payload) => ({ - text: payload.text ?? "", - mediaUrls: - payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []), - })) - .filter((payload) => payload.text || payload.mediaUrls.length > 0); + const handlers: Record, ProviderHandler> = { + whatsapp: { + chunker: chunkText, + sendText: async (text) => ({ + provider: "whatsapp", + ...(await deps.sendWhatsApp(to, text, { verbose: false })), + }), + sendMedia: async (caption, mediaUrl) => ({ + provider: "whatsapp", + ...(await deps.sendWhatsApp(to, caption, { + verbose: false, + mediaUrl, + })), + }), + }, + telegram: { + chunker: chunkMarkdownText, + sendText: async (text) => ({ + provider: "telegram", + ...(await deps.sendTelegram(to, text, { + verbose: false, + token: telegramToken, + })), + }), + sendMedia: async (caption, mediaUrl) => ({ + provider: "telegram", + ...(await deps.sendTelegram(to, caption, { + verbose: false, + mediaUrl, + token: telegramToken, + })), + }), + }, + discord: { + chunker: null, + sendText: async (text) => ({ + provider: "discord", + ...(await deps.sendDiscord(to, text, { verbose: false })), + }), + sendMedia: async (caption, mediaUrl) => ({ + provider: "discord", + ...(await deps.sendDiscord(to, caption, { + verbose: false, + mediaUrl, + })), + }), + }, + slack: { + chunker: null, + sendText: async (text) => ({ + provider: "slack", + ...(await deps.sendSlack(to, text)), + }), + sendMedia: async (caption, mediaUrl) => ({ + provider: "slack", + ...(await deps.sendSlack(to, caption, { mediaUrl })), + }), + }, + signal: { + chunker: chunkText, + sendText: async (text) => ({ + provider: "signal", + ...(await deps.sendSignal(to, text, { maxBytes: signalMaxBytes })), + }), + sendMedia: async (caption, mediaUrl) => ({ + provider: "signal", + ...(await deps.sendSignal(to, caption, { + mediaUrl, + maxBytes: signalMaxBytes, + })), + }), + }, + imessage: { + chunker: chunkText, + sendText: async (text) => ({ + provider: "imessage", + ...(await deps.sendIMessage(to, text, { maxBytes: imessageMaxBytes })), + }), + sendMedia: async (caption, mediaUrl) => ({ + provider: "imessage", + ...(await deps.sendIMessage(to, caption, { + mediaUrl, + maxBytes: imessageMaxBytes, + })), + }), + }, + }; + + return handlers[params.provider]; } export async function deliverOutboundPayloads(params: { @@ -92,109 +191,26 @@ export async function deliverOutboundPayloads(params: { sendIMessage: params.deps?.sendIMessage ?? sendMessageIMessage, }; const results: OutboundDeliveryResult[] = []; - - const chunker = resolveChunker(provider); - const textLimit = chunker ? resolveTextChunkLimit(cfg, provider) : undefined; - const telegramToken = - provider === "telegram" - ? resolveTelegramToken(cfg).token || undefined - : undefined; - const signalMaxBytes = - provider === "signal" ? resolveSignalMaxBytes(cfg) : undefined; - const imessageMaxBytes = - provider === "imessage" ? resolveIMessageMaxBytes(cfg) : undefined; + const handler = createProviderHandler({ + cfg, + provider, + to, + deps, + }); + const textLimit = handler.chunker + ? resolveTextChunkLimit(cfg, provider) + : undefined; const sendTextChunks = async (text: string) => { - if (!chunker || textLimit === undefined) { - await sendText(text); + if (!handler.chunker || textLimit === undefined) { + results.push(await handler.sendText(text)); return; } - for (const chunk of chunker(text, textLimit)) { - await sendText(chunk); + for (const chunk of handler.chunker(text, textLimit)) { + results.push(await handler.sendText(chunk)); } }; - const sendText = async (text: string) => { - if (provider === "whatsapp") { - const res = await deps.sendWhatsApp(to, text, { verbose: false }); - results.push({ provider: "whatsapp", ...res }); - return; - } - if (provider === "telegram") { - const res = await deps.sendTelegram(to, text, { - verbose: false, - token: telegramToken, - }); - results.push({ provider: "telegram", ...res }); - return; - } - if (provider === "signal") { - const res = await deps.sendSignal(to, text, { maxBytes: signalMaxBytes }); - results.push({ provider: "signal", ...res }); - return; - } - if (provider === "imessage") { - const res = await deps.sendIMessage(to, text, { - maxBytes: imessageMaxBytes, - }); - results.push({ provider: "imessage", ...res }); - return; - } - if (provider === "slack") { - const res = await deps.sendSlack(to, text); - results.push({ provider: "slack", ...res }); - return; - } - const res = await deps.sendDiscord(to, text, { verbose: false }); - results.push({ provider: "discord", ...res }); - }; - - const sendMedia = async (caption: string, mediaUrl: string) => { - if (provider === "whatsapp") { - const res = await deps.sendWhatsApp(to, caption, { - verbose: false, - mediaUrl, - }); - results.push({ provider: "whatsapp", ...res }); - return; - } - if (provider === "telegram") { - const res = await deps.sendTelegram(to, caption, { - verbose: false, - mediaUrl, - token: telegramToken, - }); - results.push({ provider: "telegram", ...res }); - return; - } - if (provider === "signal") { - const res = await deps.sendSignal(to, caption, { - mediaUrl, - maxBytes: signalMaxBytes, - }); - results.push({ provider: "signal", ...res }); - return; - } - if (provider === "imessage") { - const res = await deps.sendIMessage(to, caption, { - mediaUrl, - maxBytes: imessageMaxBytes, - }); - results.push({ provider: "imessage", ...res }); - return; - } - if (provider === "slack") { - const res = await deps.sendSlack(to, caption, { mediaUrl }); - results.push({ provider: "slack", ...res }); - return; - } - const res = await deps.sendDiscord(to, caption, { - verbose: false, - mediaUrl, - }); - results.push({ provider: "discord", ...res }); - }; - const normalizedPayloads = normalizeOutboundPayloads(payloads); for (const payload of normalizedPayloads) { try { @@ -208,7 +224,7 @@ export async function deliverOutboundPayloads(params: { for (const url of payload.mediaUrls) { const caption = first ? payload.text : ""; first = false; - await sendMedia(caption, url); + results.push(await handler.sendMedia(caption, url)); } } catch (err) { if (!params.bestEffort) throw err; diff --git a/src/infra/outbound/payloads.test.ts b/src/infra/outbound/payloads.test.ts new file mode 100644 index 000000000..5456b7a81 --- /dev/null +++ b/src/infra/outbound/payloads.test.ts @@ -0,0 +1,50 @@ +import { describe, expect, it } from "vitest"; + +import { + formatOutboundPayloadLog, + normalizeOutboundPayloadsForJson, +} from "./payloads.js"; + +describe("normalizeOutboundPayloadsForJson", () => { + it("normalizes payloads with mediaUrl and mediaUrls", () => { + expect( + normalizeOutboundPayloadsForJson([ + { text: "hi" }, + { text: "photo", mediaUrl: "https://x.test/a.jpg" }, + { text: "multi", mediaUrls: ["https://x.test/1.png"] }, + ]), + ).toEqual([ + { text: "hi", mediaUrl: null, mediaUrls: undefined }, + { + text: "photo", + mediaUrl: "https://x.test/a.jpg", + mediaUrls: ["https://x.test/a.jpg"], + }, + { + text: "multi", + mediaUrl: null, + mediaUrls: ["https://x.test/1.png"], + }, + ]); + }); +}); + +describe("formatOutboundPayloadLog", () => { + it("trims trailing text and appends media lines", () => { + expect( + formatOutboundPayloadLog({ + text: "hello ", + mediaUrls: ["https://x.test/a.png", "https://x.test/b.png"], + }), + ).toBe("hello\nMEDIA:https://x.test/a.png\nMEDIA:https://x.test/b.png"); + }); + + it("logs media-only payloads", () => { + expect( + formatOutboundPayloadLog({ + text: "", + mediaUrls: ["https://x.test/a.png"], + }), + ).toBe("MEDIA:https://x.test/a.png"); + }); +}); diff --git a/src/infra/outbound/payloads.ts b/src/infra/outbound/payloads.ts new file mode 100644 index 000000000..2f67ee583 --- /dev/null +++ b/src/infra/outbound/payloads.ts @@ -0,0 +1,44 @@ +import type { ReplyPayload } from "../../auto-reply/types.js"; + +export type NormalizedOutboundPayload = { + text: string; + mediaUrls: string[]; +}; + +export type OutboundPayloadJson = { + text: string; + mediaUrl: string | null; + mediaUrls?: string[]; +}; + +export function normalizeOutboundPayloads( + payloads: ReplyPayload[], +): NormalizedOutboundPayload[] { + return payloads + .map((payload) => ({ + text: payload.text ?? "", + mediaUrls: + payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []), + })) + .filter((payload) => payload.text || payload.mediaUrls.length > 0); +} + +export function normalizeOutboundPayloadsForJson( + payloads: ReplyPayload[], +): OutboundPayloadJson[] { + return payloads.map((payload) => ({ + text: payload.text ?? "", + mediaUrl: payload.mediaUrl ?? null, + mediaUrls: + payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : undefined), + })); +} + +export function formatOutboundPayloadLog( + payload: NormalizedOutboundPayload, +): string { + const lines: string[] = []; + if (payload.text) lines.push(payload.text.trimEnd()); + for (const url of payload.mediaUrls) lines.push(`MEDIA:${url}`); + return lines.join("\n"); +} diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 50b159ddb..ef6b9b279 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -14,13 +14,13 @@ import { listNativeCommandSpecs, } from "../auto-reply/commands-registry.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; -import { getReplyFromConfig } from "../auto-reply/reply.js"; import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; import { buildMentionRegexes, matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; +import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import type { ReplyToMode } from "../config/config.js"; import { loadConfig } from "../config/config.js";