fix: polish reply threading + tool dedupe (thanks @mneves75) (#326)

This commit is contained in:
Peter Steinberger
2026-01-08 00:50:29 +00:00
parent 33e2d53be3
commit 17d052bcda
16 changed files with 193 additions and 350 deletions

View File

@@ -7,6 +7,7 @@ import {
queueEmbeddedPiMessage,
runEmbeddedPiAgent,
} from "../../agents/pi-embedded.js";
import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import {
loadSessionStore,
@@ -19,7 +20,7 @@ import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { defaultRuntime } from "../../runtime.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import type { TemplateContext } from "../templating.js";
import type { OriginatingChannelType, TemplateContext } from "../templating.js";
import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
@@ -31,6 +32,10 @@ import {
scheduleFollowupDrain,
} from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js";
import {
createReplyToModeFilter,
resolveReplyToMode,
} from "./reply-threading.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js";
import { createTypingSignaler } from "./typing-mode.js";
@@ -147,6 +152,16 @@ export async function runReplyAgent(params: {
replyToId: payload.replyToId ?? null,
});
};
const replyToChannel =
sessionCtx.OriginatingChannel ??
((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
| OriginatingChannelType
| undefined);
const replyToMode = resolveReplyToMode(
followupRun.run.config,
replyToChannel,
);
const applyReplyToMode = createReplyToModeFilter(replyToMode);
if (shouldSteer && isStreaming) {
const steered = queueEmbeddedPiMessage(
@@ -315,13 +330,12 @@ export async function runReplyAgent(params: {
if (!cleaned && !hasMedia) return;
if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia)
return;
const blockPayload: ReplyPayload = {
const blockPayload: ReplyPayload = applyReplyToMode({
text: cleaned,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
// Default to incoming message ID for threading support (replyToMode: "first"|"all")
replyToId: tagResult.replyToId ?? sessionCtx.MessageSid,
};
replyToId: tagResult.replyToId,
});
const payloadKey = buildPayloadKey(blockPayload);
if (
streamedPayloadKeys.has(payloadKey) ||
@@ -502,8 +516,7 @@ export async function runReplyAgent(params: {
return {
...payload,
text: cleaned ? cleaned : undefined,
// Default to incoming message ID for threading support (replyToMode: "first"|"all")
replyToId: replyToId ?? payload.replyToId ?? sessionCtx.MessageSid,
replyToId: replyToId ?? payload.replyToId,
};
})
.filter(
@@ -511,23 +524,31 @@ export async function runReplyAgent(params: {
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
)
.map(applyReplyToMode);
// Drop final payloads if:
// 1. Block streaming is enabled and we already streamed block replies, OR
// 2. A messaging tool (telegram, whatsapp, etc.) successfully sent the response.
// The agent often generates confirmation text (e.g., "Respondi no Telegram!")
// AFTER using the messaging tool - we must suppress this confirmation text.
// Drop final payloads if block streaming is enabled and we already streamed
// block replies. Tool-sent duplicates are filtered below.
const shouldDropFinalPayloads =
(blockStreamingEnabled && didStreamBlockReply) ||
runResult.didSendViaMessagingTool === true;
blockStreamingEnabled && didStreamBlockReply;
const messagingToolSentTexts = runResult.messagingToolSentTexts ?? [];
const dedupedPayloads =
messagingToolSentTexts.length > 0
? replyTaggedPayloads.filter(
(payload) =>
!isMessagingToolDuplicate(
payload.text ?? "",
messagingToolSentTexts,
),
)
: replyTaggedPayloads;
const filteredPayloads = shouldDropFinalPayloads
? []
: blockStreamingEnabled
? replyTaggedPayloads.filter(
? dedupedPayloads.filter(
(payload) => !streamedPayloadKeys.has(buildPayloadKey(payload)),
)
: replyTaggedPayloads;
: dedupedPayloads;
if (filteredPayloads.length === 0) return finalizeWithFollowup(undefined);

View File

@@ -10,10 +10,15 @@ import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { defaultRuntime } from "../../runtime.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import type { OriginatingChannelType } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { FollowupRun } from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js";
import {
createReplyToModeFilter,
resolveReplyToMode,
} from "./reply-threading.js";
import { isRoutableChannel, routeReply } from "./route-reply.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js";
@@ -179,6 +184,14 @@ export function createFollowupRunner(params: {
if (stripped.shouldSkip && !hasMedia) return [];
return [{ ...payload, text: stripped.text }];
});
const replyToChannel =
queued.originatingChannel ??
(queued.run.messageProvider?.toLowerCase() as
| OriginatingChannelType
| undefined);
const applyReplyToMode = createReplyToModeFilter(
resolveReplyToMode(queued.run.config, replyToChannel),
);
const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
.map((payload) => {
@@ -194,7 +207,8 @@ export function createFollowupRunner(params: {
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
)
.map(applyReplyToMode);
if (replyTaggedPayloads.length === 0) return;

View File

@@ -0,0 +1,53 @@
import { describe, expect, it } from "vitest";
import type { ClawdbotConfig } from "../../config/config.js";
import {
createReplyToModeFilter,
resolveReplyToMode,
} from "./reply-threading.js";
const emptyCfg = {} as ClawdbotConfig;
describe("resolveReplyToMode", () => {
it("defaults to first for Telegram", () => {
expect(resolveReplyToMode(emptyCfg, "telegram")).toBe("first");
});
it("defaults to off for Discord and Slack", () => {
expect(resolveReplyToMode(emptyCfg, "discord")).toBe("off");
expect(resolveReplyToMode(emptyCfg, "slack")).toBe("off");
});
it("defaults to all when channel is unknown", () => {
expect(resolveReplyToMode(emptyCfg, undefined)).toBe("all");
});
it("uses configured value when present", () => {
const cfg = {
telegram: { replyToMode: "all" },
discord: { replyToMode: "first" },
slack: { replyToMode: "all" },
} as ClawdbotConfig;
expect(resolveReplyToMode(cfg, "telegram")).toBe("all");
expect(resolveReplyToMode(cfg, "discord")).toBe("first");
expect(resolveReplyToMode(cfg, "slack")).toBe("all");
});
});
describe("createReplyToModeFilter", () => {
it("drops replyToId when mode is off", () => {
const filter = createReplyToModeFilter("off");
expect(filter({ text: "hi", replyToId: "1" }).replyToId).toBeUndefined();
});
it("keeps replyToId when mode is all", () => {
const filter = createReplyToModeFilter("all");
expect(filter({ text: "hi", replyToId: "1" }).replyToId).toBe("1");
});
it("keeps only the first replyToId when mode is first", () => {
const filter = createReplyToModeFilter("first");
expect(filter({ text: "hi", replyToId: "1" }).replyToId).toBe("1");
expect(filter({ text: "next", replyToId: "1" }).replyToId).toBeUndefined();
});
});

View File

@@ -0,0 +1,36 @@
import type { ClawdbotConfig } from "../../config/config.js";
import type { ReplyToMode } from "../../config/types.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
export function resolveReplyToMode(
cfg: ClawdbotConfig,
channel?: OriginatingChannelType,
): ReplyToMode {
switch (channel) {
case "telegram":
return cfg.telegram?.replyToMode ?? "first";
case "discord":
return cfg.discord?.replyToMode ?? "off";
case "slack":
return cfg.slack?.replyToMode ?? "off";
default:
return "all";
}
}
export function createReplyToModeFilter(mode: ReplyToMode) {
let hasThreaded = false;
return (payload: ReplyPayload): ReplyPayload => {
if (!payload.replyToId) return payload;
if (mode === "off") {
return { ...payload, replyToId: undefined };
}
if (mode === "all") return payload;
if (hasThreaded) {
return { ...payload, replyToId: undefined };
}
hasThreaded = true;
return payload;
};
}

View File

@@ -59,6 +59,21 @@ describe("routeReply", () => {
);
});
it("passes replyToId to Telegram sends", async () => {
mocks.sendMessageTelegram.mockClear();
await routeReply({
payload: { text: "hi", replyToId: "123" },
channel: "telegram",
to: "telegram:123",
cfg: {} as never,
});
expect(mocks.sendMessageTelegram).toHaveBeenCalledWith(
"telegram:123",
"hi",
expect.objectContaining({ replyToMessageId: 123 }),
);
});
it("uses replyToId as threadTs for Slack", async () => {
mocks.sendMessageSlack.mockClear();
await routeReply({

View File

@@ -75,9 +75,16 @@ export async function routeReply(
const { text, mediaUrl } = params;
switch (channel) {
case "telegram": {
const replyToMessageId = replyToId
? Number.parseInt(replyToId, 10)
: undefined;
const resolvedReplyToMessageId = Number.isFinite(replyToMessageId)
? replyToMessageId
: undefined;
const result = await sendMessageTelegram(to, text, {
mediaUrl,
messageThreadId: threadId,
replyToMessageId: resolvedReplyToMessageId,
});
return { ok: true, messageId: result.messageId };
}