refactor: unify reply dispatch across providers

This commit is contained in:
Peter Steinberger
2026-01-05 19:43:54 +01:00
parent bfe7f5f126
commit c75b2a7067
17 changed files with 953 additions and 476 deletions

View File

@@ -8,6 +8,7 @@ import {
HEARTBEAT_PROMPT,
stripHeartbeatToken,
} from "../auto-reply/heartbeat.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import type { ReplyPayload } from "../auto-reply/types.js";
@@ -222,14 +223,6 @@ function debugMention(
export { stripHeartbeatToken };
function isSilentReply(payload?: ReplyPayload): boolean {
if (!payload) return false;
const text = payload.text?.trim();
if (!text || text !== SILENT_REPLY_TOKEN) return false;
if (payload.mediaUrl || payload.mediaUrls?.length) return false;
return true;
}
function resolveHeartbeatReplyPayload(
replyResult: ReplyPayload | ReplyPayload[] | undefined,
): ReplyPayload | undefined {
@@ -952,6 +945,25 @@ export async function monitorWebProvider(
// Track recently sent messages to prevent echo loops
const recentlySent = new Set<string>();
const MAX_RECENT_MESSAGES = 100;
const rememberSentText = (
text: string | undefined,
opts: { combinedBody: string; logVerboseMessage?: boolean },
) => {
if (!text) return;
recentlySent.add(text);
if (opts.combinedBody) {
recentlySent.add(opts.combinedBody);
}
if (opts.logVerboseMessage) {
logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${text.substring(0, 50)}...`,
);
}
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
};
while (true) {
if (stopRequested()) break;
@@ -1103,114 +1115,71 @@ export async function monitorWebProvider(
}
}
const responsePrefix = cfg.messages?.responsePrefix;
const textLimit = resolveTextChunkLimit(cfg, "whatsapp");
let didLogHeartbeatStrip = false;
let didSendReply = false;
let toolSendChain: Promise<void> = Promise.resolve();
const sendToolResult = (payload: ReplyPayload) => {
if (
!payload?.text &&
!payload?.mediaUrl &&
!(payload?.mediaUrls?.length ?? 0)
) {
return;
}
if (isSilentReply(payload)) return;
const toolPayload: ReplyPayload = { ...payload };
if (toolPayload.text?.includes(HEARTBEAT_TOKEN)) {
const stripped = stripHeartbeatToken(toolPayload.text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
const dispatcher = createReplyDispatcher({
responsePrefix: cfg.messages?.responsePrefix,
onHeartbeatStrip: () => {
if (!didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from web reply");
}
const hasMedia = Boolean(
toolPayload.mediaUrl || (toolPayload.mediaUrls?.length ?? 0) > 0,
},
deliver: async (payload, info) => {
await deliverWebReply({
replyResult: payload,
msg,
maxMediaBytes,
textLimit,
replyLogger,
connectionId,
// Tool + block updates are noisy; skip their log lines.
skipLog: info.kind !== "final",
});
didSendReply = true;
if (info.kind === "tool") {
rememberSentText(payload.text, { combinedBody: "" });
return;
}
const shouldLog =
info.kind === "final" && payload.text ? true : undefined;
rememberSentText(payload.text, {
combinedBody,
logVerboseMessage: shouldLog,
});
if (info.kind === "final") {
const fromDisplay =
msg.chatType === "group"
? conversationId
: (msg.from ?? "unknown");
const hasMedia = Boolean(
payload.mediaUrl || payload.mediaUrls?.length,
);
whatsappOutboundLog.info(
`Auto-replied to ${fromDisplay}${hasMedia ? " (media)" : ""}`,
);
if (shouldLogVerbose()) {
const preview =
payload.text != null ? elide(payload.text, 400) : "<media>";
whatsappOutboundLog.debug(
`Reply body: ${preview}${hasMedia ? " (media)" : ""}`,
);
}
}
},
onError: (err, info) => {
const label =
info.kind === "tool"
? "tool update"
: info.kind === "block"
? "block update"
: "auto-reply";
whatsappOutboundLog.error(
`Failed sending web ${label} to ${msg.from ?? conversationId}: ${formatError(err)}`,
);
if (stripped.shouldSkip && !hasMedia) return;
toolPayload.text = stripped.text;
}
if (
responsePrefix &&
toolPayload.text &&
toolPayload.text.trim() !== HEARTBEAT_TOKEN &&
!toolPayload.text.startsWith(responsePrefix)
) {
toolPayload.text = `${responsePrefix} ${toolPayload.text}`;
}
toolSendChain = toolSendChain
.then(async () => {
await deliverWebReply({
replyResult: toolPayload,
msg,
maxMediaBytes,
textLimit,
replyLogger,
connectionId,
skipLog: true,
});
didSendReply = true;
if (toolPayload.text) {
recentlySent.add(toolPayload.text);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
})
.catch((err) => {
whatsappOutboundLog.error(
`Failed sending web tool update to ${msg.from ?? conversationId}: ${formatError(err)}`,
);
});
};
const sendBlockReply = (payload: ReplyPayload) => {
if (
!payload?.text &&
!payload?.mediaUrl &&
!(payload?.mediaUrls?.length ?? 0)
) {
return;
}
if (isSilentReply(payload)) return;
const blockPayload: ReplyPayload = { ...payload };
if (
responsePrefix &&
blockPayload.text &&
blockPayload.text.trim() !== HEARTBEAT_TOKEN &&
!blockPayload.text.startsWith(responsePrefix)
) {
blockPayload.text = `${responsePrefix} ${blockPayload.text}`;
}
toolSendChain = toolSendChain
.then(async () => {
await deliverWebReply({
replyResult: blockPayload,
msg,
maxMediaBytes,
textLimit,
replyLogger,
connectionId,
skipLog: true,
});
didSendReply = true;
if (blockPayload.text) {
recentlySent.add(blockPayload.text);
recentlySent.add(combinedBody);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
})
.catch((err) => {
whatsappOutboundLog.error(
`Failed sending web block update to ${msg.from ?? conversationId}: ${formatError(err)}`,
);
});
};
},
});
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
@@ -1238,8 +1207,12 @@ export async function monitorWebProvider(
},
{
onReplyStart: msg.sendComposing,
onToolResult: sendToolResult,
onBlockReply: sendBlockReply,
onToolResult: (payload) => {
dispatcher.sendToolResult(payload);
},
onBlockReply: (payload) => {
dispatcher.sendBlockReply(payload);
},
},
);
@@ -1249,12 +1222,12 @@ export async function monitorWebProvider(
: [replyResult]
: [];
const sendableReplies = replyList.filter(
(payload) => !isSilentReply(payload),
);
if (sendableReplies.length === 0) {
await toolSendChain;
let queuedFinal = false;
for (const replyPayload of replyList) {
queuedFinal = dispatcher.sendFinalReply(replyPayload) || queuedFinal;
}
await dispatcher.waitForIdle();
if (!queuedFinal) {
if (shouldClearGroupHistory && didSendReply) {
groupHistories.set(conversationId, []);
}
@@ -1264,79 +1237,6 @@ export async function monitorWebProvider(
return;
}
await toolSendChain;
for (const replyPayload of sendableReplies) {
if (replyPayload.text?.includes(HEARTBEAT_TOKEN)) {
const stripped = stripHeartbeatToken(replyPayload.text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from web reply");
}
const hasMedia = Boolean(
replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0,
);
if (stripped.shouldSkip && !hasMedia) continue;
replyPayload.text = stripped.text;
}
if (
responsePrefix &&
replyPayload.text &&
replyPayload.text.trim() !== HEARTBEAT_TOKEN &&
!replyPayload.text.startsWith(responsePrefix)
) {
replyPayload.text = `${responsePrefix} ${replyPayload.text}`;
}
try {
await deliverWebReply({
replyResult: replyPayload,
msg,
maxMediaBytes,
textLimit,
replyLogger,
connectionId,
});
didSendReply = true;
if (replyPayload.text) {
recentlySent.add(replyPayload.text);
recentlySent.add(combinedBody); // Prevent echo on the combined text itself
logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${replyPayload.text.substring(0, 50)}...`,
);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
const fromDisplay =
msg.chatType === "group" ? conversationId : (msg.from ?? "unknown");
const hasMedia = Boolean(
replyPayload.mediaUrl || replyPayload.mediaUrls?.length,
);
whatsappOutboundLog.info(
`Auto-replied to ${fromDisplay}${hasMedia ? " (media)" : ""}`,
);
if (shouldLogVerbose()) {
const preview =
replyPayload.text != null
? elide(replyPayload.text, 400)
: "<media>";
whatsappOutboundLog.debug(
`Reply body: ${preview}${hasMedia ? " (media)" : ""}`,
);
}
} catch (err) {
whatsappOutboundLog.error(
`Failed sending web auto-reply to ${msg.from ?? conversationId}: ${formatError(err)}`,
);
}
}
if (shouldClearGroupHistory && didSendReply) {
groupHistories.set(conversationId, []);
}