Telegram: threaded conversation support (#1597)

* Telegram: isolate dm topic sessions

* Tests: cap vitest workers

* Tests: cap Vitest workers on CI macOS

* Tests: avoid timer-based pi-ai stream mock

* Tests: increase embedded runner timeout

* fix: harden telegram dm thread handling (#1597) (thanks @rohannagpal)

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Rohan Nagpal
2026-01-25 10:18:51 +05:30
committed by GitHub
parent 9eaaadf8ee
commit 06a7e1e8ce
10 changed files with 256 additions and 11 deletions

View File

@@ -18,6 +18,8 @@ Docs: https://docs.clawd.bot
- Docs: add Bedrock EC2 instance role setup + IAM steps. (#1625) Thanks @sergical. https://docs.clawd.bot/bedrock
- Exec approvals: forward approval prompts to chat with `/approve` for all channels (including plugins). (#1621) Thanks @czekaj. https://docs.clawd.bot/tools/exec-approvals https://docs.clawd.bot/tools/slash-commands
- Gateway: expose config.patch in the gateway tool with safe partial updates + restart sentinel. (#1653) Thanks @Glucksberg.
- Telegram: treat DM topics as separate sessions and keep DM history limits stable with thread suffixes. (#1597) Thanks @rohannagpal.
- Telegram: add verbose raw-update logging for inbound Telegram updates. (#1597) Thanks @rohannagpal.
### Fixes
- BlueBubbles: keep part-index GUIDs in reply tags when short IDs are missing.
@@ -40,6 +42,9 @@ Docs: https://docs.clawd.bot
- Google Chat: tighten email allowlist matching, typing cleanup, media caps, and onboarding/docs/tests. (#1635) Thanks @iHildy.
- Google Chat: normalize space targets without double `spaces/` prefix.
- Messaging: keep newline chunking safe for fenced markdown blocks across channels.
- Tests: cap Vitest workers on CI macOS to reduce timeouts. (#1597) Thanks @rohannagpal.
- Tests: avoid fake-timer dependency in embedded runner stream mock to reduce CI flakes. (#1597) Thanks @rohannagpal.
- Tests: increase embedded runner ordering test timeout to reduce CI flakes. (#1597) Thanks @rohannagpal.
## 2026.1.23-1

View File

@@ -23,11 +23,14 @@ const serialRuns = runs.filter((entry) => entry.name === "gateway");
const children = new Set();
const isCI = process.env.CI === "true" || process.env.GITHUB_ACTIONS === "true";
const isMacOS = process.platform === "darwin" || process.env.RUNNER_OS === "macOS";
const overrideWorkers = Number.parseInt(process.env.CLAWDBOT_TEST_WORKERS ?? "", 10);
const resolvedOverride = Number.isFinite(overrideWorkers) && overrideWorkers > 0 ? overrideWorkers : null;
const localWorkers = Math.max(4, Math.min(16, os.cpus().length));
const perRunWorkers = Math.max(1, Math.floor(localWorkers / parallelRuns.length));
const maxWorkers = isCI ? null : resolvedOverride ?? perRunWorkers;
// Keep worker counts predictable for local runs and for CI on macOS.
// In CI on linux/windows, prefer Vitest defaults to avoid cross-test interference from lower worker counts.
const maxWorkers = resolvedOverride ?? (isCI && !isMacOS ? null : perRunWorkers);
const WARNING_SUPPRESSION_FLAGS = [
"--disable-warning=ExperimentalWarning",

View File

@@ -121,6 +121,26 @@ describe("getDmHistoryLimitFromSessionKey", () => {
} as ClawdbotConfig;
expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123", config)).toBe(10);
});
it("strips thread suffix from dm session keys", () => {
const config = {
channels: { telegram: { dmHistoryLimit: 10, dms: { "123": { historyLimit: 7 } } } },
} as ClawdbotConfig;
expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123:thread:999", config)).toBe(
7,
);
expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123:topic:555", config)).toBe(7);
expect(getDmHistoryLimitFromSessionKey("telegram:dm:123:thread:999", config)).toBe(7);
});
it("keeps non-numeric thread markers in dm ids", () => {
const config = {
channels: {
telegram: { dms: { "user:thread:abc": { historyLimit: 9 } } },
},
} as ClawdbotConfig;
expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:user:thread:abc", config)).toBe(
9,
);
});
it("returns undefined for non-dm session kinds", () => {
const config = {
channels: {

View File

@@ -70,7 +70,7 @@ vi.mock("@mariozechner/pi-ai", async () => {
},
streamSimple: (model: { api: string; provider: string; id: string }) => {
const stream = new actual.AssistantMessageEventStream();
setTimeout(() => {
queueMicrotask(() => {
stream.push({
type: "done",
reason: "stop",
@@ -80,7 +80,7 @@ vi.mock("@mariozechner/pi-ai", async () => {
: buildAssistantMessage(model),
});
stream.end();
}, 0);
});
return stream;
},
};
@@ -213,7 +213,7 @@ describe("runEmbeddedPiAgent", () => {
itIfNotWin32(
"persists the first user message before assistant output",
{ timeout: 60_000 },
{ timeout: 120_000 },
async () => {
const sessionFile = nextSessionFile();
const cfg = makeOpenAiConfig(["mock-1"]);

View File

@@ -2,6 +2,13 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { ClawdbotConfig } from "../../config/config.js";
const THREAD_SUFFIX_REGEX = /^(.*)(?::(?:thread|topic):\d+)$/i;
function stripThreadSuffix(value: string): string {
const match = value.match(THREAD_SUFFIX_REGEX);
return match?.[1] ?? value;
}
/**
* Limits conversation history to the last N user turns (and their associated
* assistant responses). This reduces token usage for long-running DM sessions.
@@ -44,7 +51,8 @@ export function getDmHistoryLimitFromSessionKey(
if (!provider) return undefined;
const kind = providerParts[1]?.toLowerCase();
const userId = providerParts.slice(2).join(":");
const userIdRaw = providerParts.slice(2).join(":");
const userId = stripThreadSuffix(userIdRaw);
if (kind !== "dm") return undefined;
const getLimit = (

View File

@@ -0,0 +1,72 @@
import { describe, expect, it, vi } from "vitest";
import { buildTelegramMessageContext } from "./bot-message-context.js";
describe("buildTelegramMessageContext dm thread sessions", () => {
const baseConfig = {
agents: { defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/clawd" } },
channels: { telegram: {} },
messages: { groupChat: { mentionPatterns: [] } },
} as never;
const buildContext = async (message: Record<string, unknown>) =>
await buildTelegramMessageContext({
primaryCtx: {
message,
me: { id: 7, username: "bot" },
} as never,
allMedia: [],
storeAllowFrom: [],
options: {},
bot: {
api: {
sendChatAction: vi.fn(),
setMessageReaction: vi.fn(),
},
} as never,
cfg: baseConfig,
account: { accountId: "default" } as never,
historyLimit: 0,
groupHistories: new Map(),
dmPolicy: "open",
allowFrom: [],
groupAllowFrom: [],
ackReactionScope: "off",
logger: { info: vi.fn() },
resolveGroupActivation: () => undefined,
resolveGroupRequireMention: () => false,
resolveTelegramGroupConfig: () => ({
groupConfig: { requireMention: false },
topicConfig: undefined,
}),
});
it("uses thread session key for dm topics", async () => {
const ctx = await buildContext({
message_id: 1,
chat: { id: 1234, type: "private" },
date: 1700000000,
text: "hello",
message_thread_id: 42,
from: { id: 42, first_name: "Alice" },
});
expect(ctx).not.toBeNull();
expect(ctx?.ctxPayload?.MessageThreadId).toBe(42);
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main:thread:42");
});
it("keeps legacy dm session key when no thread id", async () => {
const ctx = await buildContext({
message_id: 2,
chat: { id: 1234, type: "private" },
date: 1700000001,
text: "hello",
from: { id: 42, first_name: "Alice" },
});
expect(ctx).not.toBeNull();
expect(ctx?.ctxPayload?.MessageThreadId).toBeUndefined();
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main");
});
});

View File

@@ -20,6 +20,7 @@ import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../conf
import { logVerbose, shouldLogVerbose } from "../globals.js";
import { recordChannelActivity } from "../infra/channel-activity.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import { shouldAckReaction as shouldAckReactionGate } from "../channels/ack-reactions.js";
import { resolveMentionGatingWithBypass } from "../channels/mention-gating.js";
import { resolveControlCommandGate } from "../channels/command-gating.js";
@@ -136,6 +137,13 @@ export const buildTelegramMessageContext = async ({
id: peerId,
},
});
const baseSessionKey = route.sessionKey;
const dmThreadId = !isGroup ? resolvedThreadId : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const mentionRegexes = buildMentionRegexes(cfg, route.agentId);
const effectiveDmAllow = normalizeAllowFromWithStore({ allowFrom, storeAllowFrom });
const groupAllowOverride = firstDefined(topicConfig?.allowFrom, groupConfig?.allowFrom);
@@ -325,7 +333,7 @@ export const buildTelegramMessageContext = async ({
const activationOverride = resolveGroupActivation({
chatId,
messageThreadId: resolvedThreadId,
sessionKey: route.sessionKey,
sessionKey: sessionKey,
agentId: route.agentId,
});
const baseRequireMention = resolveGroupRequireMention(chatId);
@@ -432,7 +440,7 @@ export const buildTelegramMessageContext = async ({
const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
const previousTimestamp = readSessionUpdatedAt({
storePath,
sessionKey: route.sessionKey,
sessionKey: sessionKey,
});
const body = formatInboundEnvelope({
channel: "Telegram",
@@ -482,7 +490,7 @@ export const buildTelegramMessageContext = async ({
CommandBody: commandBody,
From: isGroup ? buildTelegramGroupFrom(chatId, resolvedThreadId) : `telegram:${chatId}`,
To: `telegram:${chatId}`,
SessionKey: route.sessionKey,
SessionKey: sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: conversationLabel,
@@ -526,7 +534,7 @@ export const buildTelegramMessageContext = async ({
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
sessionKey: ctxPayload.SessionKey ?? sessionKey,
ctx: ctxPayload,
updateLastRoute: !isGroup
? {

View File

@@ -18,6 +18,7 @@ import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import { danger, logVerbose } from "../globals.js";
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import { resolveCommandAuthorizedFromAuthorizers } from "../channels/command-gating.js";
import type { ChannelGroupPolicy } from "../config/group-policy.js";
import type {
@@ -271,6 +272,13 @@ export const registerTelegramNativeCommands = ({
id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId),
},
});
const baseSessionKey = route.sessionKey;
const dmThreadId = !isGroup ? resolvedThreadId : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const tableMode = resolveMarkdownTableMode({
cfg,
channel: "telegram",
@@ -309,7 +317,7 @@ export const registerTelegramNativeCommands = ({
CommandAuthorized: commandAuthorized,
CommandSource: "native" as const,
SessionKey: `telegram:slash:${senderId || chatId}`,
CommandTargetSessionKey: route.sessionKey,
CommandTargetSessionKey: sessionKey,
MessageThreadId: resolvedThreadId,
IsForum: isForum,
// Originating context for sub-agent announce routing

View File

@@ -2163,6 +2163,46 @@ describe("createTelegramBot", () => {
expect.objectContaining({ message_thread_id: 99 }),
);
});
it("sets command target session key for dm topic commands", async () => {
onSpy.mockReset();
sendMessageSpy.mockReset();
commandSpy.mockReset();
const replySpy = replyModule.__replySpy as unknown as ReturnType<typeof vi.fn>;
replySpy.mockReset();
replySpy.mockResolvedValue({ text: "response" });
loadConfig.mockReturnValue({
commands: { native: true },
channels: {
telegram: {
dmPolicy: "pairing",
},
},
});
readTelegramAllowFromStore.mockResolvedValueOnce(["12345"]);
createTelegramBot({ token: "tok" });
const handler = commandSpy.mock.calls.find((call) => call[0] === "status")?.[1] as
| ((ctx: Record<string, unknown>) => Promise<void>)
| undefined;
if (!handler) throw new Error("status command handler missing");
await handler({
message: {
chat: { id: 12345, type: "private" },
from: { id: 12345, username: "testuser" },
text: "/status",
date: 1736380800,
message_id: 42,
message_thread_id: 99,
},
match: "",
});
expect(replySpy).toHaveBeenCalledTimes(1);
const payload = replySpy.mock.calls[0][0];
expect(payload.CommandTargetSessionKey).toBe("agent:main:main:thread:99");
});
it("allows native DM commands for paired users", async () => {
onSpy.mockReset();
@@ -2789,4 +2829,41 @@ describe("createTelegramBot", () => {
const sessionKey = enqueueSystemEvent.mock.calls[0][1].sessionKey;
expect(sessionKey).not.toContain(":topic:");
});
it("uses thread session key for dm reactions with topic id", async () => {
onSpy.mockReset();
enqueueSystemEvent.mockReset();
loadConfig.mockReturnValue({
channels: {
telegram: { dmPolicy: "open", reactionNotifications: "all" },
},
});
createTelegramBot({ token: "tok" });
const handler = getOnHandler("message_reaction") as (
ctx: Record<string, unknown>,
) => Promise<void>;
await handler({
update: { update_id: 508 },
messageReaction: {
chat: { id: 1234, type: "private" },
message_id: 300,
message_thread_id: 42,
user: { id: 12, first_name: "Dana" },
date: 1736380800,
old_reaction: [],
new_reaction: [{ type: "emoji", emoji: "🔥" }],
},
});
expect(enqueueSystemEvent).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Telegram reaction added: 🔥 by Dana on msg 300",
expect.objectContaining({
sessionKey: expect.stringContaining(":thread:42"),
contextKey: expect.stringContaining("telegram:reaction:add:1234:300:12"),
}),
);
});
});

View File

@@ -20,9 +20,11 @@ import {
} from "../config/group-policy.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import type { RuntimeEnv } from "../runtime.js";
import { resolveTelegramAccount } from "./accounts.js";
import {
@@ -161,7 +163,42 @@ export function createTelegramBot(opts: TelegramBotOptions) {
return skipped;
};
const rawUpdateLogger = createSubsystemLogger("gateway/channels/telegram/raw-update");
const MAX_RAW_UPDATE_CHARS = 8000;
const MAX_RAW_UPDATE_STRING = 500;
const MAX_RAW_UPDATE_ARRAY = 20;
const stringifyUpdate = (update: unknown) => {
const seen = new WeakSet<object>();
return JSON.stringify(update ?? null, (key, value) => {
if (typeof value === "string" && value.length > MAX_RAW_UPDATE_STRING) {
return `${value.slice(0, MAX_RAW_UPDATE_STRING)}...`;
}
if (Array.isArray(value) && value.length > MAX_RAW_UPDATE_ARRAY) {
return [
...value.slice(0, MAX_RAW_UPDATE_ARRAY),
`...(${value.length - MAX_RAW_UPDATE_ARRAY} more)`,
];
}
if (value && typeof value === "object") {
const obj = value as object;
if (seen.has(obj)) return "[Circular]";
seen.add(obj);
}
return value;
});
};
bot.use(async (ctx, next) => {
if (shouldLogVerbose()) {
try {
const raw = stringifyUpdate(ctx.update);
const preview =
raw.length > MAX_RAW_UPDATE_CHARS ? `${raw.slice(0, MAX_RAW_UPDATE_CHARS)}...` : raw;
rawUpdateLogger.debug(`telegram update: ${preview}`);
} catch (err) {
rawUpdateLogger.debug(`telegram update log failed: ${String(err)}`);
}
}
await next();
recordUpdateId(ctx);
});
@@ -372,13 +409,20 @@ export function createTelegramBot(opts: TelegramBotOptions) {
accountId: account.accountId,
peer: { kind: isGroup ? "group" : "dm", id: peerId },
});
const baseSessionKey = route.sessionKey;
const dmThreadId = !isGroup ? resolvedThreadId : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
// Enqueue system event for each added reaction
for (const r of addedReactions) {
const emoji = r.emoji;
const text = `Telegram reaction added: ${emoji} by ${senderLabel} on msg ${messageId}`;
enqueueSystemEvent(text, {
sessionKey: route.sessionKey,
sessionKey: sessionKey,
contextKey: `telegram:reaction:add:${chatId}:${messageId}:${user?.id ?? "anon"}:${emoji}`,
});
logVerbose(`telegram: reaction event enqueued: ${text}`);