diff --git a/src/cron/isolated-agent.test.ts b/src/cron/isolated-agent.test.ts index c0364fc91..8591e7bdd 100644 --- a/src/cron/isolated-agent.test.ts +++ b/src/cron/isolated-agent.test.ts @@ -450,6 +450,51 @@ describe("runCronIsolatedAgentTurn", () => { }); }); + it("delivers telegram shorthand topic suffixes with messageThreadId", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn().mockResolvedValue({ + messageId: "t1", + chatId: "-1001234567890", + }), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "hello from cron" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath), + deps, + job: makeJob({ + kind: "agentTurn", + message: "do it", + deliver: true, + provider: "telegram", + to: "-1001234567890:321", + }), + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(deps.sendMessageTelegram).toHaveBeenCalledWith( + "-1001234567890", + "hello from cron", + expect.objectContaining({ messageThreadId: 321 }), + ); + }); + }); + it("delivers via discord when configured", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); diff --git a/src/cron/isolated-agent.ts b/src/cron/isolated-agent.ts index 3488fcd5f..a4f7be945 100644 --- a/src/cron/isolated-agent.ts +++ b/src/cron/isolated-agent.ts @@ -57,6 +57,12 @@ export type RunCronAgentTurnResult = { error?: string; }; +type DeliveryPayload = { + text?: string; + mediaUrl?: string; + mediaUrls?: string[]; +}; + function pickSummaryFromOutput(text: string | undefined) { const clean = (text ?? "").trim(); if (!clean) return undefined; @@ -79,7 +85,7 @@ function pickSummaryFromPayloads( * Returns true if delivery should be skipped because there's no real content. */ function isHeartbeatOnlyResponse( - payloads: Array<{ text?: string; mediaUrl?: string; mediaUrls?: string[] }>, + payloads: DeliveryPayload[], ackMaxChars: number, ) { if (payloads.length === 0) return true; @@ -96,6 +102,53 @@ function isHeartbeatOnlyResponse( return result.shouldSkip; }); } + +function getMediaList(payload: DeliveryPayload) { + return payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); +} + +async function deliverPayloadsWithMedia(params: { + payloads: DeliveryPayload[]; + sendText: (text: string) => Promise; + sendMedia: (caption: string, mediaUrl: string) => Promise; +}) { + for (const payload of params.payloads) { + const mediaList = getMediaList(payload); + if (mediaList.length === 0) { + await params.sendText(payload.text ?? ""); + continue; + } + let first = true; + for (const url of mediaList) { + const caption = first ? (payload.text ?? "") : ""; + first = false; + await params.sendMedia(caption, url); + } + } +} + +async function deliverChunkedPayloads(params: { + payloads: DeliveryPayload[]; + chunkText: (text: string) => string[]; + sendText: (text: string) => Promise; + sendMedia: (caption: string, mediaUrl: string) => Promise; +}) { + for (const payload of params.payloads) { + const mediaList = getMediaList(payload); + if (mediaList.length === 0) { + for (const chunk of params.chunkText(payload.text ?? "")) { + await params.sendText(chunk); + } + continue; + } + let first = true; + for (const url of mediaList) { + const caption = first ? (payload.text ?? "") : ""; + first = false; + await params.sendMedia(caption, url); + } + } +} function resolveDeliveryTarget( cfg: ClawdbotConfig, jobPayload: { @@ -143,28 +196,34 @@ function resolveDeliveryTarget( return lastProvider ?? "whatsapp"; })(); - const to = (() => { - if (explicitTo) return explicitTo; - return lastTo || undefined; - })(); + const rawTo = explicitTo ?? (lastTo || undefined); + const telegramTarget = + provider === "telegram" && rawTo ? parseTelegramTarget(rawTo) : undefined; const sanitizedWhatsappTo = (() => { - if (provider !== "whatsapp") return to; + if (provider !== "whatsapp") return rawTo; const rawAllow = cfg.whatsapp?.allowFrom ?? []; - if (rawAllow.includes("*")) return to; + if (rawAllow.includes("*")) return rawTo; const allowFrom = rawAllow .map((val) => normalizeE164(val)) .filter((val) => val.length > 1); - if (allowFrom.length === 0) return to; - if (!to) return allowFrom[0]; - const normalized = normalizeE164(to); + if (allowFrom.length === 0) return rawTo; + if (!rawTo) return allowFrom[0]; + const normalized = normalizeE164(rawTo); if (allowFrom.includes(normalized)) return normalized; return allowFrom[0]; })(); + const to = (() => { + if (provider === "telegram" && telegramTarget) return telegramTarget.chatId; + if (provider === "whatsapp") return sanitizedWhatsappTo; + return rawTo; + })(); + return { provider, - to: provider === "whatsapp" ? sanitizedWhatsappTo : to, + to, + messageThreadId: telegramTarget?.messageThreadId, }; } @@ -455,21 +514,16 @@ export async function runCronIsolatedAgentTurn(params: { } const to = normalizeE164(resolvedDelivery.to); try { - for (const payload of payloads) { - const mediaList = - payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); - const primaryMedia = mediaList[0]; - await params.deps.sendMessageWhatsApp(to, payload.text ?? "", { - verbose: false, - mediaUrl: primaryMedia, - }); - for (const extra of mediaList.slice(1)) { - await params.deps.sendMessageWhatsApp(to, "", { + await deliverPayloadsWithMedia({ + payloads, + sendText: (text) => + params.deps.sendMessageWhatsApp(to, text, { verbose: false }), + sendMedia: (caption, mediaUrl) => + params.deps.sendMessageWhatsApp(to, caption, { verbose: false, - mediaUrl: extra, - }); - } - } + mediaUrl, + }), + }); } catch (err) { if (!bestEffortDeliver) return { status: "error", summary, error: String(err) }; @@ -488,39 +542,27 @@ export async function runCronIsolatedAgentTurn(params: { summary: "Delivery skipped (no Telegram chatId).", }; } - const telegramTarget = parseTelegramTarget(resolvedDelivery.to); - const chatId = telegramTarget.chatId; - const messageThreadId = telegramTarget.messageThreadId; + const chatId = resolvedDelivery.to; + const messageThreadId = resolvedDelivery.messageThreadId; const textLimit = resolveTextChunkLimit(params.cfg, "telegram"); try { - for (const payload of payloads) { - const mediaList = - payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); - if (mediaList.length === 0) { - for (const chunk of chunkMarkdownText( - payload.text ?? "", - textLimit, - )) { - await params.deps.sendMessageTelegram(chatId, chunk, { - verbose: false, - token: telegramToken || undefined, - messageThreadId, - }); - } - } else { - let first = true; - for (const url of mediaList) { - const caption = first ? (payload.text ?? "") : ""; - first = false; - await params.deps.sendMessageTelegram(chatId, caption, { - verbose: false, - mediaUrl: url, - token: telegramToken || undefined, - messageThreadId, - }); - } - } - } + await deliverChunkedPayloads({ + payloads, + chunkText: (text) => chunkMarkdownText(text, textLimit), + sendText: (text) => + params.deps.sendMessageTelegram(chatId, text, { + verbose: false, + token: telegramToken || undefined, + messageThreadId, + }), + sendMedia: (caption, mediaUrl) => + params.deps.sendMessageTelegram(chatId, caption, { + verbose: false, + mediaUrl, + token: telegramToken || undefined, + messageThreadId, + }), + }); } catch (err) { if (!bestEffortDeliver) return { status: "error", summary, error: String(err) }; @@ -542,29 +584,18 @@ export async function runCronIsolatedAgentTurn(params: { } const discordTarget = resolvedDelivery.to; try { - for (const payload of payloads) { - const mediaList = - payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); - if (mediaList.length === 0) { - await params.deps.sendMessageDiscord( - discordTarget, - payload.text ?? "", - { - token: process.env.DISCORD_BOT_TOKEN, - }, - ); - } else { - let first = true; - for (const url of mediaList) { - const caption = first ? (payload.text ?? "") : ""; - first = false; - await params.deps.sendMessageDiscord(discordTarget, caption, { - token: process.env.DISCORD_BOT_TOKEN, - mediaUrl: url, - }); - } - } - } + await deliverPayloadsWithMedia({ + payloads, + sendText: (text) => + params.deps.sendMessageDiscord(discordTarget, text, { + token: process.env.DISCORD_BOT_TOKEN, + }), + sendMedia: (caption, mediaUrl) => + params.deps.sendMessageDiscord(discordTarget, caption, { + token: process.env.DISCORD_BOT_TOKEN, + mediaUrl, + }), + }); } catch (err) { if (!bestEffortDeliver) return { status: "error", summary, error: String(err) }; @@ -587,27 +618,13 @@ export async function runCronIsolatedAgentTurn(params: { const slackTarget = resolvedDelivery.to; const textLimit = resolveTextChunkLimit(params.cfg, "slack"); try { - for (const payload of payloads) { - const mediaList = - payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); - if (mediaList.length === 0) { - for (const chunk of chunkMarkdownText( - payload.text ?? "", - textLimit, - )) { - await params.deps.sendMessageSlack(slackTarget, chunk); - } - } else { - let first = true; - for (const url of mediaList) { - const caption = first ? (payload.text ?? "") : ""; - first = false; - await params.deps.sendMessageSlack(slackTarget, caption, { - mediaUrl: url, - }); - } - } - } + await deliverChunkedPayloads({ + payloads, + chunkText: (text) => chunkMarkdownText(text, textLimit), + sendText: (text) => params.deps.sendMessageSlack(slackTarget, text), + sendMedia: (caption, mediaUrl) => + params.deps.sendMessageSlack(slackTarget, caption, { mediaUrl }), + }); } catch (err) { if (!bestEffortDeliver) return { status: "error", summary, error: String(err) }; @@ -629,24 +646,13 @@ export async function runCronIsolatedAgentTurn(params: { const to = resolvedDelivery.to; const textLimit = resolveTextChunkLimit(params.cfg, "signal"); try { - for (const payload of payloads) { - const mediaList = - payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); - if (mediaList.length === 0) { - for (const chunk of chunkText(payload.text ?? "", textLimit)) { - await params.deps.sendMessageSignal(to, chunk); - } - } else { - let first = true; - for (const url of mediaList) { - const caption = first ? (payload.text ?? "") : ""; - first = false; - await params.deps.sendMessageSignal(to, caption, { - mediaUrl: url, - }); - } - } - } + await deliverChunkedPayloads({ + payloads, + chunkText: (text) => chunkText(text, textLimit), + sendText: (text) => params.deps.sendMessageSignal(to, text), + sendMedia: (caption, mediaUrl) => + params.deps.sendMessageSignal(to, caption, { mediaUrl }), + }); } catch (err) { if (!bestEffortDeliver) return { status: "error", summary, error: String(err) }; @@ -668,24 +674,13 @@ export async function runCronIsolatedAgentTurn(params: { const to = resolvedDelivery.to; const textLimit = resolveTextChunkLimit(params.cfg, "imessage"); try { - for (const payload of payloads) { - const mediaList = - payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); - if (mediaList.length === 0) { - for (const chunk of chunkText(payload.text ?? "", textLimit)) { - await params.deps.sendMessageIMessage(to, chunk); - } - } else { - let first = true; - for (const url of mediaList) { - const caption = first ? (payload.text ?? "") : ""; - first = false; - await params.deps.sendMessageIMessage(to, caption, { - mediaUrl: url, - }); - } - } - } + await deliverChunkedPayloads({ + payloads, + chunkText: (text) => chunkText(text, textLimit), + sendText: (text) => params.deps.sendMessageIMessage(to, text), + sendMedia: (caption, mediaUrl) => + params.deps.sendMessageIMessage(to, caption, { mediaUrl }), + }); } catch (err) { if (!bestEffortDeliver) return { status: "error", summary, error: String(err) };