msg: retry web/telegram sends and add regression tests
This commit is contained in:
@@ -45,6 +45,27 @@ export async function sendMessageTelegram(
|
||||
const api = opts.api ?? bot?.api;
|
||||
const mediaUrl = opts.mediaUrl?.trim();
|
||||
|
||||
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
const sendWithRetry = async <T>(fn: () => Promise<T>, label: string) => {
|
||||
let lastErr: unknown;
|
||||
for (let attempt = 1; attempt <= 3; attempt++) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (err) {
|
||||
lastErr = err;
|
||||
const terminal = attempt === 3 ||
|
||||
!/429|timeout|connect|reset|closed|unavailable|temporarily/i.test(String(err ?? ""));
|
||||
if (terminal) break;
|
||||
const backoff = 400 * attempt;
|
||||
if (opts.verbose) {
|
||||
console.warn(`telegram send retry ${attempt}/2 for ${label} in ${backoff}ms: ${String(err)}`);
|
||||
}
|
||||
await sleep(backoff);
|
||||
}
|
||||
}
|
||||
throw lastErr ?? new Error(`Telegram send failed (${label})`);
|
||||
};
|
||||
|
||||
if (mediaUrl) {
|
||||
const media = await loadWebMedia(mediaUrl, opts.maxBytes);
|
||||
const kind = mediaKindFromMime(media.contentType ?? undefined);
|
||||
@@ -59,13 +80,13 @@ export async function sendMessageTelegram(
|
||||
| Awaited<ReturnType<typeof api.sendAudio>>
|
||||
| Awaited<ReturnType<typeof api.sendDocument>>;
|
||||
if (kind === "image") {
|
||||
result = await api.sendPhoto(chatId, file, { caption });
|
||||
result = await sendWithRetry(() => api.sendPhoto(chatId, file, { caption }), "photo");
|
||||
} else if (kind === "video") {
|
||||
result = await api.sendVideo(chatId, file, { caption });
|
||||
result = await sendWithRetry(() => api.sendVideo(chatId, file, { caption }), "video");
|
||||
} else if (kind === "audio") {
|
||||
result = await api.sendAudio(chatId, file, { caption });
|
||||
result = await sendWithRetry(() => api.sendAudio(chatId, file, { caption }), "audio");
|
||||
} else {
|
||||
result = await api.sendDocument(chatId, file, { caption });
|
||||
result = await sendWithRetry(() => api.sendDocument(chatId, file, { caption }), "document");
|
||||
}
|
||||
const messageId = String(result?.message_id ?? "unknown");
|
||||
return { messageId, chatId: String(result?.chat?.id ?? chatId) };
|
||||
@@ -74,9 +95,10 @@ export async function sendMessageTelegram(
|
||||
if (!text || !text.trim()) {
|
||||
throw new Error("Message must be non-empty for Telegram sends");
|
||||
}
|
||||
const res = await api.sendMessage(chatId, text, {
|
||||
parse_mode: "Markdown",
|
||||
});
|
||||
const res = await sendWithRetry(
|
||||
() => api.sendMessage(chatId, text, { parse_mode: "Markdown" }),
|
||||
"message",
|
||||
);
|
||||
const messageId = String(res?.message_id ?? "unknown");
|
||||
return { messageId, chatId: String(res?.chat?.id ?? chatId) };
|
||||
}
|
||||
|
||||
@@ -503,10 +503,42 @@ async function deliverWebReply(params: {
|
||||
? [replyResult.mediaUrl]
|
||||
: [];
|
||||
|
||||
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
const sendWithRetry = async (
|
||||
fn: () => Promise<unknown>,
|
||||
label: string,
|
||||
maxAttempts = 3,
|
||||
) => {
|
||||
let lastErr: unknown;
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (err) {
|
||||
lastErr = err;
|
||||
const isLast = attempt === maxAttempts;
|
||||
const shouldRetry = /closed|reset|timed\s*out|disconnect/i.test(
|
||||
String(err ?? ""),
|
||||
);
|
||||
if (!shouldRetry || isLast) {
|
||||
throw err;
|
||||
}
|
||||
const backoffMs = 500 * attempt;
|
||||
logVerbose(
|
||||
`Retrying ${label} to ${msg.from} after failure (${attempt}/${maxAttempts - 1}) in ${backoffMs}ms: ${String(
|
||||
err,
|
||||
)}`,
|
||||
);
|
||||
await sleep(backoffMs);
|
||||
}
|
||||
}
|
||||
throw lastErr;
|
||||
};
|
||||
|
||||
// Text-only replies
|
||||
if (mediaList.length === 0 && textChunks.length) {
|
||||
for (const chunk of textChunks) {
|
||||
await msg.reply(chunk);
|
||||
await sendWithRetry(() => msg.reply(chunk), "text");
|
||||
}
|
||||
if (!skipLog) {
|
||||
logInfo(
|
||||
@@ -548,33 +580,49 @@ async function deliverWebReply(params: {
|
||||
);
|
||||
}
|
||||
if (media.kind === "image") {
|
||||
await msg.sendMedia({
|
||||
image: media.buffer,
|
||||
caption,
|
||||
mimetype: media.contentType,
|
||||
});
|
||||
await sendWithRetry(
|
||||
() =>
|
||||
msg.sendMedia({
|
||||
image: media.buffer,
|
||||
caption,
|
||||
mimetype: media.contentType,
|
||||
}),
|
||||
"media:image",
|
||||
);
|
||||
} else if (media.kind === "audio") {
|
||||
await msg.sendMedia({
|
||||
audio: media.buffer,
|
||||
ptt: true,
|
||||
mimetype: media.contentType,
|
||||
caption,
|
||||
});
|
||||
await sendWithRetry(
|
||||
() =>
|
||||
msg.sendMedia({
|
||||
audio: media.buffer,
|
||||
ptt: true,
|
||||
mimetype: media.contentType,
|
||||
caption,
|
||||
}),
|
||||
"media:audio",
|
||||
);
|
||||
} else if (media.kind === "video") {
|
||||
await msg.sendMedia({
|
||||
video: media.buffer,
|
||||
caption,
|
||||
mimetype: media.contentType,
|
||||
});
|
||||
await sendWithRetry(
|
||||
() =>
|
||||
msg.sendMedia({
|
||||
video: media.buffer,
|
||||
caption,
|
||||
mimetype: media.contentType,
|
||||
}),
|
||||
"media:video",
|
||||
);
|
||||
} else {
|
||||
const fileName = mediaUrl.split("/").pop() ?? "file";
|
||||
const mimetype = media.contentType ?? "application/octet-stream";
|
||||
await msg.sendMedia({
|
||||
document: media.buffer,
|
||||
fileName,
|
||||
caption,
|
||||
mimetype,
|
||||
});
|
||||
await sendWithRetry(
|
||||
() =>
|
||||
msg.sendMedia({
|
||||
document: media.buffer,
|
||||
fileName,
|
||||
caption,
|
||||
mimetype,
|
||||
}),
|
||||
"media:document",
|
||||
);
|
||||
}
|
||||
logInfo(
|
||||
`✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`,
|
||||
|
||||
69
test/auto-reply.retry.test.ts
Normal file
69
test/auto-reply.retry.test.ts
Normal file
@@ -0,0 +1,69 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("../src/web/media.js", () => ({
|
||||
loadWebMedia: vi.fn(async () => ({
|
||||
buffer: Buffer.from("img"),
|
||||
contentType: "image/jpeg",
|
||||
kind: "image",
|
||||
fileName: "img.jpg",
|
||||
})),
|
||||
}));
|
||||
|
||||
import { deliverWebReply } from "../src/web/auto-reply.js";
|
||||
import { defaultRuntime } from "../src/runtime.js";
|
||||
|
||||
const noopLogger = {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
};
|
||||
|
||||
function makeMsg() {
|
||||
return {
|
||||
from: "+10000000000",
|
||||
to: "+20000000000",
|
||||
id: "abc",
|
||||
reply: vi.fn(),
|
||||
sendMedia: vi.fn(),
|
||||
} as any;
|
||||
}
|
||||
|
||||
describe("deliverWebReply retry", () => {
|
||||
it("retries text send on transient failure", async () => {
|
||||
const msg = makeMsg();
|
||||
msg.reply.mockRejectedValueOnce(new Error("connection closed"));
|
||||
msg.reply.mockResolvedValueOnce(undefined);
|
||||
|
||||
await expect(
|
||||
deliverWebReply({
|
||||
replyResult: { text: "hi" },
|
||||
msg,
|
||||
maxMediaBytes: 5_000_000,
|
||||
replyLogger: noopLogger,
|
||||
runtime: defaultRuntime,
|
||||
skipLog: true,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(msg.reply).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("retries media send on transient failure", async () => {
|
||||
const msg = makeMsg();
|
||||
msg.sendMedia.mockRejectedValueOnce(new Error("socket reset"));
|
||||
msg.sendMedia.mockResolvedValueOnce(undefined);
|
||||
|
||||
await expect(
|
||||
deliverWebReply({
|
||||
replyResult: { text: "caption", mediaUrl: "http://example.com/img.jpg" },
|
||||
msg,
|
||||
maxMediaBytes: 5_000_000,
|
||||
replyLogger: noopLogger,
|
||||
runtime: defaultRuntime,
|
||||
skipLog: true,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(msg.sendMedia).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user