refactor(telegram): polish topic threading

This commit is contained in:
Peter Steinberger
2026-01-07 02:16:38 +00:00
parent 80112433a5
commit d7bc5b58fc
3 changed files with 104 additions and 36 deletions

View File

@@ -27,6 +27,7 @@ All session state is **owned by the gateway** (the “master” Clawdbot). UI cl
- Group chats isolate state: `agent:<agentId>:<provider>:group:<id>` (rooms/channels use `agent:<agentId>:<provider>:channel:<id>`). - Group chats isolate state: `agent:<agentId>:<provider>:group:<id>` (rooms/channels use `agent:<agentId>:<provider>:channel:<id>`).
- Telegram forum topics append `:topic:<threadId>` to the group id for isolation. - Telegram forum topics append `:topic:<threadId>` to the group id for isolation.
- Legacy `group:<id>` keys are still recognized for migration. - Legacy `group:<id>` keys are still recognized for migration.
- Inbound contexts may still use `group:<id>`; the provider is inferred from `Provider` and normalized to the canonical `agent:<agentId>:<provider>:group:<id>` form.
- Other sources: - Other sources:
- Cron jobs: `cron:<job.id>` - Cron jobs: `cron:<job.id>`
- Webhooks: `hook:<uuid>` (unless explicitly set by the hook) - Webhooks: `hook:<uuid>` (unless explicitly set by the hook)

View File

@@ -39,6 +39,7 @@ vi.mock("./pairing-store.js", () => ({
const useSpy = vi.fn(); const useSpy = vi.fn();
const onSpy = vi.fn(); const onSpy = vi.fn();
const stopSpy = vi.fn(); const stopSpy = vi.fn();
const commandSpy = vi.fn();
const sendChatActionSpy = vi.fn(); const sendChatActionSpy = vi.fn();
const setMessageReactionSpy = vi.fn(async () => undefined); const setMessageReactionSpy = vi.fn(async () => undefined);
const setMyCommandsSpy = vi.fn(async () => undefined); const setMyCommandsSpy = vi.fn(async () => undefined);
@@ -69,6 +70,7 @@ vi.mock("grammy", () => ({
api = apiStub; api = apiStub;
on = onSpy; on = onSpy;
stop = stopSpy; stop = stopSpy;
command = commandSpy;
constructor(public token: string) {} constructor(public token: string) {}
}, },
InputFile: class {}, InputFile: class {},
@@ -1367,6 +1369,7 @@ describe("createTelegramBot", () => {
it("passes message_thread_id to topic replies", async () => { it("passes message_thread_id to topic replies", async () => {
onSpy.mockReset(); onSpy.mockReset();
sendMessageSpy.mockReset(); sendMessageSpy.mockReset();
commandSpy.mockReset();
const replySpy = replyModule.__replySpy as unknown as ReturnType< const replySpy = replyModule.__replySpy as unknown as ReturnType<
typeof vi.fn typeof vi.fn
>; >;
@@ -1406,4 +1409,53 @@ describe("createTelegramBot", () => {
expect.objectContaining({ message_thread_id: 99 }), expect.objectContaining({ message_thread_id: 99 }),
); );
}); });
it("threads native command replies inside topics", async () => {
onSpy.mockReset();
sendMessageSpy.mockReset();
commandSpy.mockReset();
const replySpy = replyModule.__replySpy as unknown as ReturnType<
typeof vi.fn
>;
replySpy.mockReset();
replySpy.mockResolvedValue({ text: "response" });
loadConfig.mockReturnValue({
commands: { native: true },
telegram: {
dmPolicy: "open",
allowFrom: ["*"],
groups: { "*": { requireMention: false } },
},
});
createTelegramBot({ token: "tok" });
expect(commandSpy).toHaveBeenCalled();
const handler = commandSpy.mock.calls[0][1] as (
ctx: Record<string, unknown>,
) => Promise<void>;
await handler({
message: {
chat: {
id: -1001234567890,
type: "supergroup",
title: "Forum Group",
is_forum: true,
},
from: { id: 12345, username: "testuser" },
text: "/status",
date: 1736380800,
message_id: 42,
message_thread_id: 99,
},
match: "",
});
expect(sendMessageSpy).toHaveBeenCalledWith(
"-1001234567890",
expect.any(String),
expect.objectContaining({ message_thread_id: 99 }),
);
});
}); });

View File

@@ -212,9 +212,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
await bot.api.sendChatAction( await bot.api.sendChatAction(
chatId, chatId,
"typing", "typing",
messageThreadId != null buildTelegramThreadParams(messageThreadId),
? { message_thread_id: messageThreadId }
: undefined,
); );
} catch (err) { } catch (err) {
logVerbose( logVerbose(
@@ -396,18 +394,14 @@ export function createTelegramBot(opts: TelegramBotOptions) {
peer: { peer: {
kind: isGroup ? "group" : "dm", kind: isGroup ? "group" : "dm",
id: isGroup id: isGroup
? messageThreadId != null ? buildTelegramGroupPeerId(chatId, messageThreadId)
? `${chatId}:topic:${messageThreadId}`
: String(chatId)
: String(chatId), : String(chatId),
}, },
}); });
const ctxPayload = { const ctxPayload = {
Body: body, Body: body,
From: isGroup From: isGroup
? messageThreadId != null ? buildTelegramGroupFrom(chatId, messageThreadId)
? `group:${chatId}:topic:${messageThreadId}`
: `group:${chatId}`
: `telegram:${chatId}`, : `telegram:${chatId}`,
To: `telegram:${chatId}`, To: `telegram:${chatId}`,
SessionKey: route.sessionKey, SessionKey: route.sessionKey,
@@ -601,18 +595,14 @@ export function createTelegramBot(opts: TelegramBotOptions) {
peer: { peer: {
kind: isGroup ? "group" : "dm", kind: isGroup ? "group" : "dm",
id: isGroup id: isGroup
? messageThreadId != null ? buildTelegramGroupPeerId(chatId, messageThreadId)
? `${chatId}:topic:${messageThreadId}`
: String(chatId)
: String(chatId), : String(chatId),
}, },
}); });
const ctxPayload = { const ctxPayload = {
Body: prompt, Body: prompt,
From: isGroup From: isGroup
? messageThreadId != null ? buildTelegramGroupFrom(chatId, messageThreadId)
? `group:${chatId}:topic:${messageThreadId}`
: `group:${chatId}`
: `telegram:${chatId}`, : `telegram:${chatId}`,
To: `slash:${senderId || chatId}`, To: `slash:${senderId || chatId}`,
ChatType: isGroup ? "group" : "direct", ChatType: isGroup ? "group" : "direct",
@@ -840,6 +830,7 @@ async function deliverReplies(params: {
textLimit, textLimit,
messageThreadId, messageThreadId,
} = params; } = params;
const threadParams = buildTelegramThreadParams(messageThreadId);
let hasReplied = false; let hasReplied = false;
for (const reply of replies) { for (const reply of replies) {
if (!reply?.text && !reply?.mediaUrl && !(reply?.mediaUrls?.length ?? 0)) { if (!reply?.text && !reply?.mediaUrl && !(reply?.mediaUrls?.length ?? 0)) {
@@ -887,37 +878,32 @@ async function deliverReplies(params: {
replyToId && (replyToMode === "all" || !hasReplied) replyToId && (replyToMode === "all" || !hasReplied)
? replyToId ? replyToId
: undefined; : undefined;
const threadParams = const mediaParams: Record<string, unknown> = {
messageThreadId != null ? { message_thread_id: messageThreadId } : {};
if (isGif) {
await bot.api.sendAnimation(chatId, file, {
caption, caption,
reply_to_message_id: replyToMessageId, reply_to_message_id: replyToMessageId,
...threadParams, };
if (threadParams) {
mediaParams.message_thread_id = threadParams.message_thread_id;
}
if (isGif) {
await bot.api.sendAnimation(chatId, file, {
...mediaParams,
}); });
} else if (kind === "image") { } else if (kind === "image") {
await bot.api.sendPhoto(chatId, file, { await bot.api.sendPhoto(chatId, file, {
caption, ...mediaParams,
reply_to_message_id: replyToMessageId,
...threadParams,
}); });
} else if (kind === "video") { } else if (kind === "video") {
await bot.api.sendVideo(chatId, file, { await bot.api.sendVideo(chatId, file, {
caption, ...mediaParams,
reply_to_message_id: replyToMessageId,
...threadParams,
}); });
} else if (kind === "audio") { } else if (kind === "audio") {
await bot.api.sendAudio(chatId, file, { await bot.api.sendAudio(chatId, file, {
caption, ...mediaParams,
reply_to_message_id: replyToMessageId,
...threadParams,
}); });
} else { } else {
await bot.api.sendDocument(chatId, file, { await bot.api.sendDocument(chatId, file, {
caption, ...mediaParams,
reply_to_message_id: replyToMessageId,
...threadParams,
}); });
} }
if (replyToId && !hasReplied) { if (replyToId && !hasReplied) {
@@ -927,6 +913,30 @@ async function deliverReplies(params: {
} }
} }
function buildTelegramThreadParams(messageThreadId?: number) {
return messageThreadId != null
? { message_thread_id: messageThreadId }
: undefined;
}
function buildTelegramGroupPeerId(
chatId: number | string,
messageThreadId?: number,
) {
return messageThreadId != null
? `${chatId}:topic:${messageThreadId}`
: String(chatId);
}
function buildTelegramGroupFrom(
chatId: number | string,
messageThreadId?: number,
) {
return messageThreadId != null
? `group:${chatId}:topic:${messageThreadId}`
: `group:${chatId}`;
}
function buildSenderName(msg: TelegramMessage) { function buildSenderName(msg: TelegramMessage) {
const name = const name =
[msg.from?.first_name, msg.from?.last_name] [msg.from?.first_name, msg.from?.last_name]
@@ -1051,11 +1061,17 @@ async function sendTelegramText(
runtime: RuntimeEnv, runtime: RuntimeEnv,
opts?: { replyToMessageId?: number; messageThreadId?: number }, opts?: { replyToMessageId?: number; messageThreadId?: number },
): Promise<number | undefined> { ): Promise<number | undefined> {
const threadParams = buildTelegramThreadParams(opts?.messageThreadId);
const baseParams: Record<string, unknown> = {
reply_to_message_id: opts?.replyToMessageId,
};
if (threadParams) {
baseParams.message_thread_id = threadParams.message_thread_id;
}
try { try {
const res = await bot.api.sendMessage(chatId, text, { const res = await bot.api.sendMessage(chatId, text, {
parse_mode: "Markdown", parse_mode: "Markdown",
reply_to_message_id: opts?.replyToMessageId, ...baseParams,
message_thread_id: opts?.messageThreadId,
}); });
return res.message_id; return res.message_id;
} catch (err) { } catch (err) {
@@ -1065,8 +1081,7 @@ async function sendTelegramText(
`telegram markdown parse failed; retrying without formatting: ${errText}`, `telegram markdown parse failed; retrying without formatting: ${errText}`,
); );
const res = await bot.api.sendMessage(chatId, text, { const res = await bot.api.sendMessage(chatId, text, {
reply_to_message_id: opts?.replyToMessageId, ...baseParams,
message_thread_id: opts?.messageThreadId,
}); });
return res.message_id; return res.message_id;
} }