refactor: share reply payload threading/dedupe

This commit is contained in:
Peter Steinberger
2026-01-08 01:09:13 +00:00
parent 5e01e64cf3
commit a450390f7c
4 changed files with 197 additions and 63 deletions

View File

@@ -7,7 +7,6 @@ import {
queueEmbeddedPiMessage,
runEmbeddedPiAgent,
} from "../../agents/pi-embedded.js";
import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import {
loadSessionStore,
@@ -31,7 +30,12 @@ import {
type QueueSettings,
scheduleFollowupDrain,
} from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js";
import {
applyReplyTagsToPayload,
applyReplyThreading,
filterMessagingToolDuplicates,
isRenderablePayload,
} from "./reply-payloads.js";
import {
createReplyToModeFilter,
resolveReplyToMode,
@@ -321,21 +325,25 @@ export async function runReplyAgent(params: {
if (stripped.shouldSkip && !hasMedia) return;
text = stripped.text;
}
const tagResult = extractReplyToTag(
text,
const taggedPayload = applyReplyTagsToPayload(
{
text,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
},
sessionCtx.MessageSid,
);
const cleaned = tagResult.cleaned || undefined;
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (!cleaned && !hasMedia) return;
if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia)
if (!isRenderablePayload(taggedPayload)) return;
const hasMedia =
Boolean(taggedPayload.mediaUrl) ||
(taggedPayload.mediaUrls?.length ?? 0) > 0;
if (
taggedPayload.text?.trim() === SILENT_REPLY_TOKEN &&
!hasMedia
)
return;
const blockPayload: ReplyPayload = applyReplyToMode({
text: cleaned,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
replyToId: tagResult.replyToId,
});
const blockPayload: ReplyPayload =
applyReplyToMode(taggedPayload);
const payloadKey = buildPayloadKey(blockPayload);
if (
streamedPayloadKeys.has(payloadKey) ||
@@ -345,7 +353,7 @@ export async function runReplyAgent(params: {
}
pendingStreamedPayloadKeys.add(payloadKey);
const task = (async () => {
await typingSignals.signalTextDelta(cleaned);
await typingSignals.signalTextDelta(taggedPayload.text);
await opts.onBlockReply?.(blockPayload);
})()
.then(() => {
@@ -507,41 +515,21 @@ export async function runReplyAgent(params: {
return [{ ...payload, text: stripped.text }];
});
const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
.map((payload) => {
const { cleaned, replyToId } = extractReplyToTag(
payload.text,
sessionCtx.MessageSid,
);
return {
...payload,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? payload.replyToId,
};
})
.filter(
(payload) =>
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
)
.map(applyReplyToMode);
const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({
payloads: sanitizedPayloads,
applyReplyToMode,
currentMessageId: sessionCtx.MessageSid,
});
// Drop final payloads if block streaming is enabled and we already streamed
// block replies. Tool-sent duplicates are filtered below.
const shouldDropFinalPayloads =
blockStreamingEnabled && didStreamBlockReply;
const messagingToolSentTexts = runResult.messagingToolSentTexts ?? [];
const dedupedPayloads =
messagingToolSentTexts.length > 0
? replyTaggedPayloads.filter(
(payload) =>
!isMessagingToolDuplicate(
payload.text ?? "",
messagingToolSentTexts,
),
)
: replyTaggedPayloads;
const dedupedPayloads = filterMessagingToolDuplicates({
payloads: replyTaggedPayloads,
sentTexts: messagingToolSentTexts,
});
const filteredPayloads = shouldDropFinalPayloads
? []
: blockStreamingEnabled

View File

@@ -0,0 +1,98 @@
import { describe, expect, it, vi } from "vitest";
import type { FollowupRun } from "./queue.js";
import { createMockTypingController } from "./test-helpers.js";
const runEmbeddedPiAgentMock = vi.fn();
vi.mock("../../agents/model-fallback.js", () => ({
runWithModelFallback: async ({
provider,
model,
run,
}: {
provider: string;
model: string;
run: (provider: string, model: string) => Promise<unknown>;
}) => ({
result: await run(provider, model),
provider,
model,
}),
}));
vi.mock("../../agents/pi-embedded.js", () => ({
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
}));
import { createFollowupRunner } from "./followup-runner.js";
const baseQueuedRun = (): FollowupRun =>
({
prompt: "hello",
summaryLine: "hello",
enqueuedAt: Date.now(),
run: {
sessionId: "session",
sessionKey: "main",
messageProvider: "whatsapp",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
config: {},
skillsSnapshot: {},
provider: "anthropic",
model: "claude",
thinkLevel: "low",
verboseLevel: "off",
elevatedLevel: "off",
bashElevated: {
enabled: false,
allowed: false,
defaultLevel: "off",
},
timeoutMs: 1_000,
blockReplyBreak: "message_end",
},
}) as FollowupRun;
describe("createFollowupRunner messaging tool dedupe", () => {
it("drops payloads already sent via messaging tool", async () => {
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["hello world!"],
meta: {},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
await runner(baseQueuedRun());
expect(onBlockReply).not.toHaveBeenCalled();
});
it("delivers payloads when not duplicates", async () => {
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
meta: {},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
await runner(baseQueuedRun());
expect(onBlockReply).toHaveBeenCalledTimes(1);
});
});

View File

@@ -14,7 +14,10 @@ import type { OriginatingChannelType } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { FollowupRun } from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js";
import {
applyReplyThreading,
filterMessagingToolDuplicates,
} from "./reply-payloads.js";
import {
createReplyToModeFilter,
resolveReplyToMode,
@@ -193,24 +196,17 @@ export function createFollowupRunner(params: {
resolveReplyToMode(queued.run.config, replyToChannel),
);
const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
.map((payload) => {
const { cleaned, replyToId } = extractReplyToTag(payload.text);
return {
...payload,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? payload.replyToId,
};
})
.filter(
(payload) =>
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
)
.map(applyReplyToMode);
const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({
payloads: sanitizedPayloads,
applyReplyToMode,
});
if (replyTaggedPayloads.length === 0) return;
const dedupedPayloads = filterMessagingToolDuplicates({
payloads: replyTaggedPayloads,
sentTexts: runResult.messagingToolSentTexts ?? [],
});
if (dedupedPayloads.length === 0) return;
if (autoCompactionCompleted) {
const count = await incrementCompactionCount({
@@ -275,7 +271,7 @@ export function createFollowupRunner(params: {
}
}
await sendFollowupPayloads(replyTaggedPayloads, queued);
await sendFollowupPayloads(dedupedPayloads, queued);
} finally {
typing.markRunComplete();
}

View File

@@ -0,0 +1,52 @@
import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js";
import type { ReplyPayload } from "../types.js";
import { extractReplyToTag } from "./reply-tags.js";
export type ReplyToModeFilter = (payload: ReplyPayload) => ReplyPayload;
export function applyReplyTagsToPayload(
payload: ReplyPayload,
currentMessageId?: string,
): ReplyPayload {
if (typeof payload.text !== "string") return payload;
const { cleaned, replyToId } = extractReplyToTag(
payload.text,
currentMessageId,
);
return {
...payload,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? payload.replyToId,
};
}
export function isRenderablePayload(payload: ReplyPayload): boolean {
return Boolean(
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
}
export function applyReplyThreading(params: {
payloads: ReplyPayload[];
applyReplyToMode: ReplyToModeFilter;
currentMessageId?: string;
}): ReplyPayload[] {
const { payloads, applyReplyToMode, currentMessageId } = params;
return payloads
.map((payload) => applyReplyTagsToPayload(payload, currentMessageId))
.filter(isRenderablePayload)
.map(applyReplyToMode);
}
export function filterMessagingToolDuplicates(params: {
payloads: ReplyPayload[];
sentTexts: string[];
}): ReplyPayload[] {
const { payloads, sentTexts } = params;
if (sentTexts.length === 0) return payloads;
return payloads.filter(
(payload) => !isMessagingToolDuplicate(payload.text ?? "", sentTexts),
);
}