refactor: normalize outbound payload delivery

This commit is contained in:
Peter Steinberger
2026-01-07 01:19:47 +00:00
parent f171d509bb
commit aefaed159b
4 changed files with 116 additions and 62 deletions

View File

@@ -1,7 +1,7 @@
# send-refactor scratchpad
- [x] Commit + push current outbound refactor changes
- [ ] Step 1: centralize outbound target validation
- [x] Step 1: centralize outbound target validation
- [ ] Step 2: normalize payloads + single delivery call
- [ ] Step 3: unify outbound JSON/result formatting
- [ ] Cleanup: delete scratchpad, final lint + tests, commit + push

View File

@@ -44,7 +44,10 @@ import {
emitAgentEvent,
registerAgentRunContext,
} from "../infra/agent-events.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import {
deliverOutboundPayloads,
normalizeOutboundPayloads,
} from "../infra/outbound/deliver.js";
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { resolveSendPolicy } from "../sessions/send-policy.js";
@@ -561,51 +564,46 @@ export async function agentCommand(
return { payloads: [], meta: result.meta };
}
for (const payload of payloads) {
const mediaList =
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
if (!opts.json) {
const lines: string[] = [];
if (payload.text) lines.push(payload.text.trimEnd());
for (const url of mediaList) lines.push(`MEDIA:${url}`);
runtime.log(lines.join("\n"));
const deliveryPayloads = normalizeOutboundPayloads(payloads);
const logPayload = (payload: { text: string; mediaUrls: string[] }) => {
if (opts.json) return;
const lines: string[] = [];
if (payload.text) lines.push(payload.text.trimEnd());
for (const url of payload.mediaUrls) lines.push(`MEDIA:${url}`);
runtime.log(lines.join("\n"));
};
if (!deliver) {
for (const payload of deliveryPayloads) {
logPayload(payload);
}
if (!deliver) continue;
const text = payload.text ?? "";
const media = mediaList;
if (!text && media.length === 0) continue;
if (
deliveryProvider === "whatsapp" ||
}
if (
deliver &&
(deliveryProvider === "whatsapp" ||
deliveryProvider === "telegram" ||
deliveryProvider === "discord" ||
deliveryProvider === "slack" ||
deliveryProvider === "signal" ||
deliveryProvider === "imessage"
) {
if (!deliveryTarget) continue;
try {
await deliverOutboundPayloads({
cfg,
provider: deliveryProvider,
to: deliveryTarget,
payloads: [payload],
deps: {
sendWhatsApp: deps.sendMessageWhatsApp,
sendTelegram: deps.sendMessageTelegram,
sendDiscord: deps.sendMessageDiscord,
sendSlack: deps.sendMessageSlack,
sendSignal: deps.sendMessageSignal,
sendIMessage: deps.sendMessageIMessage,
},
});
} catch (err) {
if (!bestEffortDeliver) throw err;
logDeliveryError(err);
}
deliveryProvider === "imessage")
) {
if (deliveryTarget) {
await deliverOutboundPayloads({
cfg,
provider: deliveryProvider,
to: deliveryTarget,
payloads: deliveryPayloads,
bestEffort: bestEffortDeliver,
onError: (err) => logDeliveryError(err),
onPayload: logPayload,
deps: {
sendWhatsApp: deps.sendMessageWhatsApp,
sendTelegram: deps.sendMessageTelegram,
sendDiscord: deps.sendMessageDiscord,
sendSlack: deps.sendMessageSlack,
sendSignal: deps.sendMessageSignal,
sendIMessage: deps.sendMessageIMessage,
},
});
}
}

View File

@@ -1,7 +1,10 @@
import { describe, expect, it, vi } from "vitest";
import type { ClawdbotConfig } from "../../config/config.js";
import { deliverOutboundPayloads } from "./deliver.js";
import {
deliverOutboundPayloads,
normalizeOutboundPayloads,
} from "./deliver.js";
describe("deliverOutboundPayloads", () => {
it("chunks telegram markdown and passes config token", async () => {
@@ -86,9 +89,7 @@ describe("deliverOutboundPayloads", () => {
});
it("uses iMessage media maxBytes from agent fallback", async () => {
const sendIMessage = vi
.fn()
.mockResolvedValue({ messageId: "i1" });
const sendIMessage = vi.fn().mockResolvedValue({ messageId: "i1" });
const cfg: ClawdbotConfig = { agent: { mediaMaxMb: 3 } };
await deliverOutboundPayloads({
@@ -105,4 +106,41 @@ describe("deliverOutboundPayloads", () => {
expect.objectContaining({ maxBytes: 3 * 1024 * 1024 }),
);
});
it("normalizes payloads and drops empty entries", () => {
const normalized = normalizeOutboundPayloads([
{ text: "hi" },
{ mediaUrl: "https://x.test/a.jpg" },
{ text: " ", mediaUrls: [] },
]);
expect(normalized).toEqual([
{ text: "hi", mediaUrls: [] },
{ text: "", mediaUrls: ["https://x.test/a.jpg"] },
]);
});
it("continues on errors when bestEffort is enabled", async () => {
const sendWhatsApp = vi
.fn()
.mockRejectedValueOnce(new Error("fail"))
.mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
const onError = vi.fn();
const cfg: ClawdbotConfig = {};
const results = await deliverOutboundPayloads({
cfg,
provider: "whatsapp",
to: "+1555",
payloads: [{ text: "a" }, { text: "b" }],
deps: { sendWhatsApp },
bestEffort: true,
onError,
});
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([
{ provider: "whatsapp", messageId: "w2", toJid: "jid" },
]);
});
});

View File

@@ -33,6 +33,11 @@ export type OutboundDeliveryResult =
| { provider: "signal"; messageId: string; timestamp?: number }
| { provider: "imessage"; messageId: string };
export type NormalizedOutboundPayload = {
text: string;
mediaUrls: string[];
};
type Chunker = (text: string, limit: number) => string[];
function resolveChunker(provider: OutboundProvider): Chunker | null {
@@ -55,8 +60,15 @@ function resolveIMessageMaxBytes(cfg: ClawdbotConfig): number | undefined {
return undefined;
}
function normalizeMediaUrls(payload: ReplyPayload): string[] {
return payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
export function normalizeOutboundPayloads(
payloads: ReplyPayload[],
): NormalizedOutboundPayload[] {
return payloads
.map((payload) => ({
text: payload.text ?? "",
mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []),
}))
.filter((payload) => payload.text || payload.mediaUrls.length > 0);
}
export async function deliverOutboundPayloads(params: {
@@ -65,6 +77,9 @@ export async function deliverOutboundPayloads(params: {
to: string;
payloads: ReplyPayload[];
deps?: OutboundSendDeps;
bestEffort?: boolean;
onError?: (err: unknown, payload: NormalizedOutboundPayload) => void;
onPayload?: (payload: NormalizedOutboundPayload) => void;
}): Promise<OutboundDeliveryResult[]> {
const { cfg, provider, to, payloads } = params;
const deps = {
@@ -179,21 +194,24 @@ export async function deliverOutboundPayloads(params: {
results.push({ provider: "discord", ...res });
};
for (const payload of payloads) {
const text = payload.text ?? "";
const mediaUrls = normalizeMediaUrls(payload);
if (!text && mediaUrls.length === 0) continue;
const normalizedPayloads = normalizeOutboundPayloads(payloads);
for (const payload of normalizedPayloads) {
try {
params.onPayload?.(payload);
if (payload.mediaUrls.length === 0) {
await sendTextChunks(payload.text);
continue;
}
if (mediaUrls.length === 0) {
await sendTextChunks(text);
continue;
}
let first = true;
for (const url of mediaUrls) {
const caption = first ? text : "";
first = false;
await sendMedia(caption, url);
let first = true;
for (const url of payload.mediaUrls) {
const caption = first ? payload.text : "";
first = false;
await sendMedia(caption, url);
}
} catch (err) {
if (!params.bestEffort) throw err;
params.onError?.(err, payload);
}
}
return results;