refactor(telegram): extract runner config and key helper

This commit is contained in:
Peter Steinberger
2026-01-07 22:16:49 +01:00
parent 98d4e8034d
commit 322c5dd936
4 changed files with 56 additions and 47 deletions

View File

@@ -37,6 +37,7 @@ Status: production-ready for bot DMs + groups via grammY. Long-polling by defaul
- Inbound messages are normalized into the shared provider envelope with reply context and media placeholders. - Inbound messages are normalized into the shared provider envelope with reply context and media placeholders.
- Group replies require a mention by default (native @mention or `routing.groupChat.mentionPatterns`). - Group replies require a mention by default (native @mention or `routing.groupChat.mentionPatterns`).
- Replies always route back to the same Telegram chat. - Replies always route back to the same Telegram chat.
- Long-polling uses grammY runner with per-chat sequencing; overall concurrency is capped by `agent.maxConcurrent`.
## Group activation modes ## Group activation modes

View File

@@ -3,7 +3,7 @@ import os from "node:os";
import path from "node:path"; import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest"; import { beforeEach, describe, expect, it, vi } from "vitest";
import * as replyModule from "../auto-reply/reply.js"; import * as replyModule from "../auto-reply/reply.js";
import { createTelegramBot } from "./bot.js"; import { createTelegramBot, getTelegramSequentialKey } from "./bot.js";
const { loadWebMedia } = vi.hoisted(() => ({ const { loadWebMedia } = vi.hoisted(() => ({
loadWebMedia: vi.fn(), loadWebMedia: vi.fn(),
@@ -133,19 +133,17 @@ describe("createTelegramBot", () => {
expect(middlewareUseSpy).toHaveBeenCalledWith( expect(middlewareUseSpy).toHaveBeenCalledWith(
sequentializeSpy.mock.results[0]?.value, sequentializeSpy.mock.results[0]?.value,
); );
expect(sequentializeKey).toBeDefined(); expect(sequentializeKey).toBe(getTelegramSequentialKey);
expect(getTelegramSequentialKey({ message: { chat: { id: 123 } } })).toBe(
"telegram:123",
);
expect( expect(
sequentializeKey?.({ getTelegramSequentialKey({
message: { chat: { id: 123 } },
}),
).toBe("telegram:123");
expect(
sequentializeKey?.({
message: { chat: { id: 123 }, message_thread_id: 9 }, message: { chat: { id: 123 }, message_thread_id: 9 },
}), }),
).toBe("telegram:123:topic:9"); ).toBe("telegram:123:topic:9");
expect( expect(
sequentializeKey?.({ getTelegramSequentialKey({
update: { message: { chat: { id: 555 } } }, update: { message: { chat: { id: 555 } } },
}), }),
).toBe("telegram:555"); ).toBe("telegram:555");

View File

@@ -25,7 +25,7 @@ import {
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js"; import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js"; import type { ReplyPayload } from "../auto-reply/types.js";
import type { ReplyToMode } from "../config/config.js"; import type { ClawdbotConfig, ReplyToMode } from "../config/config.js";
import { loadConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js";
import { import {
resolveProviderGroupPolicy, resolveProviderGroupPolicy,
@@ -112,8 +112,33 @@ export type TelegramBotOptions = {
mediaMaxMb?: number; mediaMaxMb?: number;
replyToMode?: ReplyToMode; replyToMode?: ReplyToMode;
proxyFetch?: typeof fetch; proxyFetch?: typeof fetch;
config?: ClawdbotConfig;
}; };
export function getTelegramSequentialKey(ctx: {
chat?: { id?: number };
message?: TelegramMessage;
update?: {
message?: TelegramMessage;
edited_message?: TelegramMessage;
callback_query?: { message?: TelegramMessage };
};
}): string {
const msg =
ctx.message ??
ctx.update?.message ??
ctx.update?.edited_message ??
ctx.update?.callback_query?.message;
const chatId = msg?.chat?.id ?? ctx.chat?.id;
const threadId = msg?.message_thread_id;
if (typeof chatId === "number") {
return threadId != null
? `telegram:${chatId}:topic:${threadId}`
: `telegram:${chatId}`;
}
return "telegram:unknown";
}
export function createTelegramBot(opts: TelegramBotOptions) { export function createTelegramBot(opts: TelegramBotOptions) {
const runtime: RuntimeEnv = opts.runtime ?? { const runtime: RuntimeEnv = opts.runtime ?? {
log: console.log, log: console.log,
@@ -128,34 +153,11 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const bot = new Bot(opts.token, { client }); const bot = new Bot(opts.token, { client });
bot.api.config.use(apiThrottler()); bot.api.config.use(apiThrottler());
const resolveSequentialKey = (ctx: { bot.use(sequentialize(getTelegramSequentialKey));
chat?: { id?: number };
message?: TelegramMessage;
update?: {
message?: TelegramMessage;
edited_message?: TelegramMessage;
callback_query?: { message?: TelegramMessage };
};
}) => {
const msg =
ctx.message ??
ctx.update?.message ??
ctx.update?.edited_message ??
ctx.update?.callback_query?.message;
const chatId = msg?.chat?.id ?? ctx.chat?.id;
const threadId = msg?.message_thread_id;
if (typeof chatId === "number") {
return threadId != null
? `telegram:${chatId}:topic:${threadId}`
: `telegram:${chatId}`;
}
return "telegram:unknown";
};
bot.use(sequentialize(resolveSequentialKey));
const mediaGroupBuffer = new Map<string, MediaGroupEntry>(); const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
const cfg = loadConfig(); const cfg = opts.config ?? loadConfig();
const textLimit = resolveTextChunkLimit(cfg, "telegram"); const textLimit = resolveTextChunkLimit(cfg, "telegram");
const dmPolicy = cfg.telegram?.dmPolicy ?? "pairing"; const dmPolicy = cfg.telegram?.dmPolicy ?? "pairing";
const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom; const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom;

View File

@@ -1,4 +1,5 @@
import { run } from "@grammyjs/runner"; import { type RunOptions, run } from "@grammyjs/runner";
import type { ClawdbotConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js";
import type { RuntimeEnv } from "../runtime.js"; import type { RuntimeEnv } from "../runtime.js";
import { createTelegramBot } from "./bot.js"; import { createTelegramBot } from "./bot.js";
@@ -18,6 +19,22 @@ export type MonitorTelegramOpts = {
webhookUrl?: string; webhookUrl?: string;
}; };
export function createTelegramRunnerOptions(
cfg: ClawdbotConfig,
): RunOptions<unknown> {
return {
sink: {
concurrency: cfg.agent?.maxConcurrent ?? 1,
},
runner: {
fetch: {
// Match grammY defaults
timeout: 30,
},
},
};
}
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
const cfg = loadConfig(); const cfg = loadConfig();
const { token } = resolveTelegramToken(cfg, { const { token } = resolveTelegramToken(cfg, {
@@ -39,6 +56,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
token, token,
runtime: opts.runtime, runtime: opts.runtime,
proxyFetch, proxyFetch,
config: cfg,
}); });
if (opts.useWebhook) { if (opts.useWebhook) {
@@ -56,17 +74,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
} }
// Use grammyjs/runner for concurrent update processing // Use grammyjs/runner for concurrent update processing
const runner = run(bot, { const runner = run(bot, createTelegramRunnerOptions(cfg));
sink: {
concurrency: cfg.agent?.maxConcurrent ?? 1,
},
runner: {
fetch: {
// Match grammY defaults
timeout: 30,
},
},
});
const stopOnAbort = () => { const stopOnAbort = () => {
if (opts.abortSignal?.aborted) { if (opts.abortSignal?.aborted) {