From de55f4e11171b69b1cb6b973efae16ca174074fa Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 7 Jan 2026 17:48:19 +0000 Subject: [PATCH] fix: add provider retry policy --- CHANGELOG.md | 1 + docs/concepts/retry.md | 58 ++++++++++++ docs/gateway/configuration.md | 16 +++- docs/providers/discord.md | 12 ++- docs/providers/telegram.md | 4 + src/config/schema.ts | 24 +++++ src/config/types.ts | 15 +++ src/config/zod-schema.ts | 11 +++ src/discord/send.test.ts | 131 ++++++++++++++++++++++++++ src/discord/send.ts | 167 +++++++++++++++++++++++----------- src/infra/retry-policy.ts | 106 +++++++++++++++++++++ src/infra/retry.test.ts | 76 ++++++++++++++++ src/infra/retry.ts | 131 ++++++++++++++++++++++++-- src/telegram/send.test.ts | 50 ++++++++++ src/telegram/send.ts | 78 ++++++++-------- 15 files changed, 779 insertions(+), 101 deletions(-) create mode 100644 docs/concepts/retry.md create mode 100644 src/infra/retry-policy.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index e1d8ada0e..ea155db21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Auto-reply: removed `autoReply` from Discord/Slack/Telegram channel configs; use `requireMention` instead (Telegram topics now support `requireMention` overrides). ### Fixes +- Discord/Telegram: add per-request retry policy with configurable delays and docs. - Pairing: generate DM pairing codes with CSPRNG, expire pending codes after 1 hour, and avoid re-sending codes for already pending requests. - Pairing: lock + atomically write pairing stores with 0600 perms and stop logging pairing codes in provider logs. - Discord: include all inbound attachments in `MediaPaths`/`MediaUrls` (back-compat `MediaPath`/`MediaUrl` still first). diff --git a/docs/concepts/retry.md b/docs/concepts/retry.md new file mode 100644 index 000000000..ca9b32c03 --- /dev/null +++ b/docs/concepts/retry.md @@ -0,0 +1,58 @@ +--- +summary: "Retry policy for outbound provider calls" +read_when: + - Updating provider retry behavior or defaults + - Debugging provider send errors or rate limits +--- +# Retry policy + +## Goals +- Retry per HTTP request, not per multi-step flow. +- Preserve ordering by retrying only the current step. +- Avoid duplicating non-idempotent operations. + +## Defaults +- Attempts: 3 +- Max delay cap: 30000 ms +- Jitter: 0.1 (10 percent) +- Provider defaults: + - Telegram min delay: 400 ms + - Discord min delay: 500 ms + +## Behavior +### Discord +- Retries only on rate-limit errors (HTTP 429). +- Uses Discord `retry_after` when available, otherwise exponential backoff. + +### Telegram +- Retries on transient errors (429, timeout, connect/reset/closed, temporarily unavailable). +- Uses `retry_after` when available, otherwise exponential backoff. +- Markdown parse errors are not retried; they fall back to plain text. + +## Configuration +Set retry policy per provider in `~/.clawdbot/clawdbot.json`: + +```json5 +{ + telegram: { + retry: { + attempts: 3, + minDelayMs: 400, + maxDelayMs: 30000, + jitter: 0.1 + } + }, + discord: { + retry: { + attempts: 3, + minDelayMs: 500, + maxDelayMs: 30000, + jitter: 0.1 + } + } +} +``` + +## Notes +- Retries apply per request (message send, media upload, reaction, poll, sticker). +- Composite flows do not retry completed steps. diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index c2d5ed22f..84676fdfe 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -493,6 +493,12 @@ Set `telegram.enabled: false` to disable automatic startup. streamMode: "partial", // off | partial | block (draft streaming) actions: { reactions: true }, // tool action gates (false disables) mediaMaxMb: 5, + retry: { // outbound retry policy + attempts: 3, + minDelayMs: 400, + maxDelayMs: 30000, + jitter: 0.1 + }, proxy: "socks5://localhost:9050", webhookUrl: "https://example.com/telegram-webhook", webhookSecret: "secret", @@ -505,6 +511,7 @@ Draft streaming notes: - Uses Telegram `sendMessageDraft` (draft bubble, not a real message). - Requires **private chat topics** (message_thread_id in DMs; bot has topics enabled). - `/reasoning stream` streams reasoning into the draft, then sends the final answer. +Retry policy defaults and behavior are documented in [Retry policy](/concepts/retry). ### `discord` (bot transport) @@ -559,7 +566,13 @@ Configure the Discord bot by setting the bot token and optional gating: } } }, - historyLimit: 20 // include last N guild messages as context + historyLimit: 20, // include last N guild messages as context + retry: { // outbound retry policy + attempts: 3, + minDelayMs: 500, + maxDelayMs: 30000, + jitter: 0.1 + } } } ``` @@ -571,6 +584,7 @@ Reaction notification modes: - `own`: reactions on the bot's own messages (default). - `all`: all reactions on all messages. - `allowlist`: reactions from `guilds..users` on all messages (empty list disables). +Retry policy defaults and behavior are documented in [Retry policy](/concepts/retry). ### `slack` (socket mode) diff --git a/docs/providers/discord.md b/docs/providers/discord.md index b4bfaf878..4d5d652c4 100644 --- a/docs/providers/discord.md +++ b/docs/providers/discord.md @@ -5,7 +5,7 @@ read_when: --- # Discord (Bot API) -Updated: 2025-12-07 +Updated: 2026-01-07 Status: ready for DM and guild text channels via the official Discord bot gateway. @@ -122,6 +122,12 @@ Example “single server, only allow me, only allow #help”: help: { allow: true, requireMention: true } } } + }, + retry: { + attempts: 3, + minDelayMs: 500, + maxDelayMs: 30000, + jitter: 0.1 } } } @@ -154,6 +160,9 @@ Notes: - Reply context is injected when a message references another message (quoted content + ids). - Native reply threading is **off by default**; enable with `discord.replyToMode` and reply tags. +## Retry policy +Outbound Discord API calls retry on rate limits (429) using Discord `retry_after` when available, with exponential backoff and jitter. Configure via `discord.retry`. See [Retry policy](/concepts/retry). + ## Config ```json5 @@ -235,6 +244,7 @@ Ack reactions are controlled globally via `messages.ackReaction` + - `guilds..reactionNotifications`: reaction system event mode (`off`, `own`, `all`, `allowlist`). - `mediaMaxMb`: clamp inbound media saved to disk. - `historyLimit`: number of recent guild messages to include as context when replying to a mention (default 20, `0` disables). +- `retry`: retry policy for outbound Discord API calls (attempts, minDelayMs, maxDelayMs, jitter). - `actions`: per-action tool gates; omit to allow all (set `false` to disable). - `reactions` (covers react + read reactions) - `stickers`, `polls`, `permissions`, `messages`, `threads`, `pins`, `search` diff --git a/docs/providers/telegram.md b/docs/providers/telegram.md index 37fa663f7..18963f35c 100644 --- a/docs/providers/telegram.md +++ b/docs/providers/telegram.md @@ -162,6 +162,9 @@ Reasoning stream (Telegram only): - If `telegram.streamMode` is `off`, reasoning stream is disabled. More context: [Streaming + chunking](/concepts/streaming). +## Retry policy +Outbound Telegram API calls retry on transient network/429 errors with exponential backoff and jitter. Configure via `telegram.retry`. See [Retry policy](/concepts/retry). + ## Agent tool (reactions) - Tool: `telegram` with `react` action (`chatId`, `messageId`, `emoji`). - Reaction removal semantics: see [/tools/reactions](/tools/reactions). @@ -215,6 +218,7 @@ Provider options: - `telegram.textChunkLimit`: outbound chunk size (chars). - `telegram.streamMode`: `off | partial | block` (draft streaming). - `telegram.mediaMaxMb`: inbound/outbound media cap (MB). +- `telegram.retry`: retry policy for outbound Telegram API calls (attempts, minDelayMs, maxDelayMs, jitter). - `telegram.proxy`: proxy URL for Bot API calls (SOCKS/HTTP). - `telegram.webhookUrl`: enable webhook mode. - `telegram.webhookSecret`: webhook secret (optional). diff --git a/src/config/schema.ts b/src/config/schema.ts index 5cf88f528..da58d2a56 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -108,10 +108,18 @@ const FIELD_LABELS: Record = { "telegram.botToken": "Telegram Bot Token", "telegram.dmPolicy": "Telegram DM Policy", "telegram.streamMode": "Telegram Stream Mode", + "telegram.retry.attempts": "Telegram Retry Attempts", + "telegram.retry.minDelayMs": "Telegram Retry Min Delay (ms)", + "telegram.retry.maxDelayMs": "Telegram Retry Max Delay (ms)", + "telegram.retry.jitter": "Telegram Retry Jitter", "whatsapp.dmPolicy": "WhatsApp DM Policy", "signal.dmPolicy": "Signal DM Policy", "imessage.dmPolicy": "iMessage DM Policy", "discord.dm.policy": "Discord DM Policy", + "discord.retry.attempts": "Discord Retry Attempts", + "discord.retry.minDelayMs": "Discord Retry Min Delay (ms)", + "discord.retry.maxDelayMs": "Discord Retry Max Delay (ms)", + "discord.retry.jitter": "Discord Retry Jitter", "slack.dm.policy": "Slack DM Policy", "discord.token": "Discord Bot Token", "slack.botToken": "Slack Bot Token", @@ -158,6 +166,14 @@ const FIELD_HELP: Record = { 'Direct message access control ("pairing" recommended). "open" requires telegram.allowFrom=["*"].', "telegram.streamMode": "Draft streaming mode for Telegram replies (off | partial | block). Requires private topics + sendMessageDraft.", + "telegram.retry.attempts": + "Max retry attempts for outbound Telegram API calls (default: 3).", + "telegram.retry.minDelayMs": + "Minimum retry delay in ms for Telegram outbound calls.", + "telegram.retry.maxDelayMs": + "Maximum retry delay cap in ms for Telegram outbound calls.", + "telegram.retry.jitter": + "Jitter factor (0-1) applied to Telegram retry delays.", "whatsapp.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires whatsapp.allowFrom=["*"].', "signal.dmPolicy": @@ -166,6 +182,14 @@ const FIELD_HELP: Record = { 'Direct message access control ("pairing" recommended). "open" requires imessage.allowFrom=["*"].', "discord.dm.policy": 'Direct message access control ("pairing" recommended). "open" requires discord.dm.allowFrom=["*"].', + "discord.retry.attempts": + "Max retry attempts for outbound Discord API calls (default: 3).", + "discord.retry.minDelayMs": + "Minimum retry delay in ms for Discord outbound calls.", + "discord.retry.maxDelayMs": + "Maximum retry delay cap in ms for Discord outbound calls.", + "discord.retry.jitter": + "Jitter factor (0-1) applied to Discord retry delays.", "slack.dm.policy": 'Direct message access control ("pairing" recommended). "open" requires slack.dm.allowFrom=["*"].', }; diff --git a/src/config/types.ts b/src/config/types.ts index c00a635cd..a9846c4e6 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -4,6 +4,17 @@ export type ReplyToMode = "off" | "first" | "all"; export type GroupPolicy = "open" | "disabled" | "allowlist"; export type DmPolicy = "pairing" | "allowlist" | "open" | "disabled"; +export type OutboundRetryConfig = { + /** Max retry attempts for outbound requests (default: 3). */ + attempts?: number; + /** Minimum retry delay in ms (default: 300-500ms depending on provider). */ + minDelayMs?: number; + /** Maximum retry delay cap in ms (default: 30000). */ + maxDelayMs?: number; + /** Jitter factor (0-1) applied to delays (default: 0.1). */ + jitter?: number; +}; + export type SessionSendPolicyAction = "allow" | "deny"; export type SessionSendPolicyMatch = { provider?: string; @@ -294,6 +305,8 @@ export type TelegramConfig = { /** Draft streaming mode for Telegram (off|partial|block). Default: partial. */ streamMode?: "off" | "partial" | "block"; mediaMaxMb?: number; + /** Retry policy for outbound Telegram API calls. */ + retry?: OutboundRetryConfig; proxy?: string; webhookUrl?: string; webhookSecret?: string; @@ -378,6 +391,8 @@ export type DiscordConfig = { textChunkLimit?: number; mediaMaxMb?: number; historyLimit?: number; + /** Retry policy for outbound Discord API calls. */ + retry?: OutboundRetryConfig; /** Per-action tool gating (default: true for all). */ actions?: DiscordActionConfig; /** Control reply threading when reply tags are present (off|first|all). */ diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index bcf3249ff..a1d42b96e 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -89,6 +89,15 @@ const GroupPolicySchema = z.enum(["open", "disabled", "allowlist"]); const DmPolicySchema = z.enum(["pairing", "allowlist", "open", "disabled"]); +const RetryConfigSchema = z + .object({ + attempts: z.number().int().min(1).optional(), + minDelayMs: z.number().int().min(0).optional(), + maxDelayMs: z.number().int().min(0).optional(), + jitter: z.number().min(0).max(1).optional(), + }) + .optional(); + const QueueModeBySurfaceSchema = z .object({ whatsapp: QueueModeSchema.optional(), @@ -867,6 +876,7 @@ export const ClawdbotSchema = z.object({ .optional() .default("partial"), mediaMaxMb: z.number().positive().optional(), + retry: RetryConfigSchema, proxy: z.string().optional(), webhookUrl: z.string().optional(), webhookSecret: z.string().optional(), @@ -899,6 +909,7 @@ export const ClawdbotSchema = z.object({ textChunkLimit: z.number().int().positive().optional(), mediaMaxMb: z.number().positive().optional(), historyLimit: z.number().int().min(0).optional(), + retry: RetryConfigSchema, actions: z .object({ reactions: z.boolean().optional(), diff --git a/src/discord/send.test.ts b/src/discord/send.test.ts index c5b67f3e2..6f714ca2b 100644 --- a/src/discord/send.test.ts +++ b/src/discord/send.test.ts @@ -1,3 +1,4 @@ +import { RateLimitError } from "@buape/carbon"; import { PermissionFlagsBits, Routes } from "discord-api-types/v10"; import { beforeEach, describe, expect, it, vi } from "vitest"; @@ -662,3 +663,133 @@ describe("sendPollDiscord", () => { ); }); }); + +function createMockRateLimitError(retryAfter = 0.001): RateLimitError { + const response = new Response(null, { + status: 429, + headers: { + "X-RateLimit-Scope": "user", + "X-RateLimit-Bucket": "test-bucket", + }, + }); + return new RateLimitError(response, { + message: "You are being rate limited.", + retry_after: retryAfter, + global: false, + }); +} + +describe("retry rate limits", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("retries on Discord rate limits", async () => { + const { rest, postMock } = makeRest(); + const rateLimitError = createMockRateLimitError(0); + + postMock + .mockRejectedValueOnce(rateLimitError) + .mockResolvedValueOnce({ id: "msg1", channel_id: "789" }); + + const res = await sendMessageDiscord("channel:789", "hello", { + rest, + token: "t", + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + }); + + expect(res.messageId).toBe("msg1"); + expect(postMock).toHaveBeenCalledTimes(2); + }); + + it("uses retry_after delays when rate limited", async () => { + vi.useFakeTimers(); + const setTimeoutSpy = vi.spyOn(global, "setTimeout"); + const { rest, postMock } = makeRest(); + const rateLimitError = createMockRateLimitError(0.5); + + postMock + .mockRejectedValueOnce(rateLimitError) + .mockResolvedValueOnce({ id: "msg1", channel_id: "789" }); + + const promise = sendMessageDiscord("channel:789", "hello", { + rest, + token: "t", + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 1000, jitter: 0 }, + }); + + await vi.runAllTimersAsync(); + await expect(promise).resolves.toEqual({ + messageId: "msg1", + channelId: "789", + }); + expect(setTimeoutSpy.mock.calls[0]?.[1]).toBe(500); + setTimeoutSpy.mockRestore(); + vi.useRealTimers(); + }); + + it("stops after max retry attempts", async () => { + const { rest, postMock } = makeRest(); + const rateLimitError = createMockRateLimitError(0); + + postMock.mockRejectedValue(rateLimitError); + + await expect( + sendMessageDiscord("channel:789", "hello", { + rest, + token: "t", + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + }), + ).rejects.toBeInstanceOf(RateLimitError); + expect(postMock).toHaveBeenCalledTimes(2); + }); + + it("does not retry non-rate-limit errors", async () => { + const { rest, postMock } = makeRest(); + postMock.mockRejectedValueOnce(new Error("network error")); + + await expect( + sendMessageDiscord("channel:789", "hello", { rest, token: "t" }), + ).rejects.toThrow("network error"); + expect(postMock).toHaveBeenCalledTimes(1); + }); + + it("retries reactions on rate limits", async () => { + const { rest, putMock } = makeRest(); + const rateLimitError = createMockRateLimitError(0); + + putMock + .mockRejectedValueOnce(rateLimitError) + .mockResolvedValueOnce(undefined); + + const res = await reactMessageDiscord("chan1", "msg1", "ok", { + rest, + token: "t", + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + }); + + expect(res.ok).toBe(true); + expect(putMock).toHaveBeenCalledTimes(2); + }); + + it("retries media upload without duplicating overflow text", async () => { + const { rest, postMock } = makeRest(); + const rateLimitError = createMockRateLimitError(0); + const text = "a".repeat(2005); + + postMock + .mockRejectedValueOnce(rateLimitError) + .mockResolvedValueOnce({ id: "msg1", channel_id: "789" }) + .mockResolvedValueOnce({ id: "msg2", channel_id: "789" }); + + const res = await sendMessageDiscord("channel:789", text, { + rest, + token: "t", + mediaUrl: "https://example.com/photo.jpg", + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + }); + + expect(res.messageId).toBe("msg1"); + expect(postMock).toHaveBeenCalledTimes(3); + }); +}); diff --git a/src/discord/send.ts b/src/discord/send.ts index 35de62e2f..f07040dfb 100644 --- a/src/discord/send.ts +++ b/src/discord/send.ts @@ -19,6 +19,11 @@ import { import { chunkMarkdownText } from "../auto-reply/chunk.js"; import { loadConfig } from "../config/config.js"; +import type { RetryConfig } from "../infra/retry.js"; +import { + createDiscordRetryRunner, + type RetryRunner, +} from "../infra/retry-policy.js"; import { normalizePollDurationHours, normalizePollInput, @@ -35,6 +40,7 @@ const DISCORD_POLL_MAX_ANSWERS = 10; const DISCORD_POLL_MAX_DURATION_HOURS = 32 * 24; const DISCORD_MISSING_PERMISSIONS = 50013; const DISCORD_CANNOT_DM = 50007; +type DiscordRequest = RetryRunner; export class DiscordSendError extends Error { kind?: "missing-permissions" | "dm-blocked"; @@ -72,6 +78,7 @@ type DiscordSendOpts = { verbose?: boolean; rest?: RequestClient; replyTo?: string; + retry?: RetryConfig; }; export type DiscordSendResult = { @@ -82,6 +89,8 @@ export type DiscordSendResult = { export type DiscordReactOpts = { token?: string; rest?: RequestClient; + verbose?: boolean; + retry?: RetryConfig; }; export type DiscordReactionUser = { @@ -187,6 +196,24 @@ function resolveRest(token: string, rest?: RequestClient) { return rest ?? new RequestClient(token); } +type DiscordClientOpts = { + token?: string; + rest?: RequestClient; + retry?: RetryConfig; + verbose?: boolean; +}; + +function createDiscordClient(opts: DiscordClientOpts, cfg = loadConfig()) { + const token = resolveToken(opts.token); + const rest = resolveRest(token, opts.rest); + const request = createDiscordRetryRunner({ + retry: opts.retry, + configRetry: cfg.discord?.retry, + verbose: opts.verbose, + }); + return { token, rest, request }; +} + function normalizeReactionEmoji(raw: string) { const trimmed = raw.trim(); if (!trimmed) { @@ -358,13 +385,18 @@ async function buildDiscordSendError( async function resolveChannelId( rest: RequestClient, recipient: DiscordRecipient, + request: DiscordRequest, ): Promise<{ channelId: string; dm?: boolean }> { if (recipient.kind === "channel") { return { channelId: recipient.id }; } - const dmChannel = (await rest.post(Routes.userChannels(), { - body: { recipient_id: recipient.id }, - })) as { id: string }; + const dmChannel = (await request( + () => + rest.post(Routes.userChannels(), { + body: { recipient_id: recipient.id }, + }) as Promise<{ id: string }>, + "dm-channel", + )) as { id: string }; if (!dmChannel?.id) { throw new Error("Failed to create Discord DM channel"); } @@ -375,7 +407,8 @@ async function sendDiscordText( rest: RequestClient, channelId: string, text: string, - replyTo?: string, + replyTo: string | undefined, + request: DiscordRequest, ) { if (!text.trim()) { throw new Error("Message must be non-empty for Discord sends"); @@ -384,21 +417,29 @@ async function sendDiscordText( ? { message_id: replyTo, fail_if_not_exists: false } : undefined; if (text.length <= DISCORD_TEXT_LIMIT) { - const res = (await rest.post(Routes.channelMessages(channelId), { - body: { content: text, message_reference: messageReference }, - })) as { id: string; channel_id: string }; + const res = (await request( + () => + rest.post(Routes.channelMessages(channelId), { + body: { content: text, message_reference: messageReference }, + }) as Promise<{ id: string; channel_id: string }>, + "text", + )) as { id: string; channel_id: string }; return res; } const chunks = chunkMarkdownText(text, DISCORD_TEXT_LIMIT); let last: { id: string; channel_id: string } | null = null; let isFirst = true; for (const chunk of chunks) { - last = (await rest.post(Routes.channelMessages(channelId), { - body: { - content: chunk, - message_reference: isFirst ? messageReference : undefined, - }, - })) as { id: string; channel_id: string }; + last = (await request( + () => + rest.post(Routes.channelMessages(channelId), { + body: { + content: chunk, + message_reference: isFirst ? messageReference : undefined, + }, + }) as Promise<{ id: string; channel_id: string }>, + "text", + )) as { id: string; channel_id: string }; isFirst = false; } if (!last) { @@ -412,7 +453,8 @@ async function sendDiscordMedia( channelId: string, text: string, mediaUrl: string, - replyTo?: string, + replyTo: string | undefined, + request: DiscordRequest, ) { const media = await loadWebMedia(mediaUrl); const caption = @@ -420,22 +462,26 @@ async function sendDiscordMedia( const messageReference = replyTo ? { message_id: replyTo, fail_if_not_exists: false } : undefined; - const res = (await rest.post(Routes.channelMessages(channelId), { - body: { - content: caption || undefined, - message_reference: messageReference, - files: [ - { - data: media.buffer, - name: media.fileName ?? "upload", + const res = (await request( + () => + rest.post(Routes.channelMessages(channelId), { + body: { + content: caption || undefined, + message_reference: messageReference, + files: [ + { + data: media.buffer, + name: media.fileName ?? "upload", + }, + ], }, - ], - }, - })) as { id: string; channel_id: string }; + }) as Promise<{ id: string; channel_id: string }>, + "media", + )) as { id: string; channel_id: string }; if (text.length > DISCORD_TEXT_LIMIT) { const remaining = text.slice(DISCORD_TEXT_LIMIT).trim(); if (remaining) { - await sendDiscordText(rest, channelId, remaining); + await sendDiscordText(rest, channelId, remaining, undefined, request); } } return res; @@ -471,10 +517,10 @@ export async function sendMessageDiscord( text: string, opts: DiscordSendOpts = {}, ): Promise { - const token = resolveToken(opts.token); - const rest = resolveRest(token, opts.rest); + const cfg = loadConfig(); + const { token, rest, request } = createDiscordClient(opts, cfg); const recipient = parseRecipient(to); - const { channelId } = await resolveChannelId(rest, recipient); + const { channelId } = await resolveChannelId(rest, recipient, request); let result: | { id: string; channel_id: string } | { id: string | null; channel_id: string }; @@ -486,9 +532,16 @@ export async function sendMessageDiscord( text, opts.mediaUrl, opts.replyTo, + request, ); } else { - result = await sendDiscordText(rest, channelId, text, opts.replyTo); + result = await sendDiscordText( + rest, + channelId, + text, + opts.replyTo, + request, + ); } } catch (err) { throw await buildDiscordSendError(err, { @@ -510,18 +563,22 @@ export async function sendStickerDiscord( stickerIds: string[], opts: DiscordSendOpts & { content?: string } = {}, ): Promise { - const token = resolveToken(opts.token); - const rest = resolveRest(token, opts.rest); + const cfg = loadConfig(); + const { rest, request } = createDiscordClient(opts, cfg); const recipient = parseRecipient(to); - const { channelId } = await resolveChannelId(rest, recipient); + const { channelId } = await resolveChannelId(rest, recipient, request); const content = opts.content?.trim(); const stickers = normalizeStickerIds(stickerIds); - const res = (await rest.post(Routes.channelMessages(channelId), { - body: { - content: content || undefined, - sticker_ids: stickers, - }, - })) as { id: string; channel_id: string }; + const res = (await request( + () => + rest.post(Routes.channelMessages(channelId), { + body: { + content: content || undefined, + sticker_ids: stickers, + }, + }) as Promise<{ id: string; channel_id: string }>, + "sticker", + )) as { id: string; channel_id: string }; return { messageId: res.id ? String(res.id) : "unknown", channelId: String(res.channel_id ?? channelId), @@ -533,18 +590,22 @@ export async function sendPollDiscord( poll: PollInput, opts: DiscordSendOpts & { content?: string } = {}, ): Promise { - const token = resolveToken(opts.token); - const rest = resolveRest(token, opts.rest); + const cfg = loadConfig(); + const { rest, request } = createDiscordClient(opts, cfg); const recipient = parseRecipient(to); - const { channelId } = await resolveChannelId(rest, recipient); + const { channelId } = await resolveChannelId(rest, recipient, request); const content = opts.content?.trim(); const payload = normalizeDiscordPollInput(poll); - const res = (await rest.post(Routes.channelMessages(channelId), { - body: { - content: content || undefined, - poll: payload, - }, - })) as { id: string; channel_id: string }; + const res = (await request( + () => + rest.post(Routes.channelMessages(channelId), { + body: { + content: content || undefined, + poll: payload, + }, + }) as Promise<{ id: string; channel_id: string }>, + "poll", + )) as { id: string; channel_id: string }; return { messageId: res.id ? String(res.id) : "unknown", channelId: String(res.channel_id ?? channelId), @@ -557,11 +618,13 @@ export async function reactMessageDiscord( emoji: string, opts: DiscordReactOpts = {}, ) { - const token = resolveToken(opts.token); - const rest = resolveRest(token, opts.rest); + const cfg = loadConfig(); + const { rest, request } = createDiscordClient(opts, cfg); const encoded = normalizeReactionEmoji(emoji); - await rest.put( - Routes.channelMessageOwnReaction(channelId, messageId, encoded), + await request( + () => + rest.put(Routes.channelMessageOwnReaction(channelId, messageId, encoded)), + "react", ); return { ok: true }; } diff --git a/src/infra/retry-policy.ts b/src/infra/retry-policy.ts new file mode 100644 index 000000000..3f30974e6 --- /dev/null +++ b/src/infra/retry-policy.ts @@ -0,0 +1,106 @@ +import { RateLimitError } from "@buape/carbon"; + +import { formatErrorMessage } from "./errors.js"; +import { type RetryConfig, resolveRetryConfig, retryAsync } from "./retry.js"; + +export type RetryRunner = ( + fn: () => Promise, + label?: string, +) => Promise; + +export const DISCORD_RETRY_DEFAULTS = { + attempts: 3, + minDelayMs: 500, + maxDelayMs: 30_000, + jitter: 0.1, +}; + +export const TELEGRAM_RETRY_DEFAULTS = { + attempts: 3, + minDelayMs: 400, + maxDelayMs: 30_000, + jitter: 0.1, +}; + +const TELEGRAM_RETRY_RE = + /429|timeout|connect|reset|closed|unavailable|temporarily/i; + +function getTelegramRetryAfterMs(err: unknown): number | undefined { + if (!err || typeof err !== "object") return undefined; + const candidate = + "parameters" in err && err.parameters && typeof err.parameters === "object" + ? (err.parameters as { retry_after?: unknown }).retry_after + : "response" in err && + err.response && + typeof err.response === "object" && + "parameters" in err.response + ? ( + err.response as { + parameters?: { retry_after?: unknown }; + } + ).parameters?.retry_after + : "error" in err && + err.error && + typeof err.error === "object" && + "parameters" in err.error + ? (err.error as { parameters?: { retry_after?: unknown } }).parameters + ?.retry_after + : undefined; + return typeof candidate === "number" && Number.isFinite(candidate) + ? candidate * 1000 + : undefined; +} + +export function createDiscordRetryRunner(params: { + retry?: RetryConfig; + configRetry?: RetryConfig; + verbose?: boolean; +}): RetryRunner { + const retryConfig = resolveRetryConfig(DISCORD_RETRY_DEFAULTS, { + ...params.configRetry, + ...params.retry, + }); + return (fn: () => Promise, label?: string) => + retryAsync(fn, { + ...retryConfig, + label, + shouldRetry: (err) => err instanceof RateLimitError, + retryAfterMs: (err) => + err instanceof RateLimitError ? err.retryAfter * 1000 : undefined, + onRetry: params.verbose + ? (info) => { + const labelText = info.label ?? "request"; + const maxRetries = Math.max(1, info.maxAttempts - 1); + console.warn( + `discord ${labelText} rate limited, retry ${info.attempt}/${maxRetries} in ${info.delayMs}ms`, + ); + } + : undefined, + }); +} + +export function createTelegramRetryRunner(params: { + retry?: RetryConfig; + configRetry?: RetryConfig; + verbose?: boolean; +}): RetryRunner { + const retryConfig = resolveRetryConfig(TELEGRAM_RETRY_DEFAULTS, { + ...params.configRetry, + ...params.retry, + }); + return (fn: () => Promise, label?: string) => + retryAsync(fn, { + ...retryConfig, + label, + shouldRetry: (err) => TELEGRAM_RETRY_RE.test(formatErrorMessage(err)), + retryAfterMs: getTelegramRetryAfterMs, + onRetry: params.verbose + ? (info) => { + const maxRetries = Math.max(1, info.maxAttempts - 1); + console.warn( + `telegram send retry ${info.attempt}/${maxRetries} for ${info.label ?? label ?? "request"} in ${info.delayMs}ms: ${formatErrorMessage(info.err)}`, + ); + } + : undefined, + }); +} diff --git a/src/infra/retry.test.ts b/src/infra/retry.test.ts index 7099f5239..1c14364ed 100644 --- a/src/infra/retry.test.ts +++ b/src/infra/retry.test.ts @@ -25,4 +25,80 @@ describe("retryAsync", () => { await expect(retryAsync(fn, 2, 1)).rejects.toThrow("boom"); expect(fn).toHaveBeenCalledTimes(2); }); + + it("stops when shouldRetry returns false", async () => { + const fn = vi.fn().mockRejectedValue(new Error("boom")); + await expect( + retryAsync(fn, { attempts: 3, shouldRetry: () => false }), + ).rejects.toThrow("boom"); + expect(fn).toHaveBeenCalledTimes(1); + }); + + it("calls onRetry before retrying", async () => { + const fn = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValueOnce("ok"); + const onRetry = vi.fn(); + const res = await retryAsync(fn, { + attempts: 2, + minDelayMs: 0, + maxDelayMs: 0, + onRetry, + }); + expect(res).toBe("ok"); + expect(onRetry).toHaveBeenCalledWith( + expect.objectContaining({ attempt: 1, maxAttempts: 2 }), + ); + }); + + it("clamps attempts to at least 1", async () => { + const fn = vi.fn().mockRejectedValue(new Error("boom")); + await expect( + retryAsync(fn, { attempts: 0, minDelayMs: 0, maxDelayMs: 0 }), + ).rejects.toThrow("boom"); + expect(fn).toHaveBeenCalledTimes(1); + }); + + it("uses retryAfterMs when provided", async () => { + vi.useFakeTimers(); + const fn = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValueOnce("ok"); + const delays: number[] = []; + const promise = retryAsync(fn, { + attempts: 2, + minDelayMs: 0, + maxDelayMs: 1000, + jitter: 0, + retryAfterMs: () => 500, + onRetry: (info) => delays.push(info.delayMs), + }); + await vi.runAllTimersAsync(); + await expect(promise).resolves.toBe("ok"); + expect(delays[0]).toBe(500); + vi.useRealTimers(); + }); + + it("clamps retryAfterMs to maxDelayMs", async () => { + vi.useFakeTimers(); + const fn = vi + .fn() + .mockRejectedValueOnce(new Error("boom")) + .mockResolvedValueOnce("ok"); + const delays: number[] = []; + const promise = retryAsync(fn, { + attempts: 2, + minDelayMs: 0, + maxDelayMs: 100, + jitter: 0, + retryAfterMs: () => 500, + onRetry: (info) => delays.push(info.delayMs), + }); + await vi.runAllTimersAsync(); + await expect(promise).resolves.toBe("ok"); + expect(delays[0]).toBe(100); + vi.useRealTimers(); + }); }); diff --git a/src/infra/retry.ts b/src/infra/retry.ts index 234ab539c..0528953e7 100644 --- a/src/infra/retry.ts +++ b/src/infra/retry.ts @@ -1,18 +1,137 @@ +export type RetryConfig = { + attempts?: number; + minDelayMs?: number; + maxDelayMs?: number; + jitter?: number; +}; + +export type RetryInfo = { + attempt: number; + maxAttempts: number; + delayMs: number; + err: unknown; + label?: string; +}; + +export type RetryOptions = RetryConfig & { + label?: string; + shouldRetry?: (err: unknown, attempt: number) => boolean; + retryAfterMs?: (err: unknown) => number | undefined; + onRetry?: (info: RetryInfo) => void; +}; + +const DEFAULT_RETRY_CONFIG = { + attempts: 3, + minDelayMs: 300, + maxDelayMs: 30_000, + jitter: 0, +}; + +const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +const asFiniteNumber = (value: unknown): number | undefined => + typeof value === "number" && Number.isFinite(value) ? value : undefined; + +const clampNumber = ( + value: unknown, + fallback: number, + min?: number, + max?: number, +) => { + const next = asFiniteNumber(value); + if (next === undefined) return fallback; + const floor = typeof min === "number" ? min : Number.NEGATIVE_INFINITY; + const ceiling = typeof max === "number" ? max : Number.POSITIVE_INFINITY; + return Math.min(Math.max(next, floor), ceiling); +}; + +export function resolveRetryConfig( + defaults: Required = DEFAULT_RETRY_CONFIG, + overrides?: RetryConfig, +): Required { + const attempts = Math.max( + 1, + Math.round(clampNumber(overrides?.attempts, defaults.attempts, 1)), + ); + const minDelayMs = Math.max( + 0, + Math.round(clampNumber(overrides?.minDelayMs, defaults.minDelayMs, 0)), + ); + const maxDelayMs = Math.max( + minDelayMs, + Math.round(clampNumber(overrides?.maxDelayMs, defaults.maxDelayMs, 0)), + ); + const jitter = clampNumber(overrides?.jitter, defaults.jitter, 0, 1); + return { attempts, minDelayMs, maxDelayMs, jitter }; +} + +function applyJitter(delayMs: number, jitter: number): number { + if (jitter <= 0) return delayMs; + const offset = (Math.random() * 2 - 1) * jitter; + return Math.max(0, Math.round(delayMs * (1 + offset))); +} + export async function retryAsync( fn: () => Promise, - attempts = 3, + attemptsOrOptions: number | RetryOptions = 3, initialDelayMs = 300, ): Promise { + if (typeof attemptsOrOptions === "number") { + const attempts = Math.max(1, Math.round(attemptsOrOptions)); + let lastErr: unknown; + for (let i = 0; i < attempts; i += 1) { + try { + return await fn(); + } catch (err) { + lastErr = err; + if (i === attempts - 1) break; + const delay = initialDelayMs * 2 ** i; + await sleep(delay); + } + } + throw lastErr ?? new Error("Retry failed"); + } + + const options = attemptsOrOptions; + + const resolved = resolveRetryConfig(DEFAULT_RETRY_CONFIG, options); + const maxAttempts = resolved.attempts; + const minDelayMs = resolved.minDelayMs; + const maxDelayMs = + Number.isFinite(resolved.maxDelayMs) && resolved.maxDelayMs > 0 + ? resolved.maxDelayMs + : Number.POSITIVE_INFINITY; + const jitter = resolved.jitter; + const shouldRetry = options.shouldRetry ?? (() => true); let lastErr: unknown; - for (let i = 0; i < attempts; i += 1) { + + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { try { return await fn(); } catch (err) { lastErr = err; - if (i === attempts - 1) break; - const delay = initialDelayMs * 2 ** i; - await new Promise((r) => setTimeout(r, delay)); + if (attempt >= maxAttempts || !shouldRetry(err, attempt)) break; + + const retryAfterMs = options.retryAfterMs?.(err); + const hasRetryAfter = + typeof retryAfterMs === "number" && Number.isFinite(retryAfterMs); + const baseDelay = hasRetryAfter + ? Math.max(retryAfterMs, minDelayMs) + : minDelayMs * 2 ** (attempt - 1); + let delay = Math.min(baseDelay, maxDelayMs); + delay = applyJitter(delay, jitter); + delay = Math.min(Math.max(delay, minDelayMs), maxDelayMs); + + options.onRetry?.({ + attempt, + maxAttempts, + delayMs: delay, + err, + label: options.label, + }); + await sleep(delay); } } - throw lastErr; + + throw lastErr ?? new Error("Retry failed"); } diff --git a/src/telegram/send.test.ts b/src/telegram/send.test.ts index d3a9aad27..7c72f8cd1 100644 --- a/src/telegram/send.test.ts +++ b/src/telegram/send.test.ts @@ -80,6 +80,56 @@ describe("sendMessageTelegram", () => { ).rejects.toThrow(/chat_id=123/); }); + it("retries on transient errors with retry_after", async () => { + vi.useFakeTimers(); + const chatId = "123"; + const err = Object.assign(new Error("429"), { + parameters: { retry_after: 0.5 }, + }); + const sendMessage = vi + .fn() + .mockRejectedValueOnce(err) + .mockResolvedValueOnce({ + message_id: 1, + chat: { id: chatId }, + }); + const api = { sendMessage } as unknown as { + sendMessage: typeof sendMessage; + }; + const setTimeoutSpy = vi.spyOn(global, "setTimeout"); + + const promise = sendMessageTelegram(chatId, "hi", { + token: "tok", + api, + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 1000, jitter: 0 }, + }); + + await vi.runAllTimersAsync(); + await expect(promise).resolves.toEqual({ messageId: "1", chatId }); + expect(setTimeoutSpy.mock.calls[0]?.[1]).toBe(500); + setTimeoutSpy.mockRestore(); + vi.useRealTimers(); + }); + + it("does not retry on non-transient errors", async () => { + const chatId = "123"; + const sendMessage = vi + .fn() + .mockRejectedValue(new Error("400: Bad Request")); + const api = { sendMessage } as unknown as { + sendMessage: typeof sendMessage; + }; + + await expect( + sendMessageTelegram(chatId, "hi", { + token: "tok", + api, + retry: { attempts: 3, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + }), + ).rejects.toThrow(/Bad Request/); + expect(sendMessage).toHaveBeenCalledTimes(1); + }); + it("sends GIF media as animation", async () => { const chatId = "123"; const sendAnimation = vi.fn().mockResolvedValue({ diff --git a/src/telegram/send.ts b/src/telegram/send.ts index 3b90e2840..9fafeb1ab 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -1,9 +1,14 @@ // @ts-nocheck import { Bot, InputFile } from "grammy"; +import { loadConfig } from "../config/config.js"; +import type { ClawdbotConfig } from "../config/types.js"; import { formatErrorMessage } from "../infra/errors.js"; +import type { RetryConfig } from "../infra/retry.js"; +import { createTelegramRetryRunner } from "../infra/retry-policy.js"; import { mediaKindFromMime } from "../media/constants.js"; import { isGifMedia } from "../media/mime.js"; import { loadWebMedia } from "../web/media.js"; +import { resolveTelegramToken } from "./token.js"; type TelegramSendOpts = { token?: string; @@ -12,6 +17,7 @@ type TelegramSendOpts = { maxBytes?: number; messageThreadId?: number; api?: Bot["api"]; + retry?: RetryConfig; }; type TelegramSendResult = { @@ -23,16 +29,19 @@ type TelegramReactionOpts = { token?: string; api?: Bot["api"]; remove?: boolean; + verbose?: boolean; + retry?: RetryConfig; }; const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i; -function resolveToken(explicit?: string): string { - const token = explicit ?? process.env.TELEGRAM_BOT_TOKEN; +function resolveToken(explicit?: string, cfg?: ClawdbotConfig): string { + if (explicit?.trim()) return explicit.trim(); + const { token } = resolveTelegramToken(cfg); if (!token) { throw new Error( - "TELEGRAM_BOT_TOKEN is required for Telegram sends (Bot API)", + "TELEGRAM_BOT_TOKEN (or telegram.botToken/tokenFile) is required for Telegram sends (Bot API)", ); } return token.trim(); @@ -84,7 +93,8 @@ export async function sendMessageTelegram( text: string, opts: TelegramSendOpts = {}, ): Promise { - const token = resolveToken(opts.token); + const cfg = loadConfig(); + const token = resolveToken(opts.token, cfg); const chatId = normalizeChatId(to); const bot = opts.api ? null : new Bot(token); const api = opts.api ?? bot?.api; @@ -93,34 +103,11 @@ export async function sendMessageTelegram( typeof opts.messageThreadId === "number" ? { message_thread_id: Math.trunc(opts.messageThreadId) } : undefined; - - const sleep = (ms: number) => - new Promise((resolve) => setTimeout(resolve, ms)); - const sendWithRetry = async (fn: () => Promise, label: string) => { - let lastErr: unknown; - for (let attempt = 1; attempt <= 3; attempt++) { - try { - return await fn(); - } catch (err) { - lastErr = err; - const errText = formatErrorMessage(err); - const terminal = - attempt === 3 || - !/429|timeout|connect|reset|closed|unavailable|temporarily/i.test( - errText, - ); - if (terminal) break; - const backoff = 400 * attempt; - if (opts.verbose) { - console.warn( - `telegram send retry ${attempt}/2 for ${label} in ${backoff}ms: ${errText}`, - ); - } - await sleep(backoff); - } - } - throw lastErr ?? new Error(`Telegram send failed (${label})`); - }; + const request = createTelegramRetryRunner({ + retry: opts.retry, + configRetry: cfg.telegram?.retry, + verbose: opts.verbose, + }); const wrapChatNotFound = (err: unknown) => { if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) @@ -154,35 +141,35 @@ export async function sendMessageTelegram( | Awaited> | Awaited>; if (isGif) { - result = await sendWithRetry( + result = await request( () => api.sendAnimation(chatId, file, { caption, ...threadParams }), "animation", ).catch((err) => { throw wrapChatNotFound(err); }); } else if (kind === "image") { - result = await sendWithRetry( + result = await request( () => api.sendPhoto(chatId, file, { caption, ...threadParams }), "photo", ).catch((err) => { throw wrapChatNotFound(err); }); } else if (kind === "video") { - result = await sendWithRetry( + result = await request( () => api.sendVideo(chatId, file, { caption, ...threadParams }), "video", ).catch((err) => { throw wrapChatNotFound(err); }); } else if (kind === "audio") { - result = await sendWithRetry( + result = await request( () => api.sendAudio(chatId, file, { caption, ...threadParams }), "audio", ).catch((err) => { throw wrapChatNotFound(err); }); } else { - result = await sendWithRetry( + result = await request( () => api.sendDocument(chatId, file, { caption, ...threadParams }), "document", ).catch((err) => { @@ -196,7 +183,7 @@ export async function sendMessageTelegram( if (!text || !text.trim()) { throw new Error("Message must be non-empty for Telegram sends"); } - const res = await sendWithRetry( + const res = await request( () => api.sendMessage(chatId, text, { parse_mode: "Markdown", @@ -213,7 +200,7 @@ export async function sendMessageTelegram( `telegram markdown parse failed, retrying as plain text: ${errText}`, ); } - return await sendWithRetry( + return await request( () => threadParams ? api.sendMessage(chatId, text, threadParams) @@ -235,11 +222,17 @@ export async function reactMessageTelegram( emoji: string, opts: TelegramReactionOpts = {}, ): Promise<{ ok: true }> { - const token = resolveToken(opts.token); + const cfg = loadConfig(); + const token = resolveToken(opts.token, cfg); const chatId = normalizeChatId(String(chatIdInput)); const messageId = normalizeMessageId(messageIdInput); const bot = opts.api ? null : new Bot(token); const api = opts.api ?? bot?.api; + const request = createTelegramRetryRunner({ + retry: opts.retry, + configRetry: cfg.telegram?.retry, + verbose: opts.verbose, + }); const remove = opts.remove === true; const trimmedEmoji = emoji.trim(); const reactions = @@ -247,7 +240,10 @@ export async function reactMessageTelegram( if (typeof api.setMessageReaction !== "function") { throw new Error("Telegram reactions are unavailable in this bot API."); } - await api.setMessageReaction(chatId, messageId, reactions); + await request( + () => api.setMessageReaction(chatId, messageId, reactions), + "reaction", + ); return { ok: true }; }