From 06a7e1e8ce4ad909bc22a7f64ba77b40a7bc9164 Mon Sep 17 00:00:00 2001 From: Rohan Nagpal Date: Sun, 25 Jan 2026 10:18:51 +0530 Subject: [PATCH] Telegram: threaded conversation support (#1597) * Telegram: isolate dm topic sessions * Tests: cap vitest workers * Tests: cap Vitest workers on CI macOS * Tests: avoid timer-based pi-ai stream mock * Tests: increase embedded runner timeout * fix: harden telegram dm thread handling (#1597) (thanks @rohannagpal) --------- Co-authored-by: Peter Steinberger --- CHANGELOG.md | 5 ++ scripts/test-parallel.mjs | 5 +- ...-undefined-sessionkey-is-undefined.test.ts | 20 +++++ src/agents/pi-embedded-runner.test.ts | 6 +- src/agents/pi-embedded-runner/history.ts | 10 ++- .../bot-message-context.dm-threads.test.ts | 72 +++++++++++++++++ src/telegram/bot-message-context.ts | 16 +++- src/telegram/bot-native-commands.ts | 10 ++- src/telegram/bot.test.ts | 77 +++++++++++++++++++ src/telegram/bot.ts | 46 ++++++++++- 10 files changed, 256 insertions(+), 11 deletions(-) create mode 100644 src/telegram/bot-message-context.dm-threads.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a698897d..b8c7e254f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ Docs: https://docs.clawd.bot - Docs: add Bedrock EC2 instance role setup + IAM steps. (#1625) Thanks @sergical. https://docs.clawd.bot/bedrock - Exec approvals: forward approval prompts to chat with `/approve` for all channels (including plugins). (#1621) Thanks @czekaj. https://docs.clawd.bot/tools/exec-approvals https://docs.clawd.bot/tools/slash-commands - Gateway: expose config.patch in the gateway tool with safe partial updates + restart sentinel. (#1653) Thanks @Glucksberg. +- Telegram: treat DM topics as separate sessions and keep DM history limits stable with thread suffixes. (#1597) Thanks @rohannagpal. +- Telegram: add verbose raw-update logging for inbound Telegram updates. (#1597) Thanks @rohannagpal. ### Fixes - BlueBubbles: keep part-index GUIDs in reply tags when short IDs are missing. @@ -40,6 +42,9 @@ Docs: https://docs.clawd.bot - Google Chat: tighten email allowlist matching, typing cleanup, media caps, and onboarding/docs/tests. (#1635) Thanks @iHildy. - Google Chat: normalize space targets without double `spaces/` prefix. - Messaging: keep newline chunking safe for fenced markdown blocks across channels. +- Tests: cap Vitest workers on CI macOS to reduce timeouts. (#1597) Thanks @rohannagpal. +- Tests: avoid fake-timer dependency in embedded runner stream mock to reduce CI flakes. (#1597) Thanks @rohannagpal. +- Tests: increase embedded runner ordering test timeout to reduce CI flakes. (#1597) Thanks @rohannagpal. ## 2026.1.23-1 diff --git a/scripts/test-parallel.mjs b/scripts/test-parallel.mjs index d1c4871b1..a60ae7847 100644 --- a/scripts/test-parallel.mjs +++ b/scripts/test-parallel.mjs @@ -23,11 +23,14 @@ const serialRuns = runs.filter((entry) => entry.name === "gateway"); const children = new Set(); const isCI = process.env.CI === "true" || process.env.GITHUB_ACTIONS === "true"; +const isMacOS = process.platform === "darwin" || process.env.RUNNER_OS === "macOS"; const overrideWorkers = Number.parseInt(process.env.CLAWDBOT_TEST_WORKERS ?? "", 10); const resolvedOverride = Number.isFinite(overrideWorkers) && overrideWorkers > 0 ? overrideWorkers : null; const localWorkers = Math.max(4, Math.min(16, os.cpus().length)); const perRunWorkers = Math.max(1, Math.floor(localWorkers / parallelRuns.length)); -const maxWorkers = isCI ? null : resolvedOverride ?? perRunWorkers; +// Keep worker counts predictable for local runs and for CI on macOS. +// In CI on linux/windows, prefer Vitest defaults to avoid cross-test interference from lower worker counts. +const maxWorkers = resolvedOverride ?? (isCI && !isMacOS ? null : perRunWorkers); const WARNING_SUPPRESSION_FLAGS = [ "--disable-warning=ExperimentalWarning", diff --git a/src/agents/pi-embedded-runner.get-dm-history-limit-from-session-key.returns-undefined-sessionkey-is-undefined.test.ts b/src/agents/pi-embedded-runner.get-dm-history-limit-from-session-key.returns-undefined-sessionkey-is-undefined.test.ts index 4b6f082b1..8c3be721b 100644 --- a/src/agents/pi-embedded-runner.get-dm-history-limit-from-session-key.returns-undefined-sessionkey-is-undefined.test.ts +++ b/src/agents/pi-embedded-runner.get-dm-history-limit-from-session-key.returns-undefined-sessionkey-is-undefined.test.ts @@ -121,6 +121,26 @@ describe("getDmHistoryLimitFromSessionKey", () => { } as ClawdbotConfig; expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123", config)).toBe(10); }); + it("strips thread suffix from dm session keys", () => { + const config = { + channels: { telegram: { dmHistoryLimit: 10, dms: { "123": { historyLimit: 7 } } } }, + } as ClawdbotConfig; + expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123:thread:999", config)).toBe( + 7, + ); + expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123:topic:555", config)).toBe(7); + expect(getDmHistoryLimitFromSessionKey("telegram:dm:123:thread:999", config)).toBe(7); + }); + it("keeps non-numeric thread markers in dm ids", () => { + const config = { + channels: { + telegram: { dms: { "user:thread:abc": { historyLimit: 9 } } }, + }, + } as ClawdbotConfig; + expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:user:thread:abc", config)).toBe( + 9, + ); + }); it("returns undefined for non-dm session kinds", () => { const config = { channels: { diff --git a/src/agents/pi-embedded-runner.test.ts b/src/agents/pi-embedded-runner.test.ts index ee4b60c30..03192338b 100644 --- a/src/agents/pi-embedded-runner.test.ts +++ b/src/agents/pi-embedded-runner.test.ts @@ -70,7 +70,7 @@ vi.mock("@mariozechner/pi-ai", async () => { }, streamSimple: (model: { api: string; provider: string; id: string }) => { const stream = new actual.AssistantMessageEventStream(); - setTimeout(() => { + queueMicrotask(() => { stream.push({ type: "done", reason: "stop", @@ -80,7 +80,7 @@ vi.mock("@mariozechner/pi-ai", async () => { : buildAssistantMessage(model), }); stream.end(); - }, 0); + }); return stream; }, }; @@ -213,7 +213,7 @@ describe("runEmbeddedPiAgent", () => { itIfNotWin32( "persists the first user message before assistant output", - { timeout: 60_000 }, + { timeout: 120_000 }, async () => { const sessionFile = nextSessionFile(); const cfg = makeOpenAiConfig(["mock-1"]); diff --git a/src/agents/pi-embedded-runner/history.ts b/src/agents/pi-embedded-runner/history.ts index bcc0625c7..91706222a 100644 --- a/src/agents/pi-embedded-runner/history.ts +++ b/src/agents/pi-embedded-runner/history.ts @@ -2,6 +2,13 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { ClawdbotConfig } from "../../config/config.js"; +const THREAD_SUFFIX_REGEX = /^(.*)(?::(?:thread|topic):\d+)$/i; + +function stripThreadSuffix(value: string): string { + const match = value.match(THREAD_SUFFIX_REGEX); + return match?.[1] ?? value; +} + /** * Limits conversation history to the last N user turns (and their associated * assistant responses). This reduces token usage for long-running DM sessions. @@ -44,7 +51,8 @@ export function getDmHistoryLimitFromSessionKey( if (!provider) return undefined; const kind = providerParts[1]?.toLowerCase(); - const userId = providerParts.slice(2).join(":"); + const userIdRaw = providerParts.slice(2).join(":"); + const userId = stripThreadSuffix(userIdRaw); if (kind !== "dm") return undefined; const getLimit = ( diff --git a/src/telegram/bot-message-context.dm-threads.test.ts b/src/telegram/bot-message-context.dm-threads.test.ts new file mode 100644 index 000000000..ff6a8a837 --- /dev/null +++ b/src/telegram/bot-message-context.dm-threads.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it, vi } from "vitest"; + +import { buildTelegramMessageContext } from "./bot-message-context.js"; + +describe("buildTelegramMessageContext dm thread sessions", () => { + const baseConfig = { + agents: { defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/clawd" } }, + channels: { telegram: {} }, + messages: { groupChat: { mentionPatterns: [] } }, + } as never; + + const buildContext = async (message: Record) => + await buildTelegramMessageContext({ + primaryCtx: { + message, + me: { id: 7, username: "bot" }, + } as never, + allMedia: [], + storeAllowFrom: [], + options: {}, + bot: { + api: { + sendChatAction: vi.fn(), + setMessageReaction: vi.fn(), + }, + } as never, + cfg: baseConfig, + account: { accountId: "default" } as never, + historyLimit: 0, + groupHistories: new Map(), + dmPolicy: "open", + allowFrom: [], + groupAllowFrom: [], + ackReactionScope: "off", + logger: { info: vi.fn() }, + resolveGroupActivation: () => undefined, + resolveGroupRequireMention: () => false, + resolveTelegramGroupConfig: () => ({ + groupConfig: { requireMention: false }, + topicConfig: undefined, + }), + }); + + it("uses thread session key for dm topics", async () => { + const ctx = await buildContext({ + message_id: 1, + chat: { id: 1234, type: "private" }, + date: 1700000000, + text: "hello", + message_thread_id: 42, + from: { id: 42, first_name: "Alice" }, + }); + + expect(ctx).not.toBeNull(); + expect(ctx?.ctxPayload?.MessageThreadId).toBe(42); + expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main:thread:42"); + }); + + it("keeps legacy dm session key when no thread id", async () => { + const ctx = await buildContext({ + message_id: 2, + chat: { id: 1234, type: "private" }, + date: 1700000001, + text: "hello", + from: { id: 42, first_name: "Alice" }, + }); + + expect(ctx).not.toBeNull(); + expect(ctx?.ctxPayload?.MessageThreadId).toBeUndefined(); + expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main"); + }); +}); diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index 85d21228e..d90b6ffea 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -20,6 +20,7 @@ import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../conf import { logVerbose, shouldLogVerbose } from "../globals.js"; import { recordChannelActivity } from "../infra/channel-activity.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { resolveThreadSessionKeys } from "../routing/session-key.js"; import { shouldAckReaction as shouldAckReactionGate } from "../channels/ack-reactions.js"; import { resolveMentionGatingWithBypass } from "../channels/mention-gating.js"; import { resolveControlCommandGate } from "../channels/command-gating.js"; @@ -136,6 +137,13 @@ export const buildTelegramMessageContext = async ({ id: peerId, }, }); + const baseSessionKey = route.sessionKey; + const dmThreadId = !isGroup ? resolvedThreadId : undefined; + const threadKeys = + dmThreadId != null + ? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) }) + : null; + const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; const mentionRegexes = buildMentionRegexes(cfg, route.agentId); const effectiveDmAllow = normalizeAllowFromWithStore({ allowFrom, storeAllowFrom }); const groupAllowOverride = firstDefined(topicConfig?.allowFrom, groupConfig?.allowFrom); @@ -325,7 +333,7 @@ export const buildTelegramMessageContext = async ({ const activationOverride = resolveGroupActivation({ chatId, messageThreadId: resolvedThreadId, - sessionKey: route.sessionKey, + sessionKey: sessionKey, agentId: route.agentId, }); const baseRequireMention = resolveGroupRequireMention(chatId); @@ -432,7 +440,7 @@ export const buildTelegramMessageContext = async ({ const envelopeOptions = resolveEnvelopeFormatOptions(cfg); const previousTimestamp = readSessionUpdatedAt({ storePath, - sessionKey: route.sessionKey, + sessionKey: sessionKey, }); const body = formatInboundEnvelope({ channel: "Telegram", @@ -482,7 +490,7 @@ export const buildTelegramMessageContext = async ({ CommandBody: commandBody, From: isGroup ? buildTelegramGroupFrom(chatId, resolvedThreadId) : `telegram:${chatId}`, To: `telegram:${chatId}`, - SessionKey: route.sessionKey, + SessionKey: sessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", ConversationLabel: conversationLabel, @@ -526,7 +534,7 @@ export const buildTelegramMessageContext = async ({ await recordInboundSession({ storePath, - sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + sessionKey: ctxPayload.SessionKey ?? sessionKey, ctx: ctxPayload, updateLastRoute: !isGroup ? { diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts index a983452ba..fc28e58e3 100644 --- a/src/telegram/bot-native-commands.ts +++ b/src/telegram/bot-native-commands.ts @@ -18,6 +18,7 @@ import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js"; import { danger, logVerbose } from "../globals.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { resolveThreadSessionKeys } from "../routing/session-key.js"; import { resolveCommandAuthorizedFromAuthorizers } from "../channels/command-gating.js"; import type { ChannelGroupPolicy } from "../config/group-policy.js"; import type { @@ -271,6 +272,13 @@ export const registerTelegramNativeCommands = ({ id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId), }, }); + const baseSessionKey = route.sessionKey; + const dmThreadId = !isGroup ? resolvedThreadId : undefined; + const threadKeys = + dmThreadId != null + ? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) }) + : null; + const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; const tableMode = resolveMarkdownTableMode({ cfg, channel: "telegram", @@ -309,7 +317,7 @@ export const registerTelegramNativeCommands = ({ CommandAuthorized: commandAuthorized, CommandSource: "native" as const, SessionKey: `telegram:slash:${senderId || chatId}`, - CommandTargetSessionKey: route.sessionKey, + CommandTargetSessionKey: sessionKey, MessageThreadId: resolvedThreadId, IsForum: isForum, // Originating context for sub-agent announce routing diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index ab2b55505..486741f53 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -2163,6 +2163,46 @@ describe("createTelegramBot", () => { expect.objectContaining({ message_thread_id: 99 }), ); }); + it("sets command target session key for dm topic commands", async () => { + onSpy.mockReset(); + sendMessageSpy.mockReset(); + commandSpy.mockReset(); + const replySpy = replyModule.__replySpy as unknown as ReturnType; + replySpy.mockReset(); + replySpy.mockResolvedValue({ text: "response" }); + + loadConfig.mockReturnValue({ + commands: { native: true }, + channels: { + telegram: { + dmPolicy: "pairing", + }, + }, + }); + readTelegramAllowFromStore.mockResolvedValueOnce(["12345"]); + + createTelegramBot({ token: "tok" }); + const handler = commandSpy.mock.calls.find((call) => call[0] === "status")?.[1] as + | ((ctx: Record) => Promise) + | undefined; + if (!handler) throw new Error("status command handler missing"); + + await handler({ + message: { + chat: { id: 12345, type: "private" }, + from: { id: 12345, username: "testuser" }, + text: "/status", + date: 1736380800, + message_id: 42, + message_thread_id: 99, + }, + match: "", + }); + + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0][0]; + expect(payload.CommandTargetSessionKey).toBe("agent:main:main:thread:99"); + }); it("allows native DM commands for paired users", async () => { onSpy.mockReset(); @@ -2789,4 +2829,41 @@ describe("createTelegramBot", () => { const sessionKey = enqueueSystemEvent.mock.calls[0][1].sessionKey; expect(sessionKey).not.toContain(":topic:"); }); + it("uses thread session key for dm reactions with topic id", async () => { + onSpy.mockReset(); + enqueueSystemEvent.mockReset(); + + loadConfig.mockReturnValue({ + channels: { + telegram: { dmPolicy: "open", reactionNotifications: "all" }, + }, + }); + + createTelegramBot({ token: "tok" }); + const handler = getOnHandler("message_reaction") as ( + ctx: Record, + ) => Promise; + + await handler({ + update: { update_id: 508 }, + messageReaction: { + chat: { id: 1234, type: "private" }, + message_id: 300, + message_thread_id: 42, + user: { id: 12, first_name: "Dana" }, + date: 1736380800, + old_reaction: [], + new_reaction: [{ type: "emoji", emoji: "🔥" }], + }, + }); + + expect(enqueueSystemEvent).toHaveBeenCalledTimes(1); + expect(enqueueSystemEvent).toHaveBeenCalledWith( + "Telegram reaction added: 🔥 by Dana on msg 300", + expect.objectContaining({ + sessionKey: expect.stringContaining(":thread:42"), + contextKey: expect.stringContaining("telegram:reaction:add:1234:300:12"), + }), + ); + }); }); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index de7d715ab..d958d5616 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -20,9 +20,11 @@ import { } from "../config/group-policy.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { resolveThreadSessionKeys } from "../routing/session-key.js"; import type { RuntimeEnv } from "../runtime.js"; import { resolveTelegramAccount } from "./accounts.js"; import { @@ -161,7 +163,42 @@ export function createTelegramBot(opts: TelegramBotOptions) { return skipped; }; + const rawUpdateLogger = createSubsystemLogger("gateway/channels/telegram/raw-update"); + const MAX_RAW_UPDATE_CHARS = 8000; + const MAX_RAW_UPDATE_STRING = 500; + const MAX_RAW_UPDATE_ARRAY = 20; + const stringifyUpdate = (update: unknown) => { + const seen = new WeakSet(); + return JSON.stringify(update ?? null, (key, value) => { + if (typeof value === "string" && value.length > MAX_RAW_UPDATE_STRING) { + return `${value.slice(0, MAX_RAW_UPDATE_STRING)}...`; + } + if (Array.isArray(value) && value.length > MAX_RAW_UPDATE_ARRAY) { + return [ + ...value.slice(0, MAX_RAW_UPDATE_ARRAY), + `...(${value.length - MAX_RAW_UPDATE_ARRAY} more)`, + ]; + } + if (value && typeof value === "object") { + const obj = value as object; + if (seen.has(obj)) return "[Circular]"; + seen.add(obj); + } + return value; + }); + }; + bot.use(async (ctx, next) => { + if (shouldLogVerbose()) { + try { + const raw = stringifyUpdate(ctx.update); + const preview = + raw.length > MAX_RAW_UPDATE_CHARS ? `${raw.slice(0, MAX_RAW_UPDATE_CHARS)}...` : raw; + rawUpdateLogger.debug(`telegram update: ${preview}`); + } catch (err) { + rawUpdateLogger.debug(`telegram update log failed: ${String(err)}`); + } + } await next(); recordUpdateId(ctx); }); @@ -372,13 +409,20 @@ export function createTelegramBot(opts: TelegramBotOptions) { accountId: account.accountId, peer: { kind: isGroup ? "group" : "dm", id: peerId }, }); + const baseSessionKey = route.sessionKey; + const dmThreadId = !isGroup ? resolvedThreadId : undefined; + const threadKeys = + dmThreadId != null + ? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) }) + : null; + const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; // Enqueue system event for each added reaction for (const r of addedReactions) { const emoji = r.emoji; const text = `Telegram reaction added: ${emoji} by ${senderLabel} on msg ${messageId}`; enqueueSystemEvent(text, { - sessionKey: route.sessionKey, + sessionKey: sessionKey, contextKey: `telegram:reaction:add:${chatId}:${messageId}:${user?.id ?? "anon"}:${emoji}`, }); logVerbose(`telegram: reaction event enqueued: ${text}`);