fix: add provider retry policy

This commit is contained in:
Peter Steinberger
2026-01-07 17:48:19 +00:00
parent 8db522d6a6
commit de55f4e111
15 changed files with 779 additions and 101 deletions

View File

@@ -18,6 +18,7 @@
- Auto-reply: removed `autoReply` from Discord/Slack/Telegram channel configs; use `requireMention` instead (Telegram topics now support `requireMention` overrides).
### Fixes
- Discord/Telegram: add per-request retry policy with configurable delays and docs.
- Pairing: generate DM pairing codes with CSPRNG, expire pending codes after 1 hour, and avoid re-sending codes for already pending requests.
- Pairing: lock + atomically write pairing stores with 0600 perms and stop logging pairing codes in provider logs.
- Discord: include all inbound attachments in `MediaPaths`/`MediaUrls` (back-compat `MediaPath`/`MediaUrl` still first).

58
docs/concepts/retry.md Normal file
View File

@@ -0,0 +1,58 @@
---
summary: "Retry policy for outbound provider calls"
read_when:
- Updating provider retry behavior or defaults
- Debugging provider send errors or rate limits
---
# Retry policy
## Goals
- Retry per HTTP request, not per multi-step flow.
- Preserve ordering by retrying only the current step.
- Avoid duplicating non-idempotent operations.
## Defaults
- Attempts: 3
- Max delay cap: 30000 ms
- Jitter: 0.1 (10 percent)
- Provider defaults:
- Telegram min delay: 400 ms
- Discord min delay: 500 ms
## Behavior
### Discord
- Retries only on rate-limit errors (HTTP 429).
- Uses Discord `retry_after` when available, otherwise exponential backoff.
### Telegram
- Retries on transient errors (429, timeout, connect/reset/closed, temporarily unavailable).
- Uses `retry_after` when available, otherwise exponential backoff.
- Markdown parse errors are not retried; they fall back to plain text.
## Configuration
Set retry policy per provider in `~/.clawdbot/clawdbot.json`:
```json5
{
telegram: {
retry: {
attempts: 3,
minDelayMs: 400,
maxDelayMs: 30000,
jitter: 0.1
}
},
discord: {
retry: {
attempts: 3,
minDelayMs: 500,
maxDelayMs: 30000,
jitter: 0.1
}
}
}
```
## Notes
- Retries apply per request (message send, media upload, reaction, poll, sticker).
- Composite flows do not retry completed steps.

View File

@@ -493,6 +493,12 @@ Set `telegram.enabled: false` to disable automatic startup.
streamMode: "partial", // off | partial | block (draft streaming)
actions: { reactions: true }, // tool action gates (false disables)
mediaMaxMb: 5,
retry: { // outbound retry policy
attempts: 3,
minDelayMs: 400,
maxDelayMs: 30000,
jitter: 0.1
},
proxy: "socks5://localhost:9050",
webhookUrl: "https://example.com/telegram-webhook",
webhookSecret: "secret",
@@ -505,6 +511,7 @@ Draft streaming notes:
- Uses Telegram `sendMessageDraft` (draft bubble, not a real message).
- Requires **private chat topics** (message_thread_id in DMs; bot has topics enabled).
- `/reasoning stream` streams reasoning into the draft, then sends the final answer.
Retry policy defaults and behavior are documented in [Retry policy](/concepts/retry).
### `discord` (bot transport)
@@ -559,7 +566,13 @@ Configure the Discord bot by setting the bot token and optional gating:
}
}
},
historyLimit: 20 // include last N guild messages as context
historyLimit: 20, // include last N guild messages as context
retry: { // outbound retry policy
attempts: 3,
minDelayMs: 500,
maxDelayMs: 30000,
jitter: 0.1
}
}
}
```
@@ -571,6 +584,7 @@ Reaction notification modes:
- `own`: reactions on the bot's own messages (default).
- `all`: all reactions on all messages.
- `allowlist`: reactions from `guilds.<id>.users` on all messages (empty list disables).
Retry policy defaults and behavior are documented in [Retry policy](/concepts/retry).
### `slack` (socket mode)

View File

@@ -5,7 +5,7 @@ read_when:
---
# Discord (Bot API)
Updated: 2025-12-07
Updated: 2026-01-07
Status: ready for DM and guild text channels via the official Discord bot gateway.
@@ -122,6 +122,12 @@ Example “single server, only allow me, only allow #help”:
help: { allow: true, requireMention: true }
}
}
},
retry: {
attempts: 3,
minDelayMs: 500,
maxDelayMs: 30000,
jitter: 0.1
}
}
}
@@ -154,6 +160,9 @@ Notes:
- Reply context is injected when a message references another message (quoted content + ids).
- Native reply threading is **off by default**; enable with `discord.replyToMode` and reply tags.
## Retry policy
Outbound Discord API calls retry on rate limits (429) using Discord `retry_after` when available, with exponential backoff and jitter. Configure via `discord.retry`. See [Retry policy](/concepts/retry).
## Config
```json5
@@ -235,6 +244,7 @@ Ack reactions are controlled globally via `messages.ackReaction` +
- `guilds.<id>.reactionNotifications`: reaction system event mode (`off`, `own`, `all`, `allowlist`).
- `mediaMaxMb`: clamp inbound media saved to disk.
- `historyLimit`: number of recent guild messages to include as context when replying to a mention (default 20, `0` disables).
- `retry`: retry policy for outbound Discord API calls (attempts, minDelayMs, maxDelayMs, jitter).
- `actions`: per-action tool gates; omit to allow all (set `false` to disable).
- `reactions` (covers react + read reactions)
- `stickers`, `polls`, `permissions`, `messages`, `threads`, `pins`, `search`

View File

@@ -162,6 +162,9 @@ Reasoning stream (Telegram only):
- If `telegram.streamMode` is `off`, reasoning stream is disabled.
More context: [Streaming + chunking](/concepts/streaming).
## Retry policy
Outbound Telegram API calls retry on transient network/429 errors with exponential backoff and jitter. Configure via `telegram.retry`. See [Retry policy](/concepts/retry).
## Agent tool (reactions)
- Tool: `telegram` with `react` action (`chatId`, `messageId`, `emoji`).
- Reaction removal semantics: see [/tools/reactions](/tools/reactions).
@@ -215,6 +218,7 @@ Provider options:
- `telegram.textChunkLimit`: outbound chunk size (chars).
- `telegram.streamMode`: `off | partial | block` (draft streaming).
- `telegram.mediaMaxMb`: inbound/outbound media cap (MB).
- `telegram.retry`: retry policy for outbound Telegram API calls (attempts, minDelayMs, maxDelayMs, jitter).
- `telegram.proxy`: proxy URL for Bot API calls (SOCKS/HTTP).
- `telegram.webhookUrl`: enable webhook mode.
- `telegram.webhookSecret`: webhook secret (optional).

View File

@@ -108,10 +108,18 @@ const FIELD_LABELS: Record<string, string> = {
"telegram.botToken": "Telegram Bot Token",
"telegram.dmPolicy": "Telegram DM Policy",
"telegram.streamMode": "Telegram Stream Mode",
"telegram.retry.attempts": "Telegram Retry Attempts",
"telegram.retry.minDelayMs": "Telegram Retry Min Delay (ms)",
"telegram.retry.maxDelayMs": "Telegram Retry Max Delay (ms)",
"telegram.retry.jitter": "Telegram Retry Jitter",
"whatsapp.dmPolicy": "WhatsApp DM Policy",
"signal.dmPolicy": "Signal DM Policy",
"imessage.dmPolicy": "iMessage DM Policy",
"discord.dm.policy": "Discord DM Policy",
"discord.retry.attempts": "Discord Retry Attempts",
"discord.retry.minDelayMs": "Discord Retry Min Delay (ms)",
"discord.retry.maxDelayMs": "Discord Retry Max Delay (ms)",
"discord.retry.jitter": "Discord Retry Jitter",
"slack.dm.policy": "Slack DM Policy",
"discord.token": "Discord Bot Token",
"slack.botToken": "Slack Bot Token",
@@ -158,6 +166,14 @@ const FIELD_HELP: Record<string, string> = {
'Direct message access control ("pairing" recommended). "open" requires telegram.allowFrom=["*"].',
"telegram.streamMode":
"Draft streaming mode for Telegram replies (off | partial | block). Requires private topics + sendMessageDraft.",
"telegram.retry.attempts":
"Max retry attempts for outbound Telegram API calls (default: 3).",
"telegram.retry.minDelayMs":
"Minimum retry delay in ms for Telegram outbound calls.",
"telegram.retry.maxDelayMs":
"Maximum retry delay cap in ms for Telegram outbound calls.",
"telegram.retry.jitter":
"Jitter factor (0-1) applied to Telegram retry delays.",
"whatsapp.dmPolicy":
'Direct message access control ("pairing" recommended). "open" requires whatsapp.allowFrom=["*"].',
"signal.dmPolicy":
@@ -166,6 +182,14 @@ const FIELD_HELP: Record<string, string> = {
'Direct message access control ("pairing" recommended). "open" requires imessage.allowFrom=["*"].',
"discord.dm.policy":
'Direct message access control ("pairing" recommended). "open" requires discord.dm.allowFrom=["*"].',
"discord.retry.attempts":
"Max retry attempts for outbound Discord API calls (default: 3).",
"discord.retry.minDelayMs":
"Minimum retry delay in ms for Discord outbound calls.",
"discord.retry.maxDelayMs":
"Maximum retry delay cap in ms for Discord outbound calls.",
"discord.retry.jitter":
"Jitter factor (0-1) applied to Discord retry delays.",
"slack.dm.policy":
'Direct message access control ("pairing" recommended). "open" requires slack.dm.allowFrom=["*"].',
};

View File

@@ -4,6 +4,17 @@ export type ReplyToMode = "off" | "first" | "all";
export type GroupPolicy = "open" | "disabled" | "allowlist";
export type DmPolicy = "pairing" | "allowlist" | "open" | "disabled";
export type OutboundRetryConfig = {
/** Max retry attempts for outbound requests (default: 3). */
attempts?: number;
/** Minimum retry delay in ms (default: 300-500ms depending on provider). */
minDelayMs?: number;
/** Maximum retry delay cap in ms (default: 30000). */
maxDelayMs?: number;
/** Jitter factor (0-1) applied to delays (default: 0.1). */
jitter?: number;
};
export type SessionSendPolicyAction = "allow" | "deny";
export type SessionSendPolicyMatch = {
provider?: string;
@@ -294,6 +305,8 @@ export type TelegramConfig = {
/** Draft streaming mode for Telegram (off|partial|block). Default: partial. */
streamMode?: "off" | "partial" | "block";
mediaMaxMb?: number;
/** Retry policy for outbound Telegram API calls. */
retry?: OutboundRetryConfig;
proxy?: string;
webhookUrl?: string;
webhookSecret?: string;
@@ -378,6 +391,8 @@ export type DiscordConfig = {
textChunkLimit?: number;
mediaMaxMb?: number;
historyLimit?: number;
/** Retry policy for outbound Discord API calls. */
retry?: OutboundRetryConfig;
/** Per-action tool gating (default: true for all). */
actions?: DiscordActionConfig;
/** Control reply threading when reply tags are present (off|first|all). */

View File

@@ -89,6 +89,15 @@ const GroupPolicySchema = z.enum(["open", "disabled", "allowlist"]);
const DmPolicySchema = z.enum(["pairing", "allowlist", "open", "disabled"]);
const RetryConfigSchema = z
.object({
attempts: z.number().int().min(1).optional(),
minDelayMs: z.number().int().min(0).optional(),
maxDelayMs: z.number().int().min(0).optional(),
jitter: z.number().min(0).max(1).optional(),
})
.optional();
const QueueModeBySurfaceSchema = z
.object({
whatsapp: QueueModeSchema.optional(),
@@ -867,6 +876,7 @@ export const ClawdbotSchema = z.object({
.optional()
.default("partial"),
mediaMaxMb: z.number().positive().optional(),
retry: RetryConfigSchema,
proxy: z.string().optional(),
webhookUrl: z.string().optional(),
webhookSecret: z.string().optional(),
@@ -899,6 +909,7 @@ export const ClawdbotSchema = z.object({
textChunkLimit: z.number().int().positive().optional(),
mediaMaxMb: z.number().positive().optional(),
historyLimit: z.number().int().min(0).optional(),
retry: RetryConfigSchema,
actions: z
.object({
reactions: z.boolean().optional(),

View File

@@ -1,3 +1,4 @@
import { RateLimitError } from "@buape/carbon";
import { PermissionFlagsBits, Routes } from "discord-api-types/v10";
import { beforeEach, describe, expect, it, vi } from "vitest";
@@ -662,3 +663,133 @@ describe("sendPollDiscord", () => {
);
});
});
function createMockRateLimitError(retryAfter = 0.001): RateLimitError {
const response = new Response(null, {
status: 429,
headers: {
"X-RateLimit-Scope": "user",
"X-RateLimit-Bucket": "test-bucket",
},
});
return new RateLimitError(response, {
message: "You are being rate limited.",
retry_after: retryAfter,
global: false,
});
}
describe("retry rate limits", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("retries on Discord rate limits", async () => {
const { rest, postMock } = makeRest();
const rateLimitError = createMockRateLimitError(0);
postMock
.mockRejectedValueOnce(rateLimitError)
.mockResolvedValueOnce({ id: "msg1", channel_id: "789" });
const res = await sendMessageDiscord("channel:789", "hello", {
rest,
token: "t",
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
});
expect(res.messageId).toBe("msg1");
expect(postMock).toHaveBeenCalledTimes(2);
});
it("uses retry_after delays when rate limited", async () => {
vi.useFakeTimers();
const setTimeoutSpy = vi.spyOn(global, "setTimeout");
const { rest, postMock } = makeRest();
const rateLimitError = createMockRateLimitError(0.5);
postMock
.mockRejectedValueOnce(rateLimitError)
.mockResolvedValueOnce({ id: "msg1", channel_id: "789" });
const promise = sendMessageDiscord("channel:789", "hello", {
rest,
token: "t",
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 1000, jitter: 0 },
});
await vi.runAllTimersAsync();
await expect(promise).resolves.toEqual({
messageId: "msg1",
channelId: "789",
});
expect(setTimeoutSpy.mock.calls[0]?.[1]).toBe(500);
setTimeoutSpy.mockRestore();
vi.useRealTimers();
});
it("stops after max retry attempts", async () => {
const { rest, postMock } = makeRest();
const rateLimitError = createMockRateLimitError(0);
postMock.mockRejectedValue(rateLimitError);
await expect(
sendMessageDiscord("channel:789", "hello", {
rest,
token: "t",
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
}),
).rejects.toBeInstanceOf(RateLimitError);
expect(postMock).toHaveBeenCalledTimes(2);
});
it("does not retry non-rate-limit errors", async () => {
const { rest, postMock } = makeRest();
postMock.mockRejectedValueOnce(new Error("network error"));
await expect(
sendMessageDiscord("channel:789", "hello", { rest, token: "t" }),
).rejects.toThrow("network error");
expect(postMock).toHaveBeenCalledTimes(1);
});
it("retries reactions on rate limits", async () => {
const { rest, putMock } = makeRest();
const rateLimitError = createMockRateLimitError(0);
putMock
.mockRejectedValueOnce(rateLimitError)
.mockResolvedValueOnce(undefined);
const res = await reactMessageDiscord("chan1", "msg1", "ok", {
rest,
token: "t",
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
});
expect(res.ok).toBe(true);
expect(putMock).toHaveBeenCalledTimes(2);
});
it("retries media upload without duplicating overflow text", async () => {
const { rest, postMock } = makeRest();
const rateLimitError = createMockRateLimitError(0);
const text = "a".repeat(2005);
postMock
.mockRejectedValueOnce(rateLimitError)
.mockResolvedValueOnce({ id: "msg1", channel_id: "789" })
.mockResolvedValueOnce({ id: "msg2", channel_id: "789" });
const res = await sendMessageDiscord("channel:789", text, {
rest,
token: "t",
mediaUrl: "https://example.com/photo.jpg",
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
});
expect(res.messageId).toBe("msg1");
expect(postMock).toHaveBeenCalledTimes(3);
});
});

View File

@@ -19,6 +19,11 @@ import {
import { chunkMarkdownText } from "../auto-reply/chunk.js";
import { loadConfig } from "../config/config.js";
import type { RetryConfig } from "../infra/retry.js";
import {
createDiscordRetryRunner,
type RetryRunner,
} from "../infra/retry-policy.js";
import {
normalizePollDurationHours,
normalizePollInput,
@@ -35,6 +40,7 @@ const DISCORD_POLL_MAX_ANSWERS = 10;
const DISCORD_POLL_MAX_DURATION_HOURS = 32 * 24;
const DISCORD_MISSING_PERMISSIONS = 50013;
const DISCORD_CANNOT_DM = 50007;
type DiscordRequest = RetryRunner;
export class DiscordSendError extends Error {
kind?: "missing-permissions" | "dm-blocked";
@@ -72,6 +78,7 @@ type DiscordSendOpts = {
verbose?: boolean;
rest?: RequestClient;
replyTo?: string;
retry?: RetryConfig;
};
export type DiscordSendResult = {
@@ -82,6 +89,8 @@ export type DiscordSendResult = {
export type DiscordReactOpts = {
token?: string;
rest?: RequestClient;
verbose?: boolean;
retry?: RetryConfig;
};
export type DiscordReactionUser = {
@@ -187,6 +196,24 @@ function resolveRest(token: string, rest?: RequestClient) {
return rest ?? new RequestClient(token);
}
type DiscordClientOpts = {
token?: string;
rest?: RequestClient;
retry?: RetryConfig;
verbose?: boolean;
};
function createDiscordClient(opts: DiscordClientOpts, cfg = loadConfig()) {
const token = resolveToken(opts.token);
const rest = resolveRest(token, opts.rest);
const request = createDiscordRetryRunner({
retry: opts.retry,
configRetry: cfg.discord?.retry,
verbose: opts.verbose,
});
return { token, rest, request };
}
function normalizeReactionEmoji(raw: string) {
const trimmed = raw.trim();
if (!trimmed) {
@@ -358,13 +385,18 @@ async function buildDiscordSendError(
async function resolveChannelId(
rest: RequestClient,
recipient: DiscordRecipient,
request: DiscordRequest,
): Promise<{ channelId: string; dm?: boolean }> {
if (recipient.kind === "channel") {
return { channelId: recipient.id };
}
const dmChannel = (await rest.post(Routes.userChannels(), {
body: { recipient_id: recipient.id },
})) as { id: string };
const dmChannel = (await request(
() =>
rest.post(Routes.userChannels(), {
body: { recipient_id: recipient.id },
}) as Promise<{ id: string }>,
"dm-channel",
)) as { id: string };
if (!dmChannel?.id) {
throw new Error("Failed to create Discord DM channel");
}
@@ -375,7 +407,8 @@ async function sendDiscordText(
rest: RequestClient,
channelId: string,
text: string,
replyTo?: string,
replyTo: string | undefined,
request: DiscordRequest,
) {
if (!text.trim()) {
throw new Error("Message must be non-empty for Discord sends");
@@ -384,21 +417,29 @@ async function sendDiscordText(
? { message_id: replyTo, fail_if_not_exists: false }
: undefined;
if (text.length <= DISCORD_TEXT_LIMIT) {
const res = (await rest.post(Routes.channelMessages(channelId), {
body: { content: text, message_reference: messageReference },
})) as { id: string; channel_id: string };
const res = (await request(
() =>
rest.post(Routes.channelMessages(channelId), {
body: { content: text, message_reference: messageReference },
}) as Promise<{ id: string; channel_id: string }>,
"text",
)) as { id: string; channel_id: string };
return res;
}
const chunks = chunkMarkdownText(text, DISCORD_TEXT_LIMIT);
let last: { id: string; channel_id: string } | null = null;
let isFirst = true;
for (const chunk of chunks) {
last = (await rest.post(Routes.channelMessages(channelId), {
body: {
content: chunk,
message_reference: isFirst ? messageReference : undefined,
},
})) as { id: string; channel_id: string };
last = (await request(
() =>
rest.post(Routes.channelMessages(channelId), {
body: {
content: chunk,
message_reference: isFirst ? messageReference : undefined,
},
}) as Promise<{ id: string; channel_id: string }>,
"text",
)) as { id: string; channel_id: string };
isFirst = false;
}
if (!last) {
@@ -412,7 +453,8 @@ async function sendDiscordMedia(
channelId: string,
text: string,
mediaUrl: string,
replyTo?: string,
replyTo: string | undefined,
request: DiscordRequest,
) {
const media = await loadWebMedia(mediaUrl);
const caption =
@@ -420,22 +462,26 @@ async function sendDiscordMedia(
const messageReference = replyTo
? { message_id: replyTo, fail_if_not_exists: false }
: undefined;
const res = (await rest.post(Routes.channelMessages(channelId), {
body: {
content: caption || undefined,
message_reference: messageReference,
files: [
{
data: media.buffer,
name: media.fileName ?? "upload",
const res = (await request(
() =>
rest.post(Routes.channelMessages(channelId), {
body: {
content: caption || undefined,
message_reference: messageReference,
files: [
{
data: media.buffer,
name: media.fileName ?? "upload",
},
],
},
],
},
})) as { id: string; channel_id: string };
}) as Promise<{ id: string; channel_id: string }>,
"media",
)) as { id: string; channel_id: string };
if (text.length > DISCORD_TEXT_LIMIT) {
const remaining = text.slice(DISCORD_TEXT_LIMIT).trim();
if (remaining) {
await sendDiscordText(rest, channelId, remaining);
await sendDiscordText(rest, channelId, remaining, undefined, request);
}
}
return res;
@@ -471,10 +517,10 @@ export async function sendMessageDiscord(
text: string,
opts: DiscordSendOpts = {},
): Promise<DiscordSendResult> {
const token = resolveToken(opts.token);
const rest = resolveRest(token, opts.rest);
const cfg = loadConfig();
const { token, rest, request } = createDiscordClient(opts, cfg);
const recipient = parseRecipient(to);
const { channelId } = await resolveChannelId(rest, recipient);
const { channelId } = await resolveChannelId(rest, recipient, request);
let result:
| { id: string; channel_id: string }
| { id: string | null; channel_id: string };
@@ -486,9 +532,16 @@ export async function sendMessageDiscord(
text,
opts.mediaUrl,
opts.replyTo,
request,
);
} else {
result = await sendDiscordText(rest, channelId, text, opts.replyTo);
result = await sendDiscordText(
rest,
channelId,
text,
opts.replyTo,
request,
);
}
} catch (err) {
throw await buildDiscordSendError(err, {
@@ -510,18 +563,22 @@ export async function sendStickerDiscord(
stickerIds: string[],
opts: DiscordSendOpts & { content?: string } = {},
): Promise<DiscordSendResult> {
const token = resolveToken(opts.token);
const rest = resolveRest(token, opts.rest);
const cfg = loadConfig();
const { rest, request } = createDiscordClient(opts, cfg);
const recipient = parseRecipient(to);
const { channelId } = await resolveChannelId(rest, recipient);
const { channelId } = await resolveChannelId(rest, recipient, request);
const content = opts.content?.trim();
const stickers = normalizeStickerIds(stickerIds);
const res = (await rest.post(Routes.channelMessages(channelId), {
body: {
content: content || undefined,
sticker_ids: stickers,
},
})) as { id: string; channel_id: string };
const res = (await request(
() =>
rest.post(Routes.channelMessages(channelId), {
body: {
content: content || undefined,
sticker_ids: stickers,
},
}) as Promise<{ id: string; channel_id: string }>,
"sticker",
)) as { id: string; channel_id: string };
return {
messageId: res.id ? String(res.id) : "unknown",
channelId: String(res.channel_id ?? channelId),
@@ -533,18 +590,22 @@ export async function sendPollDiscord(
poll: PollInput,
opts: DiscordSendOpts & { content?: string } = {},
): Promise<DiscordSendResult> {
const token = resolveToken(opts.token);
const rest = resolveRest(token, opts.rest);
const cfg = loadConfig();
const { rest, request } = createDiscordClient(opts, cfg);
const recipient = parseRecipient(to);
const { channelId } = await resolveChannelId(rest, recipient);
const { channelId } = await resolveChannelId(rest, recipient, request);
const content = opts.content?.trim();
const payload = normalizeDiscordPollInput(poll);
const res = (await rest.post(Routes.channelMessages(channelId), {
body: {
content: content || undefined,
poll: payload,
},
})) as { id: string; channel_id: string };
const res = (await request(
() =>
rest.post(Routes.channelMessages(channelId), {
body: {
content: content || undefined,
poll: payload,
},
}) as Promise<{ id: string; channel_id: string }>,
"poll",
)) as { id: string; channel_id: string };
return {
messageId: res.id ? String(res.id) : "unknown",
channelId: String(res.channel_id ?? channelId),
@@ -557,11 +618,13 @@ export async function reactMessageDiscord(
emoji: string,
opts: DiscordReactOpts = {},
) {
const token = resolveToken(opts.token);
const rest = resolveRest(token, opts.rest);
const cfg = loadConfig();
const { rest, request } = createDiscordClient(opts, cfg);
const encoded = normalizeReactionEmoji(emoji);
await rest.put(
Routes.channelMessageOwnReaction(channelId, messageId, encoded),
await request(
() =>
rest.put(Routes.channelMessageOwnReaction(channelId, messageId, encoded)),
"react",
);
return { ok: true };
}

106
src/infra/retry-policy.ts Normal file
View File

@@ -0,0 +1,106 @@
import { RateLimitError } from "@buape/carbon";
import { formatErrorMessage } from "./errors.js";
import { type RetryConfig, resolveRetryConfig, retryAsync } from "./retry.js";
export type RetryRunner = <T>(
fn: () => Promise<T>,
label?: string,
) => Promise<T>;
export const DISCORD_RETRY_DEFAULTS = {
attempts: 3,
minDelayMs: 500,
maxDelayMs: 30_000,
jitter: 0.1,
};
export const TELEGRAM_RETRY_DEFAULTS = {
attempts: 3,
minDelayMs: 400,
maxDelayMs: 30_000,
jitter: 0.1,
};
const TELEGRAM_RETRY_RE =
/429|timeout|connect|reset|closed|unavailable|temporarily/i;
function getTelegramRetryAfterMs(err: unknown): number | undefined {
if (!err || typeof err !== "object") return undefined;
const candidate =
"parameters" in err && err.parameters && typeof err.parameters === "object"
? (err.parameters as { retry_after?: unknown }).retry_after
: "response" in err &&
err.response &&
typeof err.response === "object" &&
"parameters" in err.response
? (
err.response as {
parameters?: { retry_after?: unknown };
}
).parameters?.retry_after
: "error" in err &&
err.error &&
typeof err.error === "object" &&
"parameters" in err.error
? (err.error as { parameters?: { retry_after?: unknown } }).parameters
?.retry_after
: undefined;
return typeof candidate === "number" && Number.isFinite(candidate)
? candidate * 1000
: undefined;
}
export function createDiscordRetryRunner(params: {
retry?: RetryConfig;
configRetry?: RetryConfig;
verbose?: boolean;
}): RetryRunner {
const retryConfig = resolveRetryConfig(DISCORD_RETRY_DEFAULTS, {
...params.configRetry,
...params.retry,
});
return <T>(fn: () => Promise<T>, label?: string) =>
retryAsync(fn, {
...retryConfig,
label,
shouldRetry: (err) => err instanceof RateLimitError,
retryAfterMs: (err) =>
err instanceof RateLimitError ? err.retryAfter * 1000 : undefined,
onRetry: params.verbose
? (info) => {
const labelText = info.label ?? "request";
const maxRetries = Math.max(1, info.maxAttempts - 1);
console.warn(
`discord ${labelText} rate limited, retry ${info.attempt}/${maxRetries} in ${info.delayMs}ms`,
);
}
: undefined,
});
}
export function createTelegramRetryRunner(params: {
retry?: RetryConfig;
configRetry?: RetryConfig;
verbose?: boolean;
}): RetryRunner {
const retryConfig = resolveRetryConfig(TELEGRAM_RETRY_DEFAULTS, {
...params.configRetry,
...params.retry,
});
return <T>(fn: () => Promise<T>, label?: string) =>
retryAsync(fn, {
...retryConfig,
label,
shouldRetry: (err) => TELEGRAM_RETRY_RE.test(formatErrorMessage(err)),
retryAfterMs: getTelegramRetryAfterMs,
onRetry: params.verbose
? (info) => {
const maxRetries = Math.max(1, info.maxAttempts - 1);
console.warn(
`telegram send retry ${info.attempt}/${maxRetries} for ${info.label ?? label ?? "request"} in ${info.delayMs}ms: ${formatErrorMessage(info.err)}`,
);
}
: undefined,
});
}

View File

@@ -25,4 +25,80 @@ describe("retryAsync", () => {
await expect(retryAsync(fn, 2, 1)).rejects.toThrow("boom");
expect(fn).toHaveBeenCalledTimes(2);
});
it("stops when shouldRetry returns false", async () => {
const fn = vi.fn().mockRejectedValue(new Error("boom"));
await expect(
retryAsync(fn, { attempts: 3, shouldRetry: () => false }),
).rejects.toThrow("boom");
expect(fn).toHaveBeenCalledTimes(1);
});
it("calls onRetry before retrying", async () => {
const fn = vi
.fn()
.mockRejectedValueOnce(new Error("boom"))
.mockResolvedValueOnce("ok");
const onRetry = vi.fn();
const res = await retryAsync(fn, {
attempts: 2,
minDelayMs: 0,
maxDelayMs: 0,
onRetry,
});
expect(res).toBe("ok");
expect(onRetry).toHaveBeenCalledWith(
expect.objectContaining({ attempt: 1, maxAttempts: 2 }),
);
});
it("clamps attempts to at least 1", async () => {
const fn = vi.fn().mockRejectedValue(new Error("boom"));
await expect(
retryAsync(fn, { attempts: 0, minDelayMs: 0, maxDelayMs: 0 }),
).rejects.toThrow("boom");
expect(fn).toHaveBeenCalledTimes(1);
});
it("uses retryAfterMs when provided", async () => {
vi.useFakeTimers();
const fn = vi
.fn()
.mockRejectedValueOnce(new Error("boom"))
.mockResolvedValueOnce("ok");
const delays: number[] = [];
const promise = retryAsync(fn, {
attempts: 2,
minDelayMs: 0,
maxDelayMs: 1000,
jitter: 0,
retryAfterMs: () => 500,
onRetry: (info) => delays.push(info.delayMs),
});
await vi.runAllTimersAsync();
await expect(promise).resolves.toBe("ok");
expect(delays[0]).toBe(500);
vi.useRealTimers();
});
it("clamps retryAfterMs to maxDelayMs", async () => {
vi.useFakeTimers();
const fn = vi
.fn()
.mockRejectedValueOnce(new Error("boom"))
.mockResolvedValueOnce("ok");
const delays: number[] = [];
const promise = retryAsync(fn, {
attempts: 2,
minDelayMs: 0,
maxDelayMs: 100,
jitter: 0,
retryAfterMs: () => 500,
onRetry: (info) => delays.push(info.delayMs),
});
await vi.runAllTimersAsync();
await expect(promise).resolves.toBe("ok");
expect(delays[0]).toBe(100);
vi.useRealTimers();
});
});

View File

@@ -1,18 +1,137 @@
export type RetryConfig = {
attempts?: number;
minDelayMs?: number;
maxDelayMs?: number;
jitter?: number;
};
export type RetryInfo = {
attempt: number;
maxAttempts: number;
delayMs: number;
err: unknown;
label?: string;
};
export type RetryOptions = RetryConfig & {
label?: string;
shouldRetry?: (err: unknown, attempt: number) => boolean;
retryAfterMs?: (err: unknown) => number | undefined;
onRetry?: (info: RetryInfo) => void;
};
const DEFAULT_RETRY_CONFIG = {
attempts: 3,
minDelayMs: 300,
maxDelayMs: 30_000,
jitter: 0,
};
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
const asFiniteNumber = (value: unknown): number | undefined =>
typeof value === "number" && Number.isFinite(value) ? value : undefined;
const clampNumber = (
value: unknown,
fallback: number,
min?: number,
max?: number,
) => {
const next = asFiniteNumber(value);
if (next === undefined) return fallback;
const floor = typeof min === "number" ? min : Number.NEGATIVE_INFINITY;
const ceiling = typeof max === "number" ? max : Number.POSITIVE_INFINITY;
return Math.min(Math.max(next, floor), ceiling);
};
export function resolveRetryConfig(
defaults: Required<RetryConfig> = DEFAULT_RETRY_CONFIG,
overrides?: RetryConfig,
): Required<RetryConfig> {
const attempts = Math.max(
1,
Math.round(clampNumber(overrides?.attempts, defaults.attempts, 1)),
);
const minDelayMs = Math.max(
0,
Math.round(clampNumber(overrides?.minDelayMs, defaults.minDelayMs, 0)),
);
const maxDelayMs = Math.max(
minDelayMs,
Math.round(clampNumber(overrides?.maxDelayMs, defaults.maxDelayMs, 0)),
);
const jitter = clampNumber(overrides?.jitter, defaults.jitter, 0, 1);
return { attempts, minDelayMs, maxDelayMs, jitter };
}
function applyJitter(delayMs: number, jitter: number): number {
if (jitter <= 0) return delayMs;
const offset = (Math.random() * 2 - 1) * jitter;
return Math.max(0, Math.round(delayMs * (1 + offset)));
}
export async function retryAsync<T>(
fn: () => Promise<T>,
attempts = 3,
attemptsOrOptions: number | RetryOptions = 3,
initialDelayMs = 300,
): Promise<T> {
if (typeof attemptsOrOptions === "number") {
const attempts = Math.max(1, Math.round(attemptsOrOptions));
let lastErr: unknown;
for (let i = 0; i < attempts; i += 1) {
try {
return await fn();
} catch (err) {
lastErr = err;
if (i === attempts - 1) break;
const delay = initialDelayMs * 2 ** i;
await sleep(delay);
}
}
throw lastErr ?? new Error("Retry failed");
}
const options = attemptsOrOptions;
const resolved = resolveRetryConfig(DEFAULT_RETRY_CONFIG, options);
const maxAttempts = resolved.attempts;
const minDelayMs = resolved.minDelayMs;
const maxDelayMs =
Number.isFinite(resolved.maxDelayMs) && resolved.maxDelayMs > 0
? resolved.maxDelayMs
: Number.POSITIVE_INFINITY;
const jitter = resolved.jitter;
const shouldRetry = options.shouldRetry ?? (() => true);
let lastErr: unknown;
for (let i = 0; i < attempts; i += 1) {
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
try {
return await fn();
} catch (err) {
lastErr = err;
if (i === attempts - 1) break;
const delay = initialDelayMs * 2 ** i;
await new Promise((r) => setTimeout(r, delay));
if (attempt >= maxAttempts || !shouldRetry(err, attempt)) break;
const retryAfterMs = options.retryAfterMs?.(err);
const hasRetryAfter =
typeof retryAfterMs === "number" && Number.isFinite(retryAfterMs);
const baseDelay = hasRetryAfter
? Math.max(retryAfterMs, minDelayMs)
: minDelayMs * 2 ** (attempt - 1);
let delay = Math.min(baseDelay, maxDelayMs);
delay = applyJitter(delay, jitter);
delay = Math.min(Math.max(delay, minDelayMs), maxDelayMs);
options.onRetry?.({
attempt,
maxAttempts,
delayMs: delay,
err,
label: options.label,
});
await sleep(delay);
}
}
throw lastErr;
throw lastErr ?? new Error("Retry failed");
}

View File

@@ -80,6 +80,56 @@ describe("sendMessageTelegram", () => {
).rejects.toThrow(/chat_id=123/);
});
it("retries on transient errors with retry_after", async () => {
vi.useFakeTimers();
const chatId = "123";
const err = Object.assign(new Error("429"), {
parameters: { retry_after: 0.5 },
});
const sendMessage = vi
.fn()
.mockRejectedValueOnce(err)
.mockResolvedValueOnce({
message_id: 1,
chat: { id: chatId },
});
const api = { sendMessage } as unknown as {
sendMessage: typeof sendMessage;
};
const setTimeoutSpy = vi.spyOn(global, "setTimeout");
const promise = sendMessageTelegram(chatId, "hi", {
token: "tok",
api,
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 1000, jitter: 0 },
});
await vi.runAllTimersAsync();
await expect(promise).resolves.toEqual({ messageId: "1", chatId });
expect(setTimeoutSpy.mock.calls[0]?.[1]).toBe(500);
setTimeoutSpy.mockRestore();
vi.useRealTimers();
});
it("does not retry on non-transient errors", async () => {
const chatId = "123";
const sendMessage = vi
.fn()
.mockRejectedValue(new Error("400: Bad Request"));
const api = { sendMessage } as unknown as {
sendMessage: typeof sendMessage;
};
await expect(
sendMessageTelegram(chatId, "hi", {
token: "tok",
api,
retry: { attempts: 3, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
}),
).rejects.toThrow(/Bad Request/);
expect(sendMessage).toHaveBeenCalledTimes(1);
});
it("sends GIF media as animation", async () => {
const chatId = "123";
const sendAnimation = vi.fn().mockResolvedValue({

View File

@@ -1,9 +1,14 @@
// @ts-nocheck
import { Bot, InputFile } from "grammy";
import { loadConfig } from "../config/config.js";
import type { ClawdbotConfig } from "../config/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import type { RetryConfig } from "../infra/retry.js";
import { createTelegramRetryRunner } from "../infra/retry-policy.js";
import { mediaKindFromMime } from "../media/constants.js";
import { isGifMedia } from "../media/mime.js";
import { loadWebMedia } from "../web/media.js";
import { resolveTelegramToken } from "./token.js";
type TelegramSendOpts = {
token?: string;
@@ -12,6 +17,7 @@ type TelegramSendOpts = {
maxBytes?: number;
messageThreadId?: number;
api?: Bot["api"];
retry?: RetryConfig;
};
type TelegramSendResult = {
@@ -23,16 +29,19 @@ type TelegramReactionOpts = {
token?: string;
api?: Bot["api"];
remove?: boolean;
verbose?: boolean;
retry?: RetryConfig;
};
const PARSE_ERR_RE =
/can't parse entities|parse entities|find end of the entity/i;
function resolveToken(explicit?: string): string {
const token = explicit ?? process.env.TELEGRAM_BOT_TOKEN;
function resolveToken(explicit?: string, cfg?: ClawdbotConfig): string {
if (explicit?.trim()) return explicit.trim();
const { token } = resolveTelegramToken(cfg);
if (!token) {
throw new Error(
"TELEGRAM_BOT_TOKEN is required for Telegram sends (Bot API)",
"TELEGRAM_BOT_TOKEN (or telegram.botToken/tokenFile) is required for Telegram sends (Bot API)",
);
}
return token.trim();
@@ -84,7 +93,8 @@ export async function sendMessageTelegram(
text: string,
opts: TelegramSendOpts = {},
): Promise<TelegramSendResult> {
const token = resolveToken(opts.token);
const cfg = loadConfig();
const token = resolveToken(opts.token, cfg);
const chatId = normalizeChatId(to);
const bot = opts.api ? null : new Bot(token);
const api = opts.api ?? bot?.api;
@@ -93,34 +103,11 @@ export async function sendMessageTelegram(
typeof opts.messageThreadId === "number"
? { message_thread_id: Math.trunc(opts.messageThreadId) }
: undefined;
const sleep = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms));
const sendWithRetry = async <T>(fn: () => Promise<T>, label: string) => {
let lastErr: unknown;
for (let attempt = 1; attempt <= 3; attempt++) {
try {
return await fn();
} catch (err) {
lastErr = err;
const errText = formatErrorMessage(err);
const terminal =
attempt === 3 ||
!/429|timeout|connect|reset|closed|unavailable|temporarily/i.test(
errText,
);
if (terminal) break;
const backoff = 400 * attempt;
if (opts.verbose) {
console.warn(
`telegram send retry ${attempt}/2 for ${label} in ${backoff}ms: ${errText}`,
);
}
await sleep(backoff);
}
}
throw lastErr ?? new Error(`Telegram send failed (${label})`);
};
const request = createTelegramRetryRunner({
retry: opts.retry,
configRetry: cfg.telegram?.retry,
verbose: opts.verbose,
});
const wrapChatNotFound = (err: unknown) => {
if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err)))
@@ -154,35 +141,35 @@ export async function sendMessageTelegram(
| Awaited<ReturnType<typeof api.sendAnimation>>
| Awaited<ReturnType<typeof api.sendDocument>>;
if (isGif) {
result = await sendWithRetry(
result = await request(
() => api.sendAnimation(chatId, file, { caption, ...threadParams }),
"animation",
).catch((err) => {
throw wrapChatNotFound(err);
});
} else if (kind === "image") {
result = await sendWithRetry(
result = await request(
() => api.sendPhoto(chatId, file, { caption, ...threadParams }),
"photo",
).catch((err) => {
throw wrapChatNotFound(err);
});
} else if (kind === "video") {
result = await sendWithRetry(
result = await request(
() => api.sendVideo(chatId, file, { caption, ...threadParams }),
"video",
).catch((err) => {
throw wrapChatNotFound(err);
});
} else if (kind === "audio") {
result = await sendWithRetry(
result = await request(
() => api.sendAudio(chatId, file, { caption, ...threadParams }),
"audio",
).catch((err) => {
throw wrapChatNotFound(err);
});
} else {
result = await sendWithRetry(
result = await request(
() => api.sendDocument(chatId, file, { caption, ...threadParams }),
"document",
).catch((err) => {
@@ -196,7 +183,7 @@ export async function sendMessageTelegram(
if (!text || !text.trim()) {
throw new Error("Message must be non-empty for Telegram sends");
}
const res = await sendWithRetry(
const res = await request(
() =>
api.sendMessage(chatId, text, {
parse_mode: "Markdown",
@@ -213,7 +200,7 @@ export async function sendMessageTelegram(
`telegram markdown parse failed, retrying as plain text: ${errText}`,
);
}
return await sendWithRetry(
return await request(
() =>
threadParams
? api.sendMessage(chatId, text, threadParams)
@@ -235,11 +222,17 @@ export async function reactMessageTelegram(
emoji: string,
opts: TelegramReactionOpts = {},
): Promise<{ ok: true }> {
const token = resolveToken(opts.token);
const cfg = loadConfig();
const token = resolveToken(opts.token, cfg);
const chatId = normalizeChatId(String(chatIdInput));
const messageId = normalizeMessageId(messageIdInput);
const bot = opts.api ? null : new Bot(token);
const api = opts.api ?? bot?.api;
const request = createTelegramRetryRunner({
retry: opts.retry,
configRetry: cfg.telegram?.retry,
verbose: opts.verbose,
});
const remove = opts.remove === true;
const trimmedEmoji = emoji.trim();
const reactions =
@@ -247,7 +240,10 @@ export async function reactMessageTelegram(
if (typeof api.setMessageReaction !== "function") {
throw new Error("Telegram reactions are unavailable in this bot API.");
}
await api.setMessageReaction(chatId, messageId, reactions);
await request(
() => api.setMessageReaction(chatId, messageId, reactions),
"reaction",
);
return { ok: true };
}