diff --git a/src/telegram/send.ts b/src/telegram/send.ts index 89d28e4ba..59ec33f7b 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -45,6 +45,27 @@ export async function sendMessageTelegram( const api = opts.api ?? bot?.api; const mediaUrl = opts.mediaUrl?.trim(); + const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + const sendWithRetry = async (fn: () => Promise, label: string) => { + let lastErr: unknown; + for (let attempt = 1; attempt <= 3; attempt++) { + try { + return await fn(); + } catch (err) { + lastErr = err; + const terminal = attempt === 3 || + !/429|timeout|connect|reset|closed|unavailable|temporarily/i.test(String(err ?? "")); + if (terminal) break; + const backoff = 400 * attempt; + if (opts.verbose) { + console.warn(`telegram send retry ${attempt}/2 for ${label} in ${backoff}ms: ${String(err)}`); + } + await sleep(backoff); + } + } + throw lastErr ?? new Error(`Telegram send failed (${label})`); + }; + if (mediaUrl) { const media = await loadWebMedia(mediaUrl, opts.maxBytes); const kind = mediaKindFromMime(media.contentType ?? undefined); @@ -59,13 +80,13 @@ export async function sendMessageTelegram( | Awaited> | Awaited>; if (kind === "image") { - result = await api.sendPhoto(chatId, file, { caption }); + result = await sendWithRetry(() => api.sendPhoto(chatId, file, { caption }), "photo"); } else if (kind === "video") { - result = await api.sendVideo(chatId, file, { caption }); + result = await sendWithRetry(() => api.sendVideo(chatId, file, { caption }), "video"); } else if (kind === "audio") { - result = await api.sendAudio(chatId, file, { caption }); + result = await sendWithRetry(() => api.sendAudio(chatId, file, { caption }), "audio"); } else { - result = await api.sendDocument(chatId, file, { caption }); + result = await sendWithRetry(() => api.sendDocument(chatId, file, { caption }), "document"); } const messageId = String(result?.message_id ?? "unknown"); return { messageId, chatId: String(result?.chat?.id ?? chatId) }; @@ -74,9 +95,10 @@ export async function sendMessageTelegram( if (!text || !text.trim()) { throw new Error("Message must be non-empty for Telegram sends"); } - const res = await api.sendMessage(chatId, text, { - parse_mode: "Markdown", - }); + const res = await sendWithRetry( + () => api.sendMessage(chatId, text, { parse_mode: "Markdown" }), + "message", + ); const messageId = String(res?.message_id ?? "unknown"); return { messageId, chatId: String(res?.chat?.id ?? chatId) }; } diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 4b6d74348..f9151ed40 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -503,10 +503,42 @@ async function deliverWebReply(params: { ? [replyResult.mediaUrl] : []; + const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + + const sendWithRetry = async ( + fn: () => Promise, + label: string, + maxAttempts = 3, + ) => { + let lastErr: unknown; + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + return await fn(); + } catch (err) { + lastErr = err; + const isLast = attempt === maxAttempts; + const shouldRetry = /closed|reset|timed\s*out|disconnect/i.test( + String(err ?? ""), + ); + if (!shouldRetry || isLast) { + throw err; + } + const backoffMs = 500 * attempt; + logVerbose( + `Retrying ${label} to ${msg.from} after failure (${attempt}/${maxAttempts - 1}) in ${backoffMs}ms: ${String( + err, + )}`, + ); + await sleep(backoffMs); + } + } + throw lastErr; + }; + // Text-only replies if (mediaList.length === 0 && textChunks.length) { for (const chunk of textChunks) { - await msg.reply(chunk); + await sendWithRetry(() => msg.reply(chunk), "text"); } if (!skipLog) { logInfo( @@ -548,33 +580,49 @@ async function deliverWebReply(params: { ); } if (media.kind === "image") { - await msg.sendMedia({ - image: media.buffer, - caption, - mimetype: media.contentType, - }); + await sendWithRetry( + () => + msg.sendMedia({ + image: media.buffer, + caption, + mimetype: media.contentType, + }), + "media:image", + ); } else if (media.kind === "audio") { - await msg.sendMedia({ - audio: media.buffer, - ptt: true, - mimetype: media.contentType, - caption, - }); + await sendWithRetry( + () => + msg.sendMedia({ + audio: media.buffer, + ptt: true, + mimetype: media.contentType, + caption, + }), + "media:audio", + ); } else if (media.kind === "video") { - await msg.sendMedia({ - video: media.buffer, - caption, - mimetype: media.contentType, - }); + await sendWithRetry( + () => + msg.sendMedia({ + video: media.buffer, + caption, + mimetype: media.contentType, + }), + "media:video", + ); } else { const fileName = mediaUrl.split("/").pop() ?? "file"; const mimetype = media.contentType ?? "application/octet-stream"; - await msg.sendMedia({ - document: media.buffer, - fileName, - caption, - mimetype, - }); + await sendWithRetry( + () => + msg.sendMedia({ + document: media.buffer, + fileName, + caption, + mimetype, + }), + "media:document", + ); } logInfo( `✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`, diff --git a/test/auto-reply.retry.test.ts b/test/auto-reply.retry.test.ts new file mode 100644 index 000000000..593fefeaf --- /dev/null +++ b/test/auto-reply.retry.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("../src/web/media.js", () => ({ + loadWebMedia: vi.fn(async () => ({ + buffer: Buffer.from("img"), + contentType: "image/jpeg", + kind: "image", + fileName: "img.jpg", + })), +})); + +import { deliverWebReply } from "../src/web/auto-reply.js"; +import { defaultRuntime } from "../src/runtime.js"; + +const noopLogger = { + info: vi.fn(), + warn: vi.fn(), +}; + +function makeMsg() { + return { + from: "+10000000000", + to: "+20000000000", + id: "abc", + reply: vi.fn(), + sendMedia: vi.fn(), + } as any; +} + +describe("deliverWebReply retry", () => { + it("retries text send on transient failure", async () => { + const msg = makeMsg(); + msg.reply.mockRejectedValueOnce(new Error("connection closed")); + msg.reply.mockResolvedValueOnce(undefined); + + await expect( + deliverWebReply({ + replyResult: { text: "hi" }, + msg, + maxMediaBytes: 5_000_000, + replyLogger: noopLogger, + runtime: defaultRuntime, + skipLog: true, + }), + ).resolves.toBeUndefined(); + + expect(msg.reply).toHaveBeenCalledTimes(2); + }); + + it("retries media send on transient failure", async () => { + const msg = makeMsg(); + msg.sendMedia.mockRejectedValueOnce(new Error("socket reset")); + msg.sendMedia.mockResolvedValueOnce(undefined); + + await expect( + deliverWebReply({ + replyResult: { text: "caption", mediaUrl: "http://example.com/img.jpg" }, + msg, + maxMediaBytes: 5_000_000, + replyLogger: noopLogger, + runtime: defaultRuntime, + skipLog: true, + }), + ).resolves.toBeUndefined(); + + expect(msg.sendMedia).toHaveBeenCalledTimes(2); + }); +}); +