feat(telegram): wire replyToMode config, add forum topic support, fix messaging tool duplicates
Changes: - Default replyToMode from "off" to "first" for better threading UX - Add messageThreadId and replyToMessageId params for forum topic support - Add messaging tool duplicate detection to suppress redundant block replies - Add sendMessage action to telegram tool schema - Add @grammyjs/types devDependency for proper TypeScript typing - Remove @ts-nocheck and fix all type errors in send.ts - Add comprehensive docs/telegram.md documentation - Add PR-326-REVIEW.md with John Carmack-level code review Test coverage: - normalizeTextForComparison: 5 cases - isMessagingToolDuplicate: 7 cases - sendMessageTelegram thread params: 5 cases - handleTelegramAction sendMessage: 4 cases - Forum topic isolation: 4 cases 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
committed by
Peter Steinberger
parent
6cd32ec7f6
commit
33e2d53be3
@@ -8,6 +8,7 @@ import { createSubsystemLogger } from "../logging.js";
|
||||
import { splitMediaFromOutput } from "../media/parse.js";
|
||||
import type { BlockReplyChunking } from "./pi-embedded-block-chunker.js";
|
||||
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
|
||||
import { isMessagingToolDuplicate } from "./pi-embedded-helpers.js";
|
||||
import {
|
||||
extractAssistantText,
|
||||
extractAssistantThinking,
|
||||
@@ -137,6 +138,21 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
let compactionRetryPromise: Promise<void> | null = null;
|
||||
let lastReasoningSent: string | undefined;
|
||||
|
||||
// ── Messaging tool duplicate detection ──────────────────────────────────────
|
||||
// Track texts sent via messaging tools to suppress duplicate block replies.
|
||||
// Only committed (successful) texts are checked - pending texts are tracked
|
||||
// to support commit logic but not used for suppression (avoiding lost messages on tool failure).
|
||||
// These tools can send messages via sendMessage/threadReply actions (or sessions_send with message).
|
||||
const MESSAGING_TOOLS = new Set([
|
||||
"telegram",
|
||||
"whatsapp",
|
||||
"discord",
|
||||
"slack",
|
||||
"sessions_send",
|
||||
]);
|
||||
const messagingToolSentTexts: string[] = [];
|
||||
const pendingMessagingTexts = new Map<string, string>();
|
||||
|
||||
const ensureCompactionPromise = () => {
|
||||
if (!compactionRetryPromise) {
|
||||
compactionRetryPromise = new Promise((resolve) => {
|
||||
@@ -221,6 +237,16 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
const chunk = strippedText.trimEnd();
|
||||
if (!chunk) return;
|
||||
if (chunk === lastBlockReplyText) return;
|
||||
|
||||
// Only check committed (successful) messaging tool texts - checking pending texts
|
||||
// is risky because if the tool fails after suppression, the user gets no response
|
||||
if (isMessagingToolDuplicate(chunk, messagingToolSentTexts)) {
|
||||
log.debug(
|
||||
`Skipping block reply - already sent via messaging tool: ${chunk.slice(0, 50)}...`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
lastBlockReplyText = chunk;
|
||||
assistantTexts.push(chunk);
|
||||
if (!params.onBlockReply) return;
|
||||
@@ -288,6 +314,8 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
toolMetas.length = 0;
|
||||
toolMetaById.clear();
|
||||
toolSummaryById.clear();
|
||||
messagingToolSentTexts.length = 0;
|
||||
pendingMessagingTexts.clear();
|
||||
deltaBuffer = "";
|
||||
blockBuffer = "";
|
||||
blockChunker?.reset();
|
||||
@@ -355,6 +383,32 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
toolSummaryById.add(toolCallId);
|
||||
emitToolSummary(toolName, meta);
|
||||
}
|
||||
|
||||
// Track messaging tool sends (pending until confirmed in tool_execution_end)
|
||||
if (MESSAGING_TOOLS.has(toolName)) {
|
||||
const argsRecord =
|
||||
args && typeof args === "object"
|
||||
? (args as Record<string, unknown>)
|
||||
: {};
|
||||
const action =
|
||||
typeof argsRecord.action === "string" ? argsRecord.action : "";
|
||||
// Track send actions: sendMessage/threadReply for Discord/Slack, or sessions_send (no action field)
|
||||
if (
|
||||
action === "sendMessage" ||
|
||||
action === "threadReply" ||
|
||||
toolName === "sessions_send"
|
||||
) {
|
||||
// Field names vary by tool: Discord/Slack use "content", sessions_send uses "message"
|
||||
const text =
|
||||
(argsRecord.content as string) ?? (argsRecord.message as string);
|
||||
if (text && typeof text === "string") {
|
||||
pendingMessagingTexts.set(toolCallId, text);
|
||||
log.debug(
|
||||
`Tracking pending messaging text: tool=${toolName} action=${action} len=${text.length}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (evt.type === "tool_execution_update") {
|
||||
@@ -404,6 +458,18 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
toolMetaById.delete(toolCallId);
|
||||
toolSummaryById.delete(toolCallId);
|
||||
|
||||
// Commit messaging tool text on success, discard on error
|
||||
const pendingText = pendingMessagingTexts.get(toolCallId);
|
||||
if (pendingText) {
|
||||
pendingMessagingTexts.delete(toolCallId);
|
||||
if (!isError) {
|
||||
messagingToolSentTexts.push(pendingText);
|
||||
log.debug(
|
||||
`Committed messaging text: tool=${toolName} len=${pendingText.length}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
emitAgentEvent({
|
||||
runId: params.runId,
|
||||
stream: "tool",
|
||||
@@ -591,14 +657,21 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
blockChunker.drain({ force: true, emit: emitBlockChunk });
|
||||
blockChunker.reset();
|
||||
} else if (text !== lastBlockReplyText) {
|
||||
lastBlockReplyText = text;
|
||||
const { text: cleanedText, mediaUrls } =
|
||||
splitMediaFromOutput(text);
|
||||
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
|
||||
void params.onBlockReply({
|
||||
text: cleanedText,
|
||||
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
|
||||
});
|
||||
// Check for duplicates before emitting (same logic as emitBlockChunk)
|
||||
if (isMessagingToolDuplicate(text, messagingToolSentTexts)) {
|
||||
log.debug(
|
||||
`Skipping message_end block reply - already sent via messaging tool: ${text.slice(0, 50)}...`,
|
||||
);
|
||||
} else {
|
||||
lastBlockReplyText = text;
|
||||
const { text: cleanedText, mediaUrls } =
|
||||
splitMediaFromOutput(text);
|
||||
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
|
||||
void params.onBlockReply({
|
||||
text: cleanedText,
|
||||
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -705,6 +778,10 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
toolMetas,
|
||||
unsubscribe,
|
||||
isCompacting: () => compactionInFlight || pendingCompactionRetry > 0,
|
||||
// Returns true if any messaging tool successfully sent a message.
|
||||
// Used to suppress agent's confirmation text (e.g., "Respondi no Telegram!")
|
||||
// which is generated AFTER the tool sends the actual answer.
|
||||
didSendViaMessagingTool: () => messagingToolSentTexts.length > 0,
|
||||
waitForCompactionRetry: () => {
|
||||
if (compactionInFlight || pendingCompactionRetry > 0) {
|
||||
ensureCompactionPromise();
|
||||
|
||||
Reference in New Issue
Block a user