refactor(cron): centralize telegram targets + delivery helpers

This commit is contained in:
Peter Steinberger
2026-01-08 21:36:36 +00:00
parent 5939dff092
commit 5b46214379
2 changed files with 176 additions and 136 deletions

View File

@@ -450,6 +450,51 @@ describe("runCronIsolatedAgentTurn", () => {
}); });
}); });
it("delivers telegram shorthand topic suffixes with messageThreadId", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn().mockResolvedValue({
messageId: "t1",
chatId: "-1001234567890",
}),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "hello from cron" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: makeJob({
kind: "agentTurn",
message: "do it",
deliver: true,
provider: "telegram",
to: "-1001234567890:321",
}),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(deps.sendMessageTelegram).toHaveBeenCalledWith(
"-1001234567890",
"hello from cron",
expect.objectContaining({ messageThreadId: 321 }),
);
});
});
it("delivers via discord when configured", async () => { it("delivers via discord when configured", async () => {
await withTempHome(async (home) => { await withTempHome(async (home) => {
const storePath = await writeSessionStore(home); const storePath = await writeSessionStore(home);

View File

@@ -57,6 +57,12 @@ export type RunCronAgentTurnResult = {
error?: string; error?: string;
}; };
type DeliveryPayload = {
text?: string;
mediaUrl?: string;
mediaUrls?: string[];
};
function pickSummaryFromOutput(text: string | undefined) { function pickSummaryFromOutput(text: string | undefined) {
const clean = (text ?? "").trim(); const clean = (text ?? "").trim();
if (!clean) return undefined; if (!clean) return undefined;
@@ -79,7 +85,7 @@ function pickSummaryFromPayloads(
* Returns true if delivery should be skipped because there's no real content. * Returns true if delivery should be skipped because there's no real content.
*/ */
function isHeartbeatOnlyResponse( function isHeartbeatOnlyResponse(
payloads: Array<{ text?: string; mediaUrl?: string; mediaUrls?: string[] }>, payloads: DeliveryPayload[],
ackMaxChars: number, ackMaxChars: number,
) { ) {
if (payloads.length === 0) return true; if (payloads.length === 0) return true;
@@ -96,6 +102,53 @@ function isHeartbeatOnlyResponse(
return result.shouldSkip; return result.shouldSkip;
}); });
} }
function getMediaList(payload: DeliveryPayload) {
return payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
}
async function deliverPayloadsWithMedia(params: {
payloads: DeliveryPayload[];
sendText: (text: string) => Promise<unknown>;
sendMedia: (caption: string, mediaUrl: string) => Promise<unknown>;
}) {
for (const payload of params.payloads) {
const mediaList = getMediaList(payload);
if (mediaList.length === 0) {
await params.sendText(payload.text ?? "");
continue;
}
let first = true;
for (const url of mediaList) {
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.sendMedia(caption, url);
}
}
}
async function deliverChunkedPayloads(params: {
payloads: DeliveryPayload[];
chunkText: (text: string) => string[];
sendText: (text: string) => Promise<unknown>;
sendMedia: (caption: string, mediaUrl: string) => Promise<unknown>;
}) {
for (const payload of params.payloads) {
const mediaList = getMediaList(payload);
if (mediaList.length === 0) {
for (const chunk of params.chunkText(payload.text ?? "")) {
await params.sendText(chunk);
}
continue;
}
let first = true;
for (const url of mediaList) {
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.sendMedia(caption, url);
}
}
}
function resolveDeliveryTarget( function resolveDeliveryTarget(
cfg: ClawdbotConfig, cfg: ClawdbotConfig,
jobPayload: { jobPayload: {
@@ -143,28 +196,34 @@ function resolveDeliveryTarget(
return lastProvider ?? "whatsapp"; return lastProvider ?? "whatsapp";
})(); })();
const to = (() => { const rawTo = explicitTo ?? (lastTo || undefined);
if (explicitTo) return explicitTo; const telegramTarget =
return lastTo || undefined; provider === "telegram" && rawTo ? parseTelegramTarget(rawTo) : undefined;
})();
const sanitizedWhatsappTo = (() => { const sanitizedWhatsappTo = (() => {
if (provider !== "whatsapp") return to; if (provider !== "whatsapp") return rawTo;
const rawAllow = cfg.whatsapp?.allowFrom ?? []; const rawAllow = cfg.whatsapp?.allowFrom ?? [];
if (rawAllow.includes("*")) return to; if (rawAllow.includes("*")) return rawTo;
const allowFrom = rawAllow const allowFrom = rawAllow
.map((val) => normalizeE164(val)) .map((val) => normalizeE164(val))
.filter((val) => val.length > 1); .filter((val) => val.length > 1);
if (allowFrom.length === 0) return to; if (allowFrom.length === 0) return rawTo;
if (!to) return allowFrom[0]; if (!rawTo) return allowFrom[0];
const normalized = normalizeE164(to); const normalized = normalizeE164(rawTo);
if (allowFrom.includes(normalized)) return normalized; if (allowFrom.includes(normalized)) return normalized;
return allowFrom[0]; return allowFrom[0];
})(); })();
const to = (() => {
if (provider === "telegram" && telegramTarget) return telegramTarget.chatId;
if (provider === "whatsapp") return sanitizedWhatsappTo;
return rawTo;
})();
return { return {
provider, provider,
to: provider === "whatsapp" ? sanitizedWhatsappTo : to, to,
messageThreadId: telegramTarget?.messageThreadId,
}; };
} }
@@ -455,21 +514,16 @@ export async function runCronIsolatedAgentTurn(params: {
} }
const to = normalizeE164(resolvedDelivery.to); const to = normalizeE164(resolvedDelivery.to);
try { try {
for (const payload of payloads) { await deliverPayloadsWithMedia({
const mediaList = payloads,
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); sendText: (text) =>
const primaryMedia = mediaList[0]; params.deps.sendMessageWhatsApp(to, text, { verbose: false }),
await params.deps.sendMessageWhatsApp(to, payload.text ?? "", { sendMedia: (caption, mediaUrl) =>
verbose: false, params.deps.sendMessageWhatsApp(to, caption, {
mediaUrl: primaryMedia,
});
for (const extra of mediaList.slice(1)) {
await params.deps.sendMessageWhatsApp(to, "", {
verbose: false, verbose: false,
mediaUrl: extra, mediaUrl,
}); }),
} });
}
} catch (err) { } catch (err) {
if (!bestEffortDeliver) if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) }; return { status: "error", summary, error: String(err) };
@@ -488,39 +542,27 @@ export async function runCronIsolatedAgentTurn(params: {
summary: "Delivery skipped (no Telegram chatId).", summary: "Delivery skipped (no Telegram chatId).",
}; };
} }
const telegramTarget = parseTelegramTarget(resolvedDelivery.to); const chatId = resolvedDelivery.to;
const chatId = telegramTarget.chatId; const messageThreadId = resolvedDelivery.messageThreadId;
const messageThreadId = telegramTarget.messageThreadId;
const textLimit = resolveTextChunkLimit(params.cfg, "telegram"); const textLimit = resolveTextChunkLimit(params.cfg, "telegram");
try { try {
for (const payload of payloads) { await deliverChunkedPayloads({
const mediaList = payloads,
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); chunkText: (text) => chunkMarkdownText(text, textLimit),
if (mediaList.length === 0) { sendText: (text) =>
for (const chunk of chunkMarkdownText( params.deps.sendMessageTelegram(chatId, text, {
payload.text ?? "", verbose: false,
textLimit, token: telegramToken || undefined,
)) { messageThreadId,
await params.deps.sendMessageTelegram(chatId, chunk, { }),
verbose: false, sendMedia: (caption, mediaUrl) =>
token: telegramToken || undefined, params.deps.sendMessageTelegram(chatId, caption, {
messageThreadId, verbose: false,
}); mediaUrl,
} token: telegramToken || undefined,
} else { messageThreadId,
let first = true; }),
for (const url of mediaList) { });
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.deps.sendMessageTelegram(chatId, caption, {
verbose: false,
mediaUrl: url,
token: telegramToken || undefined,
messageThreadId,
});
}
}
}
} catch (err) { } catch (err) {
if (!bestEffortDeliver) if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) }; return { status: "error", summary, error: String(err) };
@@ -542,29 +584,18 @@ export async function runCronIsolatedAgentTurn(params: {
} }
const discordTarget = resolvedDelivery.to; const discordTarget = resolvedDelivery.to;
try { try {
for (const payload of payloads) { await deliverPayloadsWithMedia({
const mediaList = payloads,
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); sendText: (text) =>
if (mediaList.length === 0) { params.deps.sendMessageDiscord(discordTarget, text, {
await params.deps.sendMessageDiscord( token: process.env.DISCORD_BOT_TOKEN,
discordTarget, }),
payload.text ?? "", sendMedia: (caption, mediaUrl) =>
{ params.deps.sendMessageDiscord(discordTarget, caption, {
token: process.env.DISCORD_BOT_TOKEN, token: process.env.DISCORD_BOT_TOKEN,
}, mediaUrl,
); }),
} else { });
let first = true;
for (const url of mediaList) {
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.deps.sendMessageDiscord(discordTarget, caption, {
token: process.env.DISCORD_BOT_TOKEN,
mediaUrl: url,
});
}
}
}
} catch (err) { } catch (err) {
if (!bestEffortDeliver) if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) }; return { status: "error", summary, error: String(err) };
@@ -587,27 +618,13 @@ export async function runCronIsolatedAgentTurn(params: {
const slackTarget = resolvedDelivery.to; const slackTarget = resolvedDelivery.to;
const textLimit = resolveTextChunkLimit(params.cfg, "slack"); const textLimit = resolveTextChunkLimit(params.cfg, "slack");
try { try {
for (const payload of payloads) { await deliverChunkedPayloads({
const mediaList = payloads,
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); chunkText: (text) => chunkMarkdownText(text, textLimit),
if (mediaList.length === 0) { sendText: (text) => params.deps.sendMessageSlack(slackTarget, text),
for (const chunk of chunkMarkdownText( sendMedia: (caption, mediaUrl) =>
payload.text ?? "", params.deps.sendMessageSlack(slackTarget, caption, { mediaUrl }),
textLimit, });
)) {
await params.deps.sendMessageSlack(slackTarget, chunk);
}
} else {
let first = true;
for (const url of mediaList) {
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.deps.sendMessageSlack(slackTarget, caption, {
mediaUrl: url,
});
}
}
}
} catch (err) { } catch (err) {
if (!bestEffortDeliver) if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) }; return { status: "error", summary, error: String(err) };
@@ -629,24 +646,13 @@ export async function runCronIsolatedAgentTurn(params: {
const to = resolvedDelivery.to; const to = resolvedDelivery.to;
const textLimit = resolveTextChunkLimit(params.cfg, "signal"); const textLimit = resolveTextChunkLimit(params.cfg, "signal");
try { try {
for (const payload of payloads) { await deliverChunkedPayloads({
const mediaList = payloads,
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); chunkText: (text) => chunkText(text, textLimit),
if (mediaList.length === 0) { sendText: (text) => params.deps.sendMessageSignal(to, text),
for (const chunk of chunkText(payload.text ?? "", textLimit)) { sendMedia: (caption, mediaUrl) =>
await params.deps.sendMessageSignal(to, chunk); params.deps.sendMessageSignal(to, caption, { mediaUrl }),
} });
} else {
let first = true;
for (const url of mediaList) {
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.deps.sendMessageSignal(to, caption, {
mediaUrl: url,
});
}
}
}
} catch (err) { } catch (err) {
if (!bestEffortDeliver) if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) }; return { status: "error", summary, error: String(err) };
@@ -668,24 +674,13 @@ export async function runCronIsolatedAgentTurn(params: {
const to = resolvedDelivery.to; const to = resolvedDelivery.to;
const textLimit = resolveTextChunkLimit(params.cfg, "imessage"); const textLimit = resolveTextChunkLimit(params.cfg, "imessage");
try { try {
for (const payload of payloads) { await deliverChunkedPayloads({
const mediaList = payloads,
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); chunkText: (text) => chunkText(text, textLimit),
if (mediaList.length === 0) { sendText: (text) => params.deps.sendMessageIMessage(to, text),
for (const chunk of chunkText(payload.text ?? "", textLimit)) { sendMedia: (caption, mediaUrl) =>
await params.deps.sendMessageIMessage(to, chunk); params.deps.sendMessageIMessage(to, caption, { mediaUrl }),
} });
} else {
let first = true;
for (const url of mediaList) {
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.deps.sendMessageIMessage(to, caption, {
mediaUrl: url,
});
}
}
}
} catch (err) { } catch (err) {
if (!bestEffortDeliver) if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) }; return { status: "error", summary, error: String(err) };