diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index abbf28685..e297a7cd0 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -7,7 +7,6 @@ import { queueEmbeddedPiMessage, runEmbeddedPiAgent, } from "../../agents/pi-embedded.js"; -import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js"; import { hasNonzeroUsage } from "../../agents/usage.js"; import { loadSessionStore, @@ -31,7 +30,12 @@ import { type QueueSettings, scheduleFollowupDrain, } from "./queue.js"; -import { extractReplyToTag } from "./reply-tags.js"; +import { + applyReplyTagsToPayload, + applyReplyThreading, + filterMessagingToolDuplicates, + isRenderablePayload, +} from "./reply-payloads.js"; import { createReplyToModeFilter, resolveReplyToMode, @@ -321,21 +325,25 @@ export async function runReplyAgent(params: { if (stripped.shouldSkip && !hasMedia) return; text = stripped.text; } - const tagResult = extractReplyToTag( - text, + const taggedPayload = applyReplyTagsToPayload( + { + text, + mediaUrls: payload.mediaUrls, + mediaUrl: payload.mediaUrls?.[0], + }, sessionCtx.MessageSid, ); - const cleaned = tagResult.cleaned || undefined; - const hasMedia = (payload.mediaUrls?.length ?? 0) > 0; - if (!cleaned && !hasMedia) return; - if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia) + if (!isRenderablePayload(taggedPayload)) return; + const hasMedia = + Boolean(taggedPayload.mediaUrl) || + (taggedPayload.mediaUrls?.length ?? 
0) > 0; + if ( + taggedPayload.text?.trim() === SILENT_REPLY_TOKEN && + !hasMedia + ) return; - const blockPayload: ReplyPayload = applyReplyToMode({ - text: cleaned, - mediaUrls: payload.mediaUrls, - mediaUrl: payload.mediaUrls?.[0], - replyToId: tagResult.replyToId, - }); + const blockPayload: ReplyPayload = + applyReplyToMode(taggedPayload); const payloadKey = buildPayloadKey(blockPayload); if ( streamedPayloadKeys.has(payloadKey) || @@ -345,7 +353,7 @@ export async function runReplyAgent(params: { } pendingStreamedPayloadKeys.add(payloadKey); const task = (async () => { - await typingSignals.signalTextDelta(cleaned); + await typingSignals.signalTextDelta(taggedPayload.text); await opts.onBlockReply?.(blockPayload); })() .then(() => { @@ -507,41 +515,21 @@ export async function runReplyAgent(params: { return [{ ...payload, text: stripped.text }]; }); - const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads - .map((payload) => { - const { cleaned, replyToId } = extractReplyToTag( - payload.text, - sessionCtx.MessageSid, - ); - return { - ...payload, - text: cleaned ? cleaned : undefined, - replyToId: replyToId ?? payload.replyToId, - }; - }) - .filter( - (payload) => - payload.text || - payload.mediaUrl || - (payload.mediaUrls && payload.mediaUrls.length > 0), - ) - .map(applyReplyToMode); + const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({ + payloads: sanitizedPayloads, + applyReplyToMode, + currentMessageId: sessionCtx.MessageSid, + }); // Drop final payloads if block streaming is enabled and we already streamed // block replies. Tool-sent duplicates are filtered below. const shouldDropFinalPayloads = blockStreamingEnabled && didStreamBlockReply; const messagingToolSentTexts = runResult.messagingToolSentTexts ?? []; - const dedupedPayloads = - messagingToolSentTexts.length > 0 - ? replyTaggedPayloads.filter( - (payload) => - !isMessagingToolDuplicate( - payload.text ?? 
"", - messagingToolSentTexts, - ), - ) - : replyTaggedPayloads; + const dedupedPayloads = filterMessagingToolDuplicates({ + payloads: replyTaggedPayloads, + sentTexts: messagingToolSentTexts, + }); const filteredPayloads = shouldDropFinalPayloads ? [] : blockStreamingEnabled diff --git a/src/auto-reply/reply/followup-runner.messaging-tools.test.ts b/src/auto-reply/reply/followup-runner.messaging-tools.test.ts new file mode 100644 index 000000000..5a04d4c2c --- /dev/null +++ b/src/auto-reply/reply/followup-runner.messaging-tools.test.ts @@ -0,0 +1,98 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { FollowupRun } from "./queue.js"; +import { createMockTypingController } from "./test-helpers.js"; + +const runEmbeddedPiAgentMock = vi.fn(); + +vi.mock("../../agents/model-fallback.js", () => ({ + runWithModelFallback: async ({ + provider, + model, + run, + }: { + provider: string; + model: string; + run: (provider: string, model: string) => Promise<unknown>; + }) => ({ + result: await run(provider, model), + provider, + model, + }), +})); + +vi.mock("../../agents/pi-embedded.js", () => ({ + runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), +})); + +import { createFollowupRunner } from "./followup-runner.js"; + +const baseQueuedRun = (): FollowupRun => + ({ + prompt: "hello", + summaryLine: "hello", + enqueuedAt: Date.now(), + run: { + sessionId: "session", + sessionKey: "main", + messageProvider: "whatsapp", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + config: {}, + skillsSnapshot: {}, + provider: "anthropic", + model: "claude", + thinkLevel: "low", + verboseLevel: "off", + elevatedLevel: "off", + bashElevated: { + enabled: false, + allowed: false, + defaultLevel: "off", + }, + timeoutMs: 1_000, + blockReplyBreak: "message_end", + }, + }) as FollowupRun; + +describe("createFollowupRunner messaging tool dedupe", () => { + it("drops payloads already sent via messaging tool", async () => { + const onBlockReply = 
vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["hello world!"], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner(baseQueuedRun()); + + expect(onBlockReply).not.toHaveBeenCalled(); + }); + + it("delivers payloads when not duplicates", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + meta: {}, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner(baseQueuedRun()); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index eebdf5e45..82cccdc9e 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -14,7 +14,10 @@ import type { OriginatingChannelType } from "../templating.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { FollowupRun } from "./queue.js"; -import { extractReplyToTag } from "./reply-tags.js"; +import { + applyReplyThreading, + filterMessagingToolDuplicates, +} from "./reply-payloads.js"; import { createReplyToModeFilter, resolveReplyToMode, @@ -193,24 +196,17 @@ export function createFollowupRunner(params: { resolveReplyToMode(queued.run.config, replyToChannel), ); - const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads - .map((payload) => { - const { cleaned, replyToId } = extractReplyToTag(payload.text); - return { - ...payload, - text: cleaned ? 
cleaned : undefined, - replyToId: replyToId ?? payload.replyToId, - }; - }) - .filter( - (payload) => - payload.text || - payload.mediaUrl || - (payload.mediaUrls && payload.mediaUrls.length > 0), - ) - .map(applyReplyToMode); + const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({ + payloads: sanitizedPayloads, + applyReplyToMode, + }); - if (replyTaggedPayloads.length === 0) return; + const dedupedPayloads = filterMessagingToolDuplicates({ + payloads: replyTaggedPayloads, + sentTexts: runResult.messagingToolSentTexts ?? [], + }); + + if (dedupedPayloads.length === 0) return; if (autoCompactionCompleted) { const count = await incrementCompactionCount({ @@ -275,7 +271,7 @@ export function createFollowupRunner(params: { } } - await sendFollowupPayloads(replyTaggedPayloads, queued); + await sendFollowupPayloads(dedupedPayloads, queued); } finally { typing.markRunComplete(); } diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts new file mode 100644 index 000000000..ac6bf6988 --- /dev/null +++ b/src/auto-reply/reply/reply-payloads.ts @@ -0,0 +1,52 @@ +import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js"; +import type { ReplyPayload } from "../types.js"; +import { extractReplyToTag } from "./reply-tags.js"; + +export type ReplyToModeFilter = (payload: ReplyPayload) => ReplyPayload; + +export function applyReplyTagsToPayload( + payload: ReplyPayload, + currentMessageId?: string, +): ReplyPayload { + if (typeof payload.text !== "string") return payload; + const { cleaned, replyToId } = extractReplyToTag( + payload.text, + currentMessageId, + ); + return { + ...payload, + text: cleaned ? cleaned : undefined, + replyToId: replyToId ?? 
payload.replyToId, + }; +} + +export function isRenderablePayload(payload: ReplyPayload): boolean { + return Boolean( + payload.text || + payload.mediaUrl || + (payload.mediaUrls && payload.mediaUrls.length > 0), + ); +} + +export function applyReplyThreading(params: { + payloads: ReplyPayload[]; + applyReplyToMode: ReplyToModeFilter; + currentMessageId?: string; +}): ReplyPayload[] { + const { payloads, applyReplyToMode, currentMessageId } = params; + return payloads + .map((payload) => applyReplyTagsToPayload(payload, currentMessageId)) + .filter(isRenderablePayload) + .map(applyReplyToMode); +} + +export function filterMessagingToolDuplicates(params: { + payloads: ReplyPayload[]; + sentTexts: string[]; +}): ReplyPayload[] { + const { payloads, sentTexts } = params; + if (sentTexts.length === 0) return payloads; + return payloads.filter( + (payload) => !isMessagingToolDuplicate(payload.text ?? "", sentTexts), + ); +}