refactor: streamline outbound payload handling

This commit is contained in:
Peter Steinberger
2026-01-07 02:30:42 +00:00
parent 3fedd0d1d5
commit 4bf5f37a44
8 changed files with 254 additions and 150 deletions

View File

@@ -681,7 +681,9 @@ async function ensureSandboxBrowser(params: {
if (!isToolAllowed(params.cfg.tools, "browser")) return null; if (!isToolAllowed(params.cfg.tools, "browser")) return null;
const slug = const slug =
params.cfg.scope === "shared" ? "shared" : slugifySessionKey(params.scopeKey); params.cfg.scope === "shared"
? "shared"
: slugifySessionKey(params.scopeKey);
const name = `${params.cfg.browser.containerPrefix}${slug}`; const name = `${params.cfg.browser.containerPrefix}${slug}`;
const containerName = name.slice(0, 63); const containerName = name.slice(0, 63);
const state = await dockerContainerState(containerName); const state = await dockerContainerState(containerName);

View File

@@ -44,10 +44,13 @@ import {
emitAgentEvent, emitAgentEvent,
registerAgentRunContext, registerAgentRunContext,
} from "../infra/agent-events.js"; } from "../infra/agent-events.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { import {
deliverOutboundPayloads, formatOutboundPayloadLog,
type NormalizedOutboundPayload,
normalizeOutboundPayloads, normalizeOutboundPayloads,
} from "../infra/outbound/deliver.js"; normalizeOutboundPayloadsForJson,
} from "../infra/outbound/payloads.js";
import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { resolveSendPolicy } from "../sessions/send-policy.js"; import { resolveSendPolicy } from "../sessions/send-policy.js";
@@ -541,12 +544,8 @@ export async function agentCommand(
} }
} }
const normalizedPayloads = normalizeOutboundPayloadsForJson(payloads);
if (opts.json) { if (opts.json) {
const normalizedPayloads = payloads.map((p) => ({
text: p.text ?? "",
mediaUrl: p.mediaUrl ?? null,
mediaUrls: p.mediaUrls ?? (p.mediaUrl ? [p.mediaUrl] : undefined),
}));
runtime.log( runtime.log(
JSON.stringify( JSON.stringify(
{ payloads: normalizedPayloads, meta: result.meta }, { payloads: normalizedPayloads, meta: result.meta },
@@ -565,12 +564,10 @@ export async function agentCommand(
} }
const deliveryPayloads = normalizeOutboundPayloads(payloads); const deliveryPayloads = normalizeOutboundPayloads(payloads);
const logPayload = (payload: { text: string; mediaUrls: string[] }) => { const logPayload = (payload: NormalizedOutboundPayload) => {
if (opts.json) return; if (opts.json) return;
const lines: string[] = []; const output = formatOutboundPayloadLog(payload);
if (payload.text) lines.push(payload.text.trimEnd()); if (output) runtime.log(output);
for (const url of payload.mediaUrls) lines.push(`MEDIA:${url}`);
runtime.log(lines.join("\n"));
}; };
if (!deliver) { if (!deliver) {
for (const payload of deliveryPayloads) { for (const payload of deliveryPayloads) {
@@ -607,10 +604,5 @@ export async function agentCommand(
} }
} }
const normalizedPayloads = payloads.map((p) => ({
text: p.text ?? "",
mediaUrl: p.mediaUrl ?? null,
mediaUrls: p.mediaUrls ?? (p.mediaUrl ? [p.mediaUrl] : undefined),
}));
return { payloads: normalizedPayloads, meta: result.meta }; return { payloads: normalizedPayloads, meta: result.meta };
} }

View File

@@ -243,7 +243,7 @@ describe("sendCommand", () => {
runtime, runtime,
); );
expect(runtime.log).toHaveBeenCalledWith( expect(runtime.log).toHaveBeenCalledWith(
expect.stringContaining('"provider": "web"'), expect.stringContaining('"provider": "whatsapp"'),
); );
}); });
}); });

View File

@@ -114,13 +114,13 @@ export async function sendCommand(
if (opts.json) { if (opts.json) {
runtime.log( runtime.log(
JSON.stringify( JSON.stringify(
{ buildOutboundDeliveryJson({
provider: "web", provider,
via: "gateway", via: "gateway",
to: opts.to, to: opts.to,
messageId: result.messageId, result,
mediaUrl: opts.media ?? null, mediaUrl: opts.media ?? null,
}, }),
null, null,
2, 2,
), ),

View File

@@ -12,10 +12,15 @@ import { sendMessageSlack } from "../../slack/send.js";
import { sendMessageTelegram } from "../../telegram/send.js"; import { sendMessageTelegram } from "../../telegram/send.js";
import { resolveTelegramToken } from "../../telegram/token.js"; import { resolveTelegramToken } from "../../telegram/token.js";
import { sendMessageWhatsApp } from "../../web/outbound.js"; import { sendMessageWhatsApp } from "../../web/outbound.js";
import type { NormalizedOutboundPayload } from "./payloads.js";
import { normalizeOutboundPayloads } from "./payloads.js";
import type { OutboundProvider } from "./targets.js"; import type { OutboundProvider } from "./targets.js";
const MB = 1024 * 1024; const MB = 1024 * 1024;
export type { NormalizedOutboundPayload } from "./payloads.js";
export { normalizeOutboundPayloads } from "./payloads.js";
export type OutboundSendDeps = { export type OutboundSendDeps = {
sendWhatsApp?: typeof sendMessageWhatsApp; sendWhatsApp?: typeof sendMessageWhatsApp;
sendTelegram?: typeof sendMessageTelegram; sendTelegram?: typeof sendMessageTelegram;
@@ -33,43 +38,137 @@ export type OutboundDeliveryResult =
| { provider: "signal"; messageId: string; timestamp?: number } | { provider: "signal"; messageId: string; timestamp?: number }
| { provider: "imessage"; messageId: string }; | { provider: "imessage"; messageId: string };
export type NormalizedOutboundPayload = {
text: string;
mediaUrls: string[];
};
type Chunker = (text: string, limit: number) => string[]; type Chunker = (text: string, limit: number) => string[];
function resolveChunker(provider: OutboundProvider): Chunker | null { type ProviderHandler = {
if (provider === "telegram") return chunkMarkdownText; chunker: Chunker | null;
if (provider === "whatsapp") return chunkText; sendText: (text: string) => Promise<OutboundDeliveryResult>;
if (provider === "signal") return chunkText; sendMedia: (
if (provider === "imessage") return chunkText; caption: string,
return null; mediaUrl: string,
} ) => Promise<OutboundDeliveryResult>;
};
function resolveSignalMaxBytes(cfg: ClawdbotConfig): number | undefined { function resolveMediaMaxBytes(
if (cfg.signal?.mediaMaxMb) return cfg.signal.mediaMaxMb * MB; cfg: ClawdbotConfig,
provider: "signal" | "imessage",
): number | undefined {
const providerLimit =
provider === "signal" ? cfg.signal?.mediaMaxMb : cfg.imessage?.mediaMaxMb;
if (providerLimit) return providerLimit * MB;
if (cfg.agent?.mediaMaxMb) return cfg.agent.mediaMaxMb * MB; if (cfg.agent?.mediaMaxMb) return cfg.agent.mediaMaxMb * MB;
return undefined; return undefined;
} }
function resolveIMessageMaxBytes(cfg: ClawdbotConfig): number | undefined { function createProviderHandler(params: {
if (cfg.imessage?.mediaMaxMb) return cfg.imessage.mediaMaxMb * MB; cfg: ClawdbotConfig;
if (cfg.agent?.mediaMaxMb) return cfg.agent.mediaMaxMb * MB; provider: Exclude<OutboundProvider, "none">;
return undefined; to: string;
} deps: Required<OutboundSendDeps>;
}): ProviderHandler {
const { cfg, to, deps } = params;
const telegramToken =
params.provider === "telegram"
? resolveTelegramToken(cfg).token || undefined
: undefined;
const signalMaxBytes =
params.provider === "signal"
? resolveMediaMaxBytes(cfg, "signal")
: undefined;
const imessageMaxBytes =
params.provider === "imessage"
? resolveMediaMaxBytes(cfg, "imessage")
: undefined;
export function normalizeOutboundPayloads( const handlers: Record<Exclude<OutboundProvider, "none">, ProviderHandler> = {
payloads: ReplyPayload[], whatsapp: {
): NormalizedOutboundPayload[] { chunker: chunkText,
return payloads sendText: async (text) => ({
.map((payload) => ({ provider: "whatsapp",
text: payload.text ?? "", ...(await deps.sendWhatsApp(to, text, { verbose: false })),
mediaUrls: }),
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []), sendMedia: async (caption, mediaUrl) => ({
})) provider: "whatsapp",
.filter((payload) => payload.text || payload.mediaUrls.length > 0); ...(await deps.sendWhatsApp(to, caption, {
verbose: false,
mediaUrl,
})),
}),
},
telegram: {
chunker: chunkMarkdownText,
sendText: async (text) => ({
provider: "telegram",
...(await deps.sendTelegram(to, text, {
verbose: false,
token: telegramToken,
})),
}),
sendMedia: async (caption, mediaUrl) => ({
provider: "telegram",
...(await deps.sendTelegram(to, caption, {
verbose: false,
mediaUrl,
token: telegramToken,
})),
}),
},
discord: {
chunker: null,
sendText: async (text) => ({
provider: "discord",
...(await deps.sendDiscord(to, text, { verbose: false })),
}),
sendMedia: async (caption, mediaUrl) => ({
provider: "discord",
...(await deps.sendDiscord(to, caption, {
verbose: false,
mediaUrl,
})),
}),
},
slack: {
chunker: null,
sendText: async (text) => ({
provider: "slack",
...(await deps.sendSlack(to, text)),
}),
sendMedia: async (caption, mediaUrl) => ({
provider: "slack",
...(await deps.sendSlack(to, caption, { mediaUrl })),
}),
},
signal: {
chunker: chunkText,
sendText: async (text) => ({
provider: "signal",
...(await deps.sendSignal(to, text, { maxBytes: signalMaxBytes })),
}),
sendMedia: async (caption, mediaUrl) => ({
provider: "signal",
...(await deps.sendSignal(to, caption, {
mediaUrl,
maxBytes: signalMaxBytes,
})),
}),
},
imessage: {
chunker: chunkText,
sendText: async (text) => ({
provider: "imessage",
...(await deps.sendIMessage(to, text, { maxBytes: imessageMaxBytes })),
}),
sendMedia: async (caption, mediaUrl) => ({
provider: "imessage",
...(await deps.sendIMessage(to, caption, {
mediaUrl,
maxBytes: imessageMaxBytes,
})),
}),
},
};
return handlers[params.provider];
} }
export async function deliverOutboundPayloads(params: { export async function deliverOutboundPayloads(params: {
@@ -92,109 +191,26 @@ export async function deliverOutboundPayloads(params: {
sendIMessage: params.deps?.sendIMessage ?? sendMessageIMessage, sendIMessage: params.deps?.sendIMessage ?? sendMessageIMessage,
}; };
const results: OutboundDeliveryResult[] = []; const results: OutboundDeliveryResult[] = [];
const handler = createProviderHandler({
const chunker = resolveChunker(provider); cfg,
const textLimit = chunker ? resolveTextChunkLimit(cfg, provider) : undefined; provider,
const telegramToken = to,
provider === "telegram" deps,
? resolveTelegramToken(cfg).token || undefined });
: undefined; const textLimit = handler.chunker
const signalMaxBytes = ? resolveTextChunkLimit(cfg, provider)
provider === "signal" ? resolveSignalMaxBytes(cfg) : undefined; : undefined;
const imessageMaxBytes =
provider === "imessage" ? resolveIMessageMaxBytes(cfg) : undefined;
const sendTextChunks = async (text: string) => { const sendTextChunks = async (text: string) => {
if (!chunker || textLimit === undefined) { if (!handler.chunker || textLimit === undefined) {
await sendText(text); results.push(await handler.sendText(text));
return; return;
} }
for (const chunk of chunker(text, textLimit)) { for (const chunk of handler.chunker(text, textLimit)) {
await sendText(chunk); results.push(await handler.sendText(chunk));
} }
}; };
const sendText = async (text: string) => {
if (provider === "whatsapp") {
const res = await deps.sendWhatsApp(to, text, { verbose: false });
results.push({ provider: "whatsapp", ...res });
return;
}
if (provider === "telegram") {
const res = await deps.sendTelegram(to, text, {
verbose: false,
token: telegramToken,
});
results.push({ provider: "telegram", ...res });
return;
}
if (provider === "signal") {
const res = await deps.sendSignal(to, text, { maxBytes: signalMaxBytes });
results.push({ provider: "signal", ...res });
return;
}
if (provider === "imessage") {
const res = await deps.sendIMessage(to, text, {
maxBytes: imessageMaxBytes,
});
results.push({ provider: "imessage", ...res });
return;
}
if (provider === "slack") {
const res = await deps.sendSlack(to, text);
results.push({ provider: "slack", ...res });
return;
}
const res = await deps.sendDiscord(to, text, { verbose: false });
results.push({ provider: "discord", ...res });
};
const sendMedia = async (caption: string, mediaUrl: string) => {
if (provider === "whatsapp") {
const res = await deps.sendWhatsApp(to, caption, {
verbose: false,
mediaUrl,
});
results.push({ provider: "whatsapp", ...res });
return;
}
if (provider === "telegram") {
const res = await deps.sendTelegram(to, caption, {
verbose: false,
mediaUrl,
token: telegramToken,
});
results.push({ provider: "telegram", ...res });
return;
}
if (provider === "signal") {
const res = await deps.sendSignal(to, caption, {
mediaUrl,
maxBytes: signalMaxBytes,
});
results.push({ provider: "signal", ...res });
return;
}
if (provider === "imessage") {
const res = await deps.sendIMessage(to, caption, {
mediaUrl,
maxBytes: imessageMaxBytes,
});
results.push({ provider: "imessage", ...res });
return;
}
if (provider === "slack") {
const res = await deps.sendSlack(to, caption, { mediaUrl });
results.push({ provider: "slack", ...res });
return;
}
const res = await deps.sendDiscord(to, caption, {
verbose: false,
mediaUrl,
});
results.push({ provider: "discord", ...res });
};
const normalizedPayloads = normalizeOutboundPayloads(payloads); const normalizedPayloads = normalizeOutboundPayloads(payloads);
for (const payload of normalizedPayloads) { for (const payload of normalizedPayloads) {
try { try {
@@ -208,7 +224,7 @@ export async function deliverOutboundPayloads(params: {
for (const url of payload.mediaUrls) { for (const url of payload.mediaUrls) {
const caption = first ? payload.text : ""; const caption = first ? payload.text : "";
first = false; first = false;
await sendMedia(caption, url); results.push(await handler.sendMedia(caption, url));
} }
} catch (err) { } catch (err) {
if (!params.bestEffort) throw err; if (!params.bestEffort) throw err;

View File

@@ -0,0 +1,50 @@
import { describe, expect, it } from "vitest";
import {
formatOutboundPayloadLog,
normalizeOutboundPayloadsForJson,
} from "./payloads.js";
describe("normalizeOutboundPayloadsForJson", () => {
it("normalizes payloads with mediaUrl and mediaUrls", () => {
expect(
normalizeOutboundPayloadsForJson([
{ text: "hi" },
{ text: "photo", mediaUrl: "https://x.test/a.jpg" },
{ text: "multi", mediaUrls: ["https://x.test/1.png"] },
]),
).toEqual([
{ text: "hi", mediaUrl: null, mediaUrls: undefined },
{
text: "photo",
mediaUrl: "https://x.test/a.jpg",
mediaUrls: ["https://x.test/a.jpg"],
},
{
text: "multi",
mediaUrl: null,
mediaUrls: ["https://x.test/1.png"],
},
]);
});
});
describe("formatOutboundPayloadLog", () => {
it("trims trailing text and appends media lines", () => {
expect(
formatOutboundPayloadLog({
text: "hello ",
mediaUrls: ["https://x.test/a.png", "https://x.test/b.png"],
}),
).toBe("hello\nMEDIA:https://x.test/a.png\nMEDIA:https://x.test/b.png");
});
it("logs media-only payloads", () => {
expect(
formatOutboundPayloadLog({
text: "",
mediaUrls: ["https://x.test/a.png"],
}),
).toBe("MEDIA:https://x.test/a.png");
});
});

View File

@@ -0,0 +1,44 @@
import type { ReplyPayload } from "../../auto-reply/types.js";
export type NormalizedOutboundPayload = {
text: string;
mediaUrls: string[];
};
export type OutboundPayloadJson = {
text: string;
mediaUrl: string | null;
mediaUrls?: string[];
};
export function normalizeOutboundPayloads(
payloads: ReplyPayload[],
): NormalizedOutboundPayload[] {
return payloads
.map((payload) => ({
text: payload.text ?? "",
mediaUrls:
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []),
}))
.filter((payload) => payload.text || payload.mediaUrls.length > 0);
}
export function normalizeOutboundPayloadsForJson(
payloads: ReplyPayload[],
): OutboundPayloadJson[] {
return payloads.map((payload) => ({
text: payload.text ?? "",
mediaUrl: payload.mediaUrl ?? null,
mediaUrls:
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : undefined),
}));
}
export function formatOutboundPayloadLog(
payload: NormalizedOutboundPayload,
): string {
const lines: string[] = [];
if (payload.text) lines.push(payload.text.trimEnd());
for (const url of payload.mediaUrls) lines.push(`MEDIA:${url}`);
return lines.join("\n");
}

View File

@@ -14,13 +14,13 @@ import {
listNativeCommandSpecs, listNativeCommandSpecs,
} from "../auto-reply/commands-registry.js"; } from "../auto-reply/commands-registry.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import { import {
buildMentionRegexes, buildMentionRegexes,
matchesMentionPatterns, matchesMentionPatterns,
} from "../auto-reply/reply/mentions.js"; } from "../auto-reply/reply/mentions.js";
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js"; import type { ReplyPayload } from "../auto-reply/types.js";
import type { ReplyToMode } from "../config/config.js"; import type { ReplyToMode } from "../config/config.js";
import { loadConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js";