diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts index 1fdf46f84..cae4ba51f 100644 --- a/src/agents/pi-embedded-helpers.ts +++ b/src/agents/pi-embedded-helpers.ts @@ -517,7 +517,7 @@ export function validateGeminiTurns(messages: AgentMessage[]): AgentMessage[] { } // ── Messaging tool duplicate detection ────────────────────────────────────── -// When the agent uses a messaging tool (telegram, discord, slack, sessions_send) +// When the agent uses a messaging tool (telegram, discord, slack, message, sessions_send) // to send a message, we track the text so we can suppress duplicate block replies. // The LLM sometimes elaborates or wraps the same content, so we use substring matching. @@ -539,6 +539,22 @@ export function normalizeTextForComparison(text: string): string { .trim(); } +export function isMessagingToolDuplicateNormalized( + normalized: string, + normalizedSentTexts: string[], +): boolean { + if (normalizedSentTexts.length === 0) return false; + if (!normalized || normalized.length < MIN_DUPLICATE_TEXT_LENGTH) + return false; + return normalizedSentTexts.some((normalizedSent) => { + if (!normalizedSent || normalizedSent.length < MIN_DUPLICATE_TEXT_LENGTH) + return false; + return ( + normalized.includes(normalizedSent) || normalizedSent.includes(normalized) + ); + }); +} + /** * Check if a text is a duplicate of any previously sent messaging tool text. * Uses substring matching to handle LLM elaboration (e.g., wrapping in quotes, @@ -577,13 +593,8 @@ export function isMessagingToolDuplicate( const normalized = normalizeTextForComparison(text); if (!normalized || normalized.length < MIN_DUPLICATE_TEXT_LENGTH) return false; - return sentTexts.some((sent) => { - const normalizedSent = normalizeTextForComparison(sent); - if (!normalizedSent || normalizedSent.length < MIN_DUPLICATE_TEXT_LENGTH) - return false; - // Substring match: either text contains the other - return ( - normalized.includes(normalizedSent) || normalizedSent.includes(normalized) - ); - }); + return isMessagingToolDuplicateNormalized( + normalized, + sentTexts.map(normalizeTextForComparison), + ); } diff --git a/src/agents/pi-embedded-messaging.ts b/src/agents/pi-embedded-messaging.ts new file mode 100644 index 000000000..87d2adaae --- /dev/null +++ b/src/agents/pi-embedded-messaging.ts @@ -0,0 +1,120 @@ +export type MessagingToolSend = { + tool: string; + provider: string; + accountId?: string; + to?: string; +}; + +const MESSAGING_TOOLS = new Set([ + "telegram", + "whatsapp", + "discord", + "slack", + "sessions_send", + "message", +]); + +export function isMessagingTool(toolName: string): boolean { + return MESSAGING_TOOLS.has(toolName); +} + +export function isMessagingToolSendAction( + toolName: string, + actionRaw: string, +): boolean { + const action = actionRaw.trim(); + if (toolName === "sessions_send") return true; + if (toolName === "message") { + return action === "send" || action === "thread-reply"; + } + return action === "sendMessage" || action === "threadReply"; +} + +function normalizeSlackTarget(raw: string): string | undefined { + const trimmed = raw.trim(); + if (!trimmed) return undefined; + const mentionMatch = trimmed.match(/^<@([A-Z0-9]+)>$/i); + if (mentionMatch) return `user:${mentionMatch[1]}`.toLowerCase(); + if (trimmed.startsWith("user:")) { + const id = trimmed.slice(5).trim(); + return id ? `user:${id}`.toLowerCase() : undefined; + } + if (trimmed.startsWith("channel:")) { + const id = trimmed.slice(8).trim(); + return id ? `channel:${id}`.toLowerCase() : undefined; + } + if (trimmed.startsWith("slack:")) { + const id = trimmed.slice(6).trim(); + return id ? `user:${id}`.toLowerCase() : undefined; + } + if (trimmed.startsWith("@")) { + const id = trimmed.slice(1).trim(); + return id ? `user:${id}`.toLowerCase() : undefined; + } + if (trimmed.startsWith("#")) { + const id = trimmed.slice(1).trim(); + return id ? `channel:${id}`.toLowerCase() : undefined; + } + return `channel:${trimmed}`.toLowerCase(); +} + +function normalizeDiscordTarget(raw: string): string | undefined { + const trimmed = raw.trim(); + if (!trimmed) return undefined; + const mentionMatch = trimmed.match(/^<@!?(\d+)>$/); + if (mentionMatch) return `user:${mentionMatch[1]}`.toLowerCase(); + if (trimmed.startsWith("user:")) { + const id = trimmed.slice(5).trim(); + return id ? `user:${id}`.toLowerCase() : undefined; + } + if (trimmed.startsWith("channel:")) { + const id = trimmed.slice(8).trim(); + return id ? `channel:${id}`.toLowerCase() : undefined; + } + if (trimmed.startsWith("discord:")) { + const id = trimmed.slice(8).trim(); + return id ? `user:${id}`.toLowerCase() : undefined; + } + if (trimmed.startsWith("@")) { + const id = trimmed.slice(1).trim(); + return id ? `user:${id}`.toLowerCase() : undefined; + } + return `channel:${trimmed}`.toLowerCase(); +} + +function normalizeTelegramTarget(raw: string): string | undefined { + const trimmed = raw.trim(); + if (!trimmed) return undefined; + let normalized = trimmed; + if (normalized.startsWith("telegram:")) { + normalized = normalized.slice("telegram:".length).trim(); + } else if (normalized.startsWith("tg:")) { + normalized = normalized.slice("tg:".length).trim(); + } else if (normalized.startsWith("group:")) { + normalized = normalized.slice("group:".length).trim(); + } + if (!normalized) return undefined; + const tmeMatch = + /^https?:\/\/t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized) ?? + /^t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized); + if (tmeMatch?.[1]) normalized = `@${tmeMatch[1]}`; + if (!normalized) return undefined; + return `telegram:${normalized}`.toLowerCase(); +} + +export function normalizeTargetForProvider( + provider: string, + raw?: string, +): string | undefined { + if (!raw) return undefined; + switch (provider.trim().toLowerCase()) { + case "slack": + return normalizeSlackTarget(raw); + case "discord": + return normalizeDiscordTarget(raw); + case "telegram": + return normalizeTelegramTarget(raw); + default: + return raw.trim().toLowerCase() || undefined; + } +} diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 8913b0f06..d12df2b48 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -62,6 +62,10 @@ import { resolveModelAuthMode, } from "./model-auth.js"; import { ensureClawdbotModelsJson } from "./models-config.js"; +import type { MessagingToolSend } from "./pi-embedded-messaging.js"; + +export type { MessagingToolSend } from "./pi-embedded-messaging.js"; + import { buildBootstrapContextFiles, classifyFailoverReason, @@ -266,13 +270,6 @@ type ApiKeyInfo = { source: string; }; -export type MessagingToolSend = { - tool: string; - provider: string; - accountId?: string; - to?: string; -}; - export type EmbeddedPiRunResult = { payloads?: Array<{ text?: string; diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 50b330d3b..4b582adab 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -12,7 +12,16 @@ import { createSubsystemLogger } from "../logging.js"; import { truncateUtf16Safe } from "../utils.js"; import type { BlockReplyChunking } from "./pi-embedded-block-chunker.js"; import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js"; -import { isMessagingToolDuplicate } from "./pi-embedded-helpers.js"; +import { + isMessagingToolDuplicateNormalized, + normalizeTextForComparison, +} from "./pi-embedded-helpers.js"; +import { + isMessagingTool, + isMessagingToolSendAction, + type MessagingToolSend, + normalizeTargetForProvider, +} from "./pi-embedded-messaging.js"; import { extractAssistantText, extractAssistantThinking, @@ -57,13 +66,6 @@ const appendRawStream = (payload: Record) => { export type { BlockReplyChunking } from "./pi-embedded-block-chunker.js"; -type MessagingToolSend = { - tool: string; - provider: string; - accountId?: string; - to?: string; -}; - function truncateToolText(text: string): string { if (text.length <= TOOL_RESULT_MAX_CHARS) return text; return `${truncateUtf16Safe(text, TOOL_RESULT_MAX_CHARS)}\n…(truncated)…`; @@ -124,78 +126,6 @@ function stripUnpairedThinkingTags(text: string): string { return text; } -function normalizeSlackTarget(raw: string): string | undefined { - const trimmed = raw.trim(); - if (!trimmed) return undefined; - const mentionMatch = trimmed.match(/^<@([A-Z0-9]+)>$/i); - if (mentionMatch) return `user:${mentionMatch[1]}`; - if (trimmed.startsWith("user:")) { - const id = trimmed.slice(5).trim(); - return id ? `user:${id}` : undefined; - } - if (trimmed.startsWith("channel:")) { - const id = trimmed.slice(8).trim(); - return id ? `channel:${id}` : undefined; - } - if (trimmed.startsWith("slack:")) { - const id = trimmed.slice(6).trim(); - return id ? `user:${id}` : undefined; - } - if (trimmed.startsWith("@")) { - const id = trimmed.slice(1).trim(); - return id ? `user:${id}` : undefined; - } - if (trimmed.startsWith("#")) { - const id = trimmed.slice(1).trim(); - return id ? `channel:${id}` : undefined; - } - return `channel:${trimmed}`; -} - -function normalizeDiscordTarget(raw: string): string | undefined { - const trimmed = raw.trim(); - if (!trimmed) return undefined; - const mentionMatch = trimmed.match(/^<@!?(\d+)>$/); - if (mentionMatch) return `user:${mentionMatch[1]}`; - if (trimmed.startsWith("user:")) { - const id = trimmed.slice(5).trim(); - return id ? `user:${id}` : undefined; - } - if (trimmed.startsWith("channel:")) { - const id = trimmed.slice(8).trim(); - return id ? `channel:${id}` : undefined; - } - if (trimmed.startsWith("discord:")) { - const id = trimmed.slice(8).trim(); - return id ? `user:${id}` : undefined; - } - if (trimmed.startsWith("@")) { - const id = trimmed.slice(1).trim(); - return id ? `user:${id}` : undefined; - } - return `channel:${trimmed}`; -} - -function normalizeTelegramTarget(raw: string): string | undefined { - const trimmed = raw.trim(); - if (!trimmed) return undefined; - let normalized = trimmed; - if (normalized.startsWith("telegram:")) { - normalized = normalized.slice("telegram:".length).trim(); - } else if (normalized.startsWith("tg:")) { - normalized = normalized.slice("tg:".length).trim(); - } else if (normalized.startsWith("group:")) { - normalized = normalized.slice("group:".length).trim(); - } - if (!normalized) return undefined; - const tmeMatch = - /^https?:\/\/t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized) ?? - /^t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized); - if (tmeMatch?.[1]) normalized = `@${tmeMatch[1]}`; - if (!normalized) return undefined; - return `telegram:${normalized}`; -} - function extractMessagingToolSend( toolName: string, args: Record, @@ -208,7 +138,7 @@ function extractMessagingToolSend( if (action !== "sendMessage") return undefined; const toRaw = typeof args.to === "string" ? args.to : undefined; if (!toRaw) return undefined; - const to = normalizeSlackTarget(toRaw); + const to = normalizeTargetForProvider("slack", toRaw); return to ? { tool: toolName, provider: "slack", accountId, to } : undefined; @@ -217,7 +147,7 @@ function extractMessagingToolSend( if (action === "sendMessage") { const toRaw = typeof args.to === "string" ? args.to : undefined; if (!toRaw) return undefined; - const to = normalizeDiscordTarget(toRaw); + const to = normalizeTargetForProvider("discord", toRaw); return to ? { tool: toolName, provider: "discord", accountId, to } : undefined; @@ -226,7 +156,7 @@ function extractMessagingToolSend( const channelId = typeof args.channelId === "string" ? args.channelId.trim() : ""; if (!channelId) return undefined; - const to = normalizeDiscordTarget(`channel:${channelId}`); + const to = normalizeTargetForProvider("discord", `channel:${channelId}`); return to ? { tool: toolName, provider: "discord", accountId, to } : undefined; @@ -237,7 +167,7 @@ function extractMessagingToolSend( if (action !== "sendMessage") return undefined; const toRaw = typeof args.to === "string" ? args.to : undefined; if (!toRaw) return undefined; - const to = normalizeTelegramTarget(toRaw); + const to = normalizeTargetForProvider("telegram", toRaw); return to ? { tool: toolName, provider: "telegram", accountId, to } : undefined; @@ -248,8 +178,8 @@ function extractMessagingToolSend( if (!toRaw) return undefined; const providerRaw = typeof args.provider === "string" ? args.provider.trim() : ""; - const provider = providerRaw || "message"; - const to = toRaw.trim(); + const provider = providerRaw ? providerRaw.toLowerCase() : "message"; + const to = normalizeTargetForProvider(provider, toRaw); return to ? { tool: toolName, provider, accountId, to } : undefined; } return undefined; @@ -316,18 +246,25 @@ export function subscribeEmbeddedPiSession(params: { // Only committed (successful) texts are checked - pending texts are tracked // to support commit logic but not used for suppression (avoiding lost messages on tool failure). // These tools can send messages via sendMessage/threadReply actions (or sessions_send with message). - const MESSAGING_TOOLS = new Set([ - "telegram", - "whatsapp", - "discord", - "slack", - "message", - "sessions_send", - ]); + const MAX_MESSAGING_SENT_TEXTS = 200; + const MAX_MESSAGING_SENT_TARGETS = 200; const messagingToolSentTexts: string[] = []; + const messagingToolSentTextsNormalized: string[] = []; const messagingToolSentTargets: MessagingToolSend[] = []; const pendingMessagingTexts = new Map(); const pendingMessagingTargets = new Map(); + const trimMessagingToolSent = () => { + if (messagingToolSentTexts.length > MAX_MESSAGING_SENT_TEXTS) { + const overflow = messagingToolSentTexts.length - MAX_MESSAGING_SENT_TEXTS; + messagingToolSentTexts.splice(0, overflow); + messagingToolSentTextsNormalized.splice(0, overflow); + } + if (messagingToolSentTargets.length > MAX_MESSAGING_SENT_TARGETS) { + const overflow = + messagingToolSentTargets.length - MAX_MESSAGING_SENT_TARGETS; + messagingToolSentTargets.splice(0, overflow); + } + }; const ensureCompactionPromise = () => { if (!compactionRetryPromise) { @@ -440,7 +377,13 @@ export function subscribeEmbeddedPiSession(params: { // Only check committed (successful) messaging tool texts - checking pending texts // is risky because if the tool fails after suppression, the user gets no response - if (isMessagingToolDuplicate(chunk, messagingToolSentTexts)) { + const normalizedChunk = normalizeTextForComparison(chunk); + if ( + isMessagingToolDuplicateNormalized( + normalizedChunk, + messagingToolSentTextsNormalized, + ) + ) { log.debug( `Skipping block reply - already sent via messaging tool: ${chunk.slice(0, 50)}...`, ); @@ -479,6 +422,7 @@ export function subscribeEmbeddedPiSession(params: { toolMetaById.clear(); toolSummaryById.clear(); messagingToolSentTexts.length = 0; + messagingToolSentTextsNormalized.length = 0; messagingToolSentTargets.length = 0; pendingMessagingTexts.clear(); pendingMessagingTargets.clear(); @@ -556,7 +500,7 @@ export function subscribeEmbeddedPiSession(params: { } // Track messaging tool sends (pending until confirmed in tool_execution_end) - if (MESSAGING_TOOLS.has(toolName)) { + if (isMessagingTool(toolName)) { const argsRecord = args && typeof args === "object" ? (args as Record) @@ -565,14 +509,7 @@ export function subscribeEmbeddedPiSession(params: { typeof argsRecord.action === "string" ? argsRecord.action.trim() : ""; - // Track send actions: sendMessage/threadReply for Discord/Slack, sessions_send (no action field), - // and message/send or message/thread-reply for the generic message tool. - const isMessagingSend = - action === "sendMessage" || - action === "threadReply" || - toolName === "sessions_send" || - (toolName === "message" && - (action === "send" || action === "thread-reply")); + const isMessagingSend = isMessagingToolSendAction(toolName, action); if (isMessagingSend) { const sendTarget = extractMessagingToolSend(toolName, argsRecord); if (sendTarget) { @@ -645,15 +582,20 @@ export function subscribeEmbeddedPiSession(params: { pendingMessagingTexts.delete(toolCallId); if (!isError) { messagingToolSentTexts.push(pendingText); + messagingToolSentTextsNormalized.push( + normalizeTextForComparison(pendingText), + ); log.debug( `Committed messaging text: tool=${toolName} len=${pendingText.length}`, ); + trimMessagingToolSent(); } } if (pendingTarget) { pendingMessagingTargets.delete(toolCallId); if (!isError) { messagingToolSentTargets.push(pendingTarget); + trimMessagingToolSent(); } } @@ -892,7 +834,13 @@ export function subscribeEmbeddedPiSession(params: { blockChunker.reset(); } else if (text !== lastBlockReplyText) { // Check for duplicates before emitting (same logic as emitBlockChunk) - if (isMessagingToolDuplicate(text, messagingToolSentTexts)) { + const normalizedText = normalizeTextForComparison(text); + if ( + isMessagingToolDuplicateNormalized( + normalizedText, + messagingToolSentTextsNormalized, + ) + ) { log.debug( `Skipping message_end block reply - already sent via messaging tool: ${text.slice(0, 50)}...`, ); diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts index e1f1707dd..54b3db58c 100644 --- a/src/auto-reply/reply/reply-payloads.ts +++ b/src/auto-reply/reply/reply-payloads.ts @@ -1,4 +1,5 @@ import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js"; +import { normalizeTargetForProvider } from "../../agents/pi-embedded-messaging.js"; import type { MessagingToolSend } from "../../agents/pi-embedded-runner.js"; import type { ReplyToMode } from "../../config/types.js"; import type { OriginatingChannelType } from "../templating.js"; @@ -75,95 +76,6 @@ export function filterMessagingToolDuplicates(params: { ); } -function normalizeSlackTarget(raw: string): string | undefined { - const trimmed = raw.trim(); - if (!trimmed) return undefined; - const mentionMatch = trimmed.match(/^<@([A-Z0-9]+)>$/i); - if (mentionMatch) return `user:${mentionMatch[1]}`.toLowerCase(); - if (trimmed.startsWith("user:")) { - const id = trimmed.slice(5).trim(); - return id ? `user:${id}`.toLowerCase() : undefined; - } - if (trimmed.startsWith("channel:")) { - const id = trimmed.slice(8).trim(); - return id ? `channel:${id}`.toLowerCase() : undefined; - } - if (trimmed.startsWith("slack:")) { - const id = trimmed.slice(6).trim(); - return id ? `user:${id}`.toLowerCase() : undefined; - } - if (trimmed.startsWith("@")) { - const id = trimmed.slice(1).trim(); - return id ? `user:${id}`.toLowerCase() : undefined; - } - if (trimmed.startsWith("#")) { - const id = trimmed.slice(1).trim(); - return id ? `channel:${id}`.toLowerCase() : undefined; - } - return `channel:${trimmed}`.toLowerCase(); -} - -function normalizeDiscordTarget(raw: string): string | undefined { - const trimmed = raw.trim(); - if (!trimmed) return undefined; - const mentionMatch = trimmed.match(/^<@!?(\d+)>$/); - if (mentionMatch) return `user:${mentionMatch[1]}`.toLowerCase(); - if (trimmed.startsWith("user:")) { - const id = trimmed.slice(5).trim(); - return id ? `user:${id}`.toLowerCase() : undefined; - } - if (trimmed.startsWith("channel:")) { - const id = trimmed.slice(8).trim(); - return id ? `channel:${id}`.toLowerCase() : undefined; - } - if (trimmed.startsWith("discord:")) { - const id = trimmed.slice(8).trim(); - return id ? `user:${id}`.toLowerCase() : undefined; - } - if (trimmed.startsWith("@")) { - const id = trimmed.slice(1).trim(); - return id ? `user:${id}`.toLowerCase() : undefined; - } - return `channel:${trimmed}`.toLowerCase(); -} - -function normalizeTelegramTarget(raw: string): string | undefined { - const trimmed = raw.trim(); - if (!trimmed) return undefined; - let normalized = trimmed; - if (normalized.startsWith("telegram:")) { - normalized = normalized.slice("telegram:".length).trim(); - } else if (normalized.startsWith("tg:")) { - normalized = normalized.slice("tg:".length).trim(); - } else if (normalized.startsWith("group:")) { - normalized = normalized.slice("group:".length).trim(); - } - if (!normalized) return undefined; - const tmeMatch = - /^https?:\/\/t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized) ?? - /^t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized); - if (tmeMatch?.[1]) normalized = `@${tmeMatch[1]}`; - if (!normalized) return undefined; - return `telegram:${normalized}`.toLowerCase(); -} - -function normalizeTargetForProvider( - provider: string, - raw?: string, -): string | undefined { - if (!raw) return undefined; - switch (provider) { - case "slack": - return normalizeSlackTarget(raw); - case "discord": - return normalizeDiscordTarget(raw); - case "telegram": - return normalizeTelegramTarget(raw); - default: - return raw.trim().toLowerCase() || undefined; - } -} - function normalizeAccountId(value?: string): string | undefined { const trimmed = value?.trim(); return trimmed ? trimmed.toLowerCase() : undefined;