refactor: centralize messaging dedupe helpers

This commit is contained in:
Peter Steinberger
2026-01-10 16:02:47 +01:00
parent 99e9e506be
commit 70c1732dd1
5 changed files with 199 additions and 211 deletions

View File

@@ -517,7 +517,7 @@ export function validateGeminiTurns(messages: AgentMessage[]): AgentMessage[] {
}
// ── Messaging tool duplicate detection ──────────────────────────────────────
// When the agent uses a messaging tool (telegram, discord, slack, sessions_send)
// When the agent uses a messaging tool (telegram, discord, slack, message, sessions_send)
// to send a message, we track the text so we can suppress duplicate block replies.
// The LLM sometimes elaborates or wraps the same content, so we use substring matching.
@@ -539,6 +539,22 @@ export function normalizeTextForComparison(text: string): string {
.trim();
}
export function isMessagingToolDuplicateNormalized(
normalized: string,
normalizedSentTexts: string[],
): boolean {
if (normalizedSentTexts.length === 0) return false;
if (!normalized || normalized.length < MIN_DUPLICATE_TEXT_LENGTH)
return false;
return normalizedSentTexts.some((normalizedSent) => {
if (!normalizedSent || normalizedSent.length < MIN_DUPLICATE_TEXT_LENGTH)
return false;
return (
normalized.includes(normalizedSent) || normalizedSent.includes(normalized)
);
});
}
/**
* Check if a text is a duplicate of any previously sent messaging tool text.
* Uses substring matching to handle LLM elaboration (e.g., wrapping in quotes,
@@ -577,13 +593,8 @@ export function isMessagingToolDuplicate(
const normalized = normalizeTextForComparison(text);
if (!normalized || normalized.length < MIN_DUPLICATE_TEXT_LENGTH)
return false;
return sentTexts.some((sent) => {
const normalizedSent = normalizeTextForComparison(sent);
if (!normalizedSent || normalizedSent.length < MIN_DUPLICATE_TEXT_LENGTH)
return false;
// Substring match: either text contains the other
return (
normalized.includes(normalizedSent) || normalizedSent.includes(normalized)
);
});
return isMessagingToolDuplicateNormalized(
normalized,
sentTexts.map(normalizeTextForComparison),
);
}

View File

@@ -0,0 +1,120 @@
export type MessagingToolSend = {
tool: string;
provider: string;
accountId?: string;
to?: string;
};
const MESSAGING_TOOLS = new Set([
"telegram",
"whatsapp",
"discord",
"slack",
"sessions_send",
"message",
]);
export function isMessagingTool(toolName: string): boolean {
return MESSAGING_TOOLS.has(toolName);
}
export function isMessagingToolSendAction(
toolName: string,
actionRaw: string,
): boolean {
const action = actionRaw.trim();
if (toolName === "sessions_send") return true;
if (toolName === "message") {
return action === "send" || action === "thread-reply";
}
return action === "sendMessage" || action === "threadReply";
}
function normalizeSlackTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
const mentionMatch = trimmed.match(/^<@([A-Z0-9]+)>$/i);
if (mentionMatch) return `user:${mentionMatch[1]}`.toLowerCase();
if (trimmed.startsWith("user:")) {
const id = trimmed.slice(5).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("channel:")) {
const id = trimmed.slice(8).trim();
return id ? `channel:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("slack:")) {
const id = trimmed.slice(6).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("@")) {
const id = trimmed.slice(1).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("#")) {
const id = trimmed.slice(1).trim();
return id ? `channel:${id}`.toLowerCase() : undefined;
}
return `channel:${trimmed}`.toLowerCase();
}
function normalizeDiscordTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
const mentionMatch = trimmed.match(/^<@!?(\d+)>$/);
if (mentionMatch) return `user:${mentionMatch[1]}`.toLowerCase();
if (trimmed.startsWith("user:")) {
const id = trimmed.slice(5).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("channel:")) {
const id = trimmed.slice(8).trim();
return id ? `channel:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("discord:")) {
const id = trimmed.slice(8).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("@")) {
const id = trimmed.slice(1).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
return `channel:${trimmed}`.toLowerCase();
}
function normalizeTelegramTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
let normalized = trimmed;
if (normalized.startsWith("telegram:")) {
normalized = normalized.slice("telegram:".length).trim();
} else if (normalized.startsWith("tg:")) {
normalized = normalized.slice("tg:".length).trim();
} else if (normalized.startsWith("group:")) {
normalized = normalized.slice("group:".length).trim();
}
if (!normalized) return undefined;
const tmeMatch =
/^https?:\/\/t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized) ??
/^t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized);
if (tmeMatch?.[1]) normalized = `@${tmeMatch[1]}`;
if (!normalized) return undefined;
return `telegram:${normalized}`.toLowerCase();
}
export function normalizeTargetForProvider(
provider: string,
raw?: string,
): string | undefined {
if (!raw) return undefined;
switch (provider.trim().toLowerCase()) {
case "slack":
return normalizeSlackTarget(raw);
case "discord":
return normalizeDiscordTarget(raw);
case "telegram":
return normalizeTelegramTarget(raw);
default:
return raw.trim().toLowerCase() || undefined;
}
}

View File

@@ -62,6 +62,10 @@ import {
resolveModelAuthMode,
} from "./model-auth.js";
import { ensureClawdbotModelsJson } from "./models-config.js";
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
export type { MessagingToolSend } from "./pi-embedded-messaging.js";
import {
buildBootstrapContextFiles,
classifyFailoverReason,
@@ -266,13 +270,6 @@ type ApiKeyInfo = {
source: string;
};
export type MessagingToolSend = {
tool: string;
provider: string;
accountId?: string;
to?: string;
};
export type EmbeddedPiRunResult = {
payloads?: Array<{
text?: string;

View File

@@ -12,7 +12,16 @@ import { createSubsystemLogger } from "../logging.js";
import { truncateUtf16Safe } from "../utils.js";
import type { BlockReplyChunking } from "./pi-embedded-block-chunker.js";
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
import { isMessagingToolDuplicate } from "./pi-embedded-helpers.js";
import {
isMessagingToolDuplicateNormalized,
normalizeTextForComparison,
} from "./pi-embedded-helpers.js";
import {
isMessagingTool,
isMessagingToolSendAction,
type MessagingToolSend,
normalizeTargetForProvider,
} from "./pi-embedded-messaging.js";
import {
extractAssistantText,
extractAssistantThinking,
@@ -57,13 +66,6 @@ const appendRawStream = (payload: Record<string, unknown>) => {
export type { BlockReplyChunking } from "./pi-embedded-block-chunker.js";
type MessagingToolSend = {
tool: string;
provider: string;
accountId?: string;
to?: string;
};
function truncateToolText(text: string): string {
if (text.length <= TOOL_RESULT_MAX_CHARS) return text;
return `${truncateUtf16Safe(text, TOOL_RESULT_MAX_CHARS)}\n…(truncated)…`;
@@ -124,78 +126,6 @@ function stripUnpairedThinkingTags(text: string): string {
return text;
}
function normalizeSlackTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
const mentionMatch = trimmed.match(/^<@([A-Z0-9]+)>$/i);
if (mentionMatch) return `user:${mentionMatch[1]}`;
if (trimmed.startsWith("user:")) {
const id = trimmed.slice(5).trim();
return id ? `user:${id}` : undefined;
}
if (trimmed.startsWith("channel:")) {
const id = trimmed.slice(8).trim();
return id ? `channel:${id}` : undefined;
}
if (trimmed.startsWith("slack:")) {
const id = trimmed.slice(6).trim();
return id ? `user:${id}` : undefined;
}
if (trimmed.startsWith("@")) {
const id = trimmed.slice(1).trim();
return id ? `user:${id}` : undefined;
}
if (trimmed.startsWith("#")) {
const id = trimmed.slice(1).trim();
return id ? `channel:${id}` : undefined;
}
return `channel:${trimmed}`;
}
function normalizeDiscordTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
const mentionMatch = trimmed.match(/^<@!?(\d+)>$/);
if (mentionMatch) return `user:${mentionMatch[1]}`;
if (trimmed.startsWith("user:")) {
const id = trimmed.slice(5).trim();
return id ? `user:${id}` : undefined;
}
if (trimmed.startsWith("channel:")) {
const id = trimmed.slice(8).trim();
return id ? `channel:${id}` : undefined;
}
if (trimmed.startsWith("discord:")) {
const id = trimmed.slice(8).trim();
return id ? `user:${id}` : undefined;
}
if (trimmed.startsWith("@")) {
const id = trimmed.slice(1).trim();
return id ? `user:${id}` : undefined;
}
return `channel:${trimmed}`;
}
function normalizeTelegramTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
let normalized = trimmed;
if (normalized.startsWith("telegram:")) {
normalized = normalized.slice("telegram:".length).trim();
} else if (normalized.startsWith("tg:")) {
normalized = normalized.slice("tg:".length).trim();
} else if (normalized.startsWith("group:")) {
normalized = normalized.slice("group:".length).trim();
}
if (!normalized) return undefined;
const tmeMatch =
/^https?:\/\/t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized) ??
/^t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized);
if (tmeMatch?.[1]) normalized = `@${tmeMatch[1]}`;
if (!normalized) return undefined;
return `telegram:${normalized}`;
}
function extractMessagingToolSend(
toolName: string,
args: Record<string, unknown>,
@@ -208,7 +138,7 @@ function extractMessagingToolSend(
if (action !== "sendMessage") return undefined;
const toRaw = typeof args.to === "string" ? args.to : undefined;
if (!toRaw) return undefined;
const to = normalizeSlackTarget(toRaw);
const to = normalizeTargetForProvider("slack", toRaw);
return to
? { tool: toolName, provider: "slack", accountId, to }
: undefined;
@@ -217,7 +147,7 @@ function extractMessagingToolSend(
if (action === "sendMessage") {
const toRaw = typeof args.to === "string" ? args.to : undefined;
if (!toRaw) return undefined;
const to = normalizeDiscordTarget(toRaw);
const to = normalizeTargetForProvider("discord", toRaw);
return to
? { tool: toolName, provider: "discord", accountId, to }
: undefined;
@@ -226,7 +156,7 @@ function extractMessagingToolSend(
const channelId =
typeof args.channelId === "string" ? args.channelId.trim() : "";
if (!channelId) return undefined;
const to = normalizeDiscordTarget(`channel:${channelId}`);
const to = normalizeTargetForProvider("discord", `channel:${channelId}`);
return to
? { tool: toolName, provider: "discord", accountId, to }
: undefined;
@@ -237,7 +167,7 @@ function extractMessagingToolSend(
if (action !== "sendMessage") return undefined;
const toRaw = typeof args.to === "string" ? args.to : undefined;
if (!toRaw) return undefined;
const to = normalizeTelegramTarget(toRaw);
const to = normalizeTargetForProvider("telegram", toRaw);
return to
? { tool: toolName, provider: "telegram", accountId, to }
: undefined;
@@ -248,8 +178,8 @@ function extractMessagingToolSend(
if (!toRaw) return undefined;
const providerRaw =
typeof args.provider === "string" ? args.provider.trim() : "";
const provider = providerRaw || "message";
const to = toRaw.trim();
const provider = providerRaw ? providerRaw.toLowerCase() : "message";
const to = normalizeTargetForProvider(provider, toRaw);
return to ? { tool: toolName, provider, accountId, to } : undefined;
}
return undefined;
@@ -316,18 +246,25 @@ export function subscribeEmbeddedPiSession(params: {
// Only committed (successful) texts are checked - pending texts are tracked
// to support commit logic but not used for suppression (avoiding lost messages on tool failure).
// These tools can send messages via sendMessage/threadReply actions (or sessions_send with message).
const MESSAGING_TOOLS = new Set([
"telegram",
"whatsapp",
"discord",
"slack",
"message",
"sessions_send",
]);
const MAX_MESSAGING_SENT_TEXTS = 200;
const MAX_MESSAGING_SENT_TARGETS = 200;
const messagingToolSentTexts: string[] = [];
const messagingToolSentTextsNormalized: string[] = [];
const messagingToolSentTargets: MessagingToolSend[] = [];
const pendingMessagingTexts = new Map<string, string>();
const pendingMessagingTargets = new Map<string, MessagingToolSend>();
const trimMessagingToolSent = () => {
if (messagingToolSentTexts.length > MAX_MESSAGING_SENT_TEXTS) {
const overflow = messagingToolSentTexts.length - MAX_MESSAGING_SENT_TEXTS;
messagingToolSentTexts.splice(0, overflow);
messagingToolSentTextsNormalized.splice(0, overflow);
}
if (messagingToolSentTargets.length > MAX_MESSAGING_SENT_TARGETS) {
const overflow =
messagingToolSentTargets.length - MAX_MESSAGING_SENT_TARGETS;
messagingToolSentTargets.splice(0, overflow);
}
};
const ensureCompactionPromise = () => {
if (!compactionRetryPromise) {
@@ -440,7 +377,13 @@ export function subscribeEmbeddedPiSession(params: {
// Only check committed (successful) messaging tool texts - checking pending texts
// is risky because if the tool fails after suppression, the user gets no response
if (isMessagingToolDuplicate(chunk, messagingToolSentTexts)) {
const normalizedChunk = normalizeTextForComparison(chunk);
if (
isMessagingToolDuplicateNormalized(
normalizedChunk,
messagingToolSentTextsNormalized,
)
) {
log.debug(
`Skipping block reply - already sent via messaging tool: ${chunk.slice(0, 50)}...`,
);
@@ -479,6 +422,7 @@ export function subscribeEmbeddedPiSession(params: {
toolMetaById.clear();
toolSummaryById.clear();
messagingToolSentTexts.length = 0;
messagingToolSentTextsNormalized.length = 0;
messagingToolSentTargets.length = 0;
pendingMessagingTexts.clear();
pendingMessagingTargets.clear();
@@ -556,7 +500,7 @@ export function subscribeEmbeddedPiSession(params: {
}
// Track messaging tool sends (pending until confirmed in tool_execution_end)
if (MESSAGING_TOOLS.has(toolName)) {
if (isMessagingTool(toolName)) {
const argsRecord =
args && typeof args === "object"
? (args as Record<string, unknown>)
@@ -565,14 +509,7 @@ export function subscribeEmbeddedPiSession(params: {
typeof argsRecord.action === "string"
? argsRecord.action.trim()
: "";
// Track send actions: sendMessage/threadReply for Discord/Slack, sessions_send (no action field),
// and message/send or message/thread-reply for the generic message tool.
const isMessagingSend =
action === "sendMessage" ||
action === "threadReply" ||
toolName === "sessions_send" ||
(toolName === "message" &&
(action === "send" || action === "thread-reply"));
const isMessagingSend = isMessagingToolSendAction(toolName, action);
if (isMessagingSend) {
const sendTarget = extractMessagingToolSend(toolName, argsRecord);
if (sendTarget) {
@@ -645,15 +582,20 @@ export function subscribeEmbeddedPiSession(params: {
pendingMessagingTexts.delete(toolCallId);
if (!isError) {
messagingToolSentTexts.push(pendingText);
messagingToolSentTextsNormalized.push(
normalizeTextForComparison(pendingText),
);
log.debug(
`Committed messaging text: tool=${toolName} len=${pendingText.length}`,
);
trimMessagingToolSent();
}
}
if (pendingTarget) {
pendingMessagingTargets.delete(toolCallId);
if (!isError) {
messagingToolSentTargets.push(pendingTarget);
trimMessagingToolSent();
}
}
@@ -892,7 +834,13 @@ export function subscribeEmbeddedPiSession(params: {
blockChunker.reset();
} else if (text !== lastBlockReplyText) {
// Check for duplicates before emitting (same logic as emitBlockChunk)
if (isMessagingToolDuplicate(text, messagingToolSentTexts)) {
const normalizedText = normalizeTextForComparison(text);
if (
isMessagingToolDuplicateNormalized(
normalizedText,
messagingToolSentTextsNormalized,
)
) {
log.debug(
`Skipping message_end block reply - already sent via messaging tool: ${text.slice(0, 50)}...`,
);

View File

@@ -1,4 +1,5 @@
import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js";
import { normalizeTargetForProvider } from "../../agents/pi-embedded-messaging.js";
import type { MessagingToolSend } from "../../agents/pi-embedded-runner.js";
import type { ReplyToMode } from "../../config/types.js";
import type { OriginatingChannelType } from "../templating.js";
@@ -75,95 +76,6 @@ export function filterMessagingToolDuplicates(params: {
);
}
function normalizeSlackTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
const mentionMatch = trimmed.match(/^<@([A-Z0-9]+)>$/i);
if (mentionMatch) return `user:${mentionMatch[1]}`.toLowerCase();
if (trimmed.startsWith("user:")) {
const id = trimmed.slice(5).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("channel:")) {
const id = trimmed.slice(8).trim();
return id ? `channel:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("slack:")) {
const id = trimmed.slice(6).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("@")) {
const id = trimmed.slice(1).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("#")) {
const id = trimmed.slice(1).trim();
return id ? `channel:${id}`.toLowerCase() : undefined;
}
return `channel:${trimmed}`.toLowerCase();
}
function normalizeDiscordTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
const mentionMatch = trimmed.match(/^<@!?(\d+)>$/);
if (mentionMatch) return `user:${mentionMatch[1]}`.toLowerCase();
if (trimmed.startsWith("user:")) {
const id = trimmed.slice(5).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("channel:")) {
const id = trimmed.slice(8).trim();
return id ? `channel:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("discord:")) {
const id = trimmed.slice(8).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
if (trimmed.startsWith("@")) {
const id = trimmed.slice(1).trim();
return id ? `user:${id}`.toLowerCase() : undefined;
}
return `channel:${trimmed}`.toLowerCase();
}
function normalizeTelegramTarget(raw: string): string | undefined {
const trimmed = raw.trim();
if (!trimmed) return undefined;
let normalized = trimmed;
if (normalized.startsWith("telegram:")) {
normalized = normalized.slice("telegram:".length).trim();
} else if (normalized.startsWith("tg:")) {
normalized = normalized.slice("tg:".length).trim();
} else if (normalized.startsWith("group:")) {
normalized = normalized.slice("group:".length).trim();
}
if (!normalized) return undefined;
const tmeMatch =
/^https?:\/\/t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized) ??
/^t\.me\/([A-Za-z0-9_]+)$/i.exec(normalized);
if (tmeMatch?.[1]) normalized = `@${tmeMatch[1]}`;
if (!normalized) return undefined;
return `telegram:${normalized}`.toLowerCase();
}
function normalizeTargetForProvider(
provider: string,
raw?: string,
): string | undefined {
if (!raw) return undefined;
switch (provider) {
case "slack":
return normalizeSlackTarget(raw);
case "discord":
return normalizeDiscordTarget(raw);
case "telegram":
return normalizeTelegramTarget(raw);
default:
return raw.trim().toLowerCase() || undefined;
}
}
function normalizeAccountId(value?: string): string | undefined {
const trimmed = value?.trim();
return trimmed ? trimmed.toLowerCase() : undefined;