diff --git a/CHANGELOG.md b/CHANGELOG.md index 612b647d3..e39b84165 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ ### Fixes - Discord/Telegram: add per-request retry policy with configurable delays and docs. +- Telegram: run long polling via grammY runner with per-chat sequentialization and concurrency tied to `agent.maxConcurrent`. Thanks @mukhtharcm for PR #366. - macOS: prevent gateway launchd startup race where the app could kill a just-started gateway; avoid unnecessary `bootout` and ensure the job is enabled at login. Fixes #306. Thanks @gupsammy for PR #387. - macOS: ignore ciao announcement cancellation rejections during Bonjour shutdown to avoid unhandled exits. Thanks @emanuelst for PR #419. - Pairing: generate DM pairing codes with CSPRNG, expire pending codes after 1 hour, and avoid re-sending codes for already pending requests. diff --git a/src/telegram/bot.media.test.ts b/src/telegram/bot.media.test.ts index 068f0fa7c..8f4038c1c 100644 --- a/src/telegram/bot.media.test.ts +++ b/src/telegram/bot.media.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from "vitest"; const useSpy = vi.fn(); +const middlewareUseSpy = vi.fn(); const onSpy = vi.fn(); const stopSpy = vi.fn(); const sendChatActionSpy = vi.fn(); @@ -18,6 +19,7 @@ const apiStub: ApiStub = { vi.mock("grammy", () => ({ Bot: class { api = apiStub; + use = middlewareUseSpy; on = onSpy; stop = stopSpy; constructor(public token: string) {} @@ -26,6 +28,10 @@ vi.mock("grammy", () => ({ webhookCallback: vi.fn(), })); +vi.mock("@grammyjs/runner", () => ({ + sequentialize: () => vi.fn(), +})); + const throttlerSpy = vi.fn(() => "throttler"); vi.mock("@grammyjs/transformer-throttler", () => ({ apiThrottler: () => throttlerSpy(), diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index 0175311e1..4b016dede 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -40,6 +40,7 @@ vi.mock("./pairing-store.js", () => ({ })); const useSpy = vi.fn(); +const middlewareUseSpy = vi.fn(); const onSpy = vi.fn(); const stopSpy = vi.fn(); const commandSpy = vi.fn(); @@ -71,6 +72,7 @@ const apiStub: ApiStub = { vi.mock("grammy", () => ({ Bot: class { api = apiStub; + use = middlewareUseSpy; on = onSpy; stop = stopSpy; command = commandSpy; @@ -80,6 +82,16 @@ vi.mock("grammy", () => ({ webhookCallback: vi.fn(), })); +const sequentializeMiddleware = vi.fn(); +const sequentializeSpy = vi.fn(() => sequentializeMiddleware); +let sequentializeKey: ((ctx: unknown) => string) | undefined; +vi.mock("@grammyjs/runner", () => ({ + sequentialize: (keyFn: (ctx: unknown) => string) => { + sequentializeKey = keyFn; + return sequentializeSpy(); + }, +})); + const throttlerSpy = vi.fn(() => "throttler"); vi.mock("@grammyjs/transformer-throttler", () => ({ @@ -104,6 +116,9 @@ describe("createTelegramBot", () => { sendPhotoSpy.mockReset(); setMessageReactionSpy.mockReset(); setMyCommandsSpy.mockReset(); + middlewareUseSpy.mockReset(); + sequentializeSpy.mockReset(); + sequentializeKey = undefined; }); it("installs grammY throttler", () => { @@ -112,6 +127,30 @@ describe("createTelegramBot", () => { expect(useSpy).toHaveBeenCalledWith("throttler"); }); + it("sequentializes updates by chat and thread", () => { + createTelegramBot({ token: "tok" }); + expect(sequentializeSpy).toHaveBeenCalledTimes(1); + expect(middlewareUseSpy).toHaveBeenCalledWith( + sequentializeSpy.mock.results[0]?.value, + ); + expect(sequentializeKey).toBeDefined(); + expect( + sequentializeKey?.({ + message: { chat: { id: 123 } }, + }), + ).toBe("telegram:123"); + expect( + sequentializeKey?.({ + message: { chat: { id: 123 }, message_thread_id: 9 }, + }), + ).toBe("telegram:123:topic:9"); + expect( + sequentializeKey?.({ + update: { message: { chat: { id: 555 } } }, + }), + ).toBe("telegram:555"); + }); + it("wraps inbound message with Telegram envelope", async () => { const originalTz = process.env.TZ; process.env.TZ = "Europe/Vienna"; diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 5eacb3abb..836dbd1c9 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -1,6 +1,7 @@ // @ts-nocheck import { Buffer } from "node:buffer"; +import { sequentialize } from "@grammyjs/runner"; import { apiThrottler } from "@grammyjs/transformer-throttler"; import type { ApiClientOptions, Message } from "grammy"; import { Bot, InputFile, webhookCallback } from "grammy"; @@ -127,6 +128,30 @@ export function createTelegramBot(opts: TelegramBotOptions) { const bot = new Bot(opts.token, { client }); bot.api.config.use(apiThrottler()); + const resolveSequentialKey = (ctx: { + chat?: { id?: number }; + message?: TelegramMessage; + update?: { + message?: TelegramMessage; + edited_message?: TelegramMessage; + callback_query?: { message?: TelegramMessage }; + }; + }) => { + const msg = + ctx.message ?? + ctx.update?.message ?? + ctx.update?.edited_message ?? + ctx.update?.callback_query?.message; + const chatId = msg?.chat?.id ?? ctx.chat?.id; + const threadId = msg?.message_thread_id; + if (typeof chatId === "number") { + return threadId != null + ? `telegram:${chatId}:topic:${threadId}` + : `telegram:${chatId}`; + } + return "telegram:unknown"; + }; + bot.use(sequentialize(resolveSequentialKey)); const mediaGroupBuffer = new Map(); diff --git a/src/telegram/monitor.test.ts b/src/telegram/monitor.test.ts index d4c240a75..740d28d95 100644 --- a/src/telegram/monitor.test.ts +++ b/src/telegram/monitor.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { monitorTelegramProvider } from "./monitor.js"; @@ -23,6 +23,25 @@ const api = { setWebhook: vi.fn(), deleteWebhook: vi.fn(), }; +const { initSpy, runSpy, loadConfig } = vi.hoisted(() => ({ + initSpy: vi.fn(async () => undefined), + runSpy: vi.fn(() => ({ + task: () => Promise.resolve(), + stop: vi.fn(), + })), + loadConfig: vi.fn(() => ({ + agent: { maxConcurrent: 2 }, + telegram: {}, + })), +})); + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig, + }; +}); vi.mock("./bot.js", () => ({ createTelegramBot: () => { @@ -38,6 +57,7 @@ vi.mock("./bot.js", () => ({ on: vi.fn(), api, me: { username: "mybot" }, + init: initSpy, stop: vi.fn(), start: vi.fn(), }; @@ -47,10 +67,7 @@ vi.mock("./bot.js", () => ({ // Mock the grammyjs/runner to resolve immediately vi.mock("@grammyjs/runner", () => ({ - run: vi.fn(() => ({ - task: () => Promise.resolve(), - stop: vi.fn(), - })), + run: runSpy, })); vi.mock("../auto-reply/reply.js", () => ({ @@ -60,6 +77,15 @@ vi.mock("../auto-reply/reply.js", () => ({ })); describe("monitorTelegramProvider (grammY)", () => { + beforeEach(() => { + loadConfig.mockReturnValue({ + agent: { maxConcurrent: 2 }, + telegram: {}, + }); + initSpy.mockClear(); + runSpy.mockClear(); + }); + it("processes a DM and sends reply", async () => { Object.values(api).forEach((fn) => { fn?.mockReset?.(); @@ -80,6 +106,23 @@ describe("monitorTelegramProvider (grammY)", () => { }); }); + it("uses agent maxConcurrent for runner concurrency", async () => { + runSpy.mockClear(); + loadConfig.mockReturnValue({ + agent: { maxConcurrent: 3 }, + telegram: {}, + }); + + await monitorTelegramProvider({ token: "tok" }); + + expect(runSpy).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + sink: { concurrency: 3 }, + }), + ); + }); + it("requires mention in groups by default", async () => { Object.values(api).forEach((fn) => { fn?.mockReset?.(); diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 45fd56254..574822a51 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -19,7 +19,8 @@ export type MonitorTelegramOpts = { }; export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { - const { token } = resolveTelegramToken(loadConfig(), { + const cfg = loadConfig(); + const { token } = resolveTelegramToken(cfg, { envToken: opts.token, }); if (!token) { @@ -30,8 +31,8 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const proxyFetch = opts.proxyFetch ?? - (loadConfig().telegram?.proxy - ? makeProxyFetch(loadConfig().telegram?.proxy as string) + (cfg.telegram?.proxy + ? makeProxyFetch(cfg.telegram?.proxy as string) : undefined); const bot = createTelegramBot({ @@ -56,6 +57,9 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { // Use grammyjs/runner for concurrent update processing const runner = run(bot, { + sink: { + concurrency: cfg.agent?.maxConcurrent ?? 1, + }, runner: { fetch: { // Match grammY defaults @@ -66,7 +70,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const stopOnAbort = () => { if (opts.abortSignal?.aborted) { - runner.stop(); + void runner.stop(); } }; opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });