From 322c5dd936ab415d016065be59b5ac024d19a994 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 7 Jan 2026 22:16:49 +0100 Subject: [PATCH] refactor(telegram): extract runner config and key helper --- docs/providers/telegram.md | 1 + src/telegram/bot.test.ts | 16 +++++------ src/telegram/bot.ts | 54 ++++++++++++++++++++------------------ src/telegram/monitor.ts | 32 +++++++++++++--------- 4 files changed, 56 insertions(+), 47 deletions(-) diff --git a/docs/providers/telegram.md b/docs/providers/telegram.md index 18963f35c..a8722481f 100644 --- a/docs/providers/telegram.md +++ b/docs/providers/telegram.md @@ -37,6 +37,7 @@ Status: production-ready for bot DMs + groups via grammY. Long-polling by defaul - Inbound messages are normalized into the shared provider envelope with reply context and media placeholders. - Group replies require a mention by default (native @mention or `routing.groupChat.mentionPatterns`). - Replies always route back to the same Telegram chat. +- Long-polling uses grammY runner with per-chat sequencing; overall concurrency is capped by `agent.maxConcurrent`. ## Group activation modes diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index 4b016dede..85a590a09 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -3,7 +3,7 @@ import os from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import * as replyModule from "../auto-reply/reply.js"; -import { createTelegramBot } from "./bot.js"; +import { createTelegramBot, getTelegramSequentialKey } from "./bot.js"; const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), @@ -133,19 +133,17 @@ describe("createTelegramBot", () => { expect(middlewareUseSpy).toHaveBeenCalledWith( sequentializeSpy.mock.results[0]?.value, ); - expect(sequentializeKey).toBeDefined(); + expect(sequentializeKey).toBe(getTelegramSequentialKey); + expect(getTelegramSequentialKey({ message: { chat: { id: 123 } } })).toBe( + "telegram:123", + ); expect( - sequentializeKey?.({ - message: { chat: { id: 123 } }, - }), - ).toBe("telegram:123"); - expect( - sequentializeKey?.({ + getTelegramSequentialKey({ message: { chat: { id: 123 }, message_thread_id: 9 }, }), ).toBe("telegram:123:topic:9"); expect( - sequentializeKey?.({ + getTelegramSequentialKey({ update: { message: { chat: { id: 555 } } }, }), ).toBe("telegram:555"); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 836dbd1c9..bde8de6d6 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -25,7 +25,7 @@ import { import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; -import type { ReplyToMode } from "../config/config.js"; +import type { ClawdbotConfig, ReplyToMode } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import { resolveProviderGroupPolicy, @@ -112,8 +112,33 @@ export type TelegramBotOptions = { mediaMaxMb?: number; replyToMode?: ReplyToMode; proxyFetch?: typeof fetch; + config?: ClawdbotConfig; }; +export function getTelegramSequentialKey(ctx: { + chat?: { id?: number }; + message?: TelegramMessage; + update?: { + message?: TelegramMessage; + edited_message?: TelegramMessage; + callback_query?: { message?: TelegramMessage }; + }; +}): string { + const msg = + ctx.message ?? + ctx.update?.message ?? + ctx.update?.edited_message ?? + ctx.update?.callback_query?.message; + const chatId = msg?.chat?.id ?? ctx.chat?.id; + const threadId = msg?.message_thread_id; + if (typeof chatId === "number") { + return threadId != null + ? `telegram:${chatId}:topic:${threadId}` + : `telegram:${chatId}`; + } + return "telegram:unknown"; +} + export function createTelegramBot(opts: TelegramBotOptions) { const runtime: RuntimeEnv = opts.runtime ?? { log: console.log, @@ -128,34 +153,11 @@ export function createTelegramBot(opts: TelegramBotOptions) { const bot = new Bot(opts.token, { client }); bot.api.config.use(apiThrottler()); - const resolveSequentialKey = (ctx: { - chat?: { id?: number }; - message?: TelegramMessage; - update?: { - message?: TelegramMessage; - edited_message?: TelegramMessage; - callback_query?: { message?: TelegramMessage }; - }; - }) => { - const msg = - ctx.message ?? - ctx.update?.message ?? - ctx.update?.edited_message ?? - ctx.update?.callback_query?.message; - const chatId = msg?.chat?.id ?? ctx.chat?.id; - const threadId = msg?.message_thread_id; - if (typeof chatId === "number") { - return threadId != null - ? `telegram:${chatId}:topic:${threadId}` - : `telegram:${chatId}`; - } - return "telegram:unknown"; - }; - bot.use(sequentialize(resolveSequentialKey)); + bot.use(sequentialize(getTelegramSequentialKey)); const mediaGroupBuffer = new Map(); - const cfg = loadConfig(); + const cfg = opts.config ?? loadConfig(); const textLimit = resolveTextChunkLimit(cfg, "telegram"); const dmPolicy = cfg.telegram?.dmPolicy ?? "pairing"; const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom; diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 574822a51..6be0da70a 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -1,4 +1,5 @@ -import { run } from "@grammyjs/runner"; +import { type RunOptions, run } from "@grammyjs/runner"; +import type { ClawdbotConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import type { RuntimeEnv } from "../runtime.js"; import { createTelegramBot } from "./bot.js"; @@ -18,6 +19,22 @@ export type MonitorTelegramOpts = { webhookUrl?: string; }; +export function createTelegramRunnerOptions( + cfg: ClawdbotConfig, +): RunOptions { + return { + sink: { + concurrency: cfg.agent?.maxConcurrent ?? 1, + }, + runner: { + fetch: { + // Match grammY defaults + timeout: 30, + }, + }, + }; +} + export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const cfg = loadConfig(); const { token } = resolveTelegramToken(cfg, { @@ -39,6 +56,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { token, runtime: opts.runtime, proxyFetch, + config: cfg, }); if (opts.useWebhook) { @@ -56,17 +74,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { } // Use grammyjs/runner for concurrent update processing - const runner = run(bot, { - sink: { - concurrency: cfg.agent?.maxConcurrent ?? 1, - }, - runner: { - fetch: { - // Match grammY defaults - timeout: 30, - }, - }, - }); + const runner = run(bot, createTelegramRunnerOptions(cfg)); const stopOnAbort = () => { if (opts.abortSignal?.aborted) {