refactor: centralize reply dispatch

This commit is contained in:
Peter Steinberger
2026-01-06 04:55:00 +01:00
parent 319dd14e8e
commit 13eb9c9ee9
10 changed files with 90 additions and 131 deletions

View File

@@ -0,0 +1,46 @@
import type { ClawdbotConfig } from "../../config/config.js";
import { getReplyFromConfig } from "../reply.js";
import type { MsgContext } from "../templating.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
type DispatchFromConfigResult = {
queuedFinal: boolean;
counts: Record<ReplyDispatchKind, number>;
};
export async function dispatchReplyFromConfig(params: {
ctx: MsgContext;
cfg: ClawdbotConfig;
dispatcher: ReplyDispatcher;
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
replyResolver?: typeof getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
params.ctx,
{
...params.replyOptions,
onToolResult: (payload: ReplyPayload) => {
params.dispatcher.sendToolResult(payload);
},
onBlockReply: (payload: ReplyPayload) => {
params.dispatcher.sendBlockReply(payload);
},
},
params.cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
let queuedFinal = false;
for (const reply of replies) {
queuedFinal = params.dispatcher.sendFinalReply(reply) || queuedFinal;
}
await params.dispatcher.waitForIdle();
return { queuedFinal, counts: params.dispatcher.getQueuedCounts() };
}

View File

@@ -22,7 +22,7 @@ export type ReplyDispatcherOptions = {
onError?: ReplyDispatchErrorHandler;
};
type ReplyDispatcher = {
export type ReplyDispatcher = {
sendToolResult: (payload: ReplyPayload) => boolean;
sendBlockReply: (payload: ReplyPayload) => boolean;
sendFinalReply: (payload: ReplyPayload) => boolean;

View File

@@ -18,13 +18,13 @@ import {
import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { hasControlCommand } from "../auto-reply/command-detection.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import {
buildMentionRegexes,
matchesMentionPatterns,
} from "../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import type { TypingController } from "../auto-reply/reply/typing.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import type {
DiscordSlashCommandConfig,
@@ -589,32 +589,17 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
},
});
const replyResult = await getReplyFromConfig(
ctxPayload,
{
const { queuedFinal, counts } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
onReplyStart: () => sendTyping(message),
onTypingController: (typing) => {
typingController = typing;
},
onToolResult: (payload) => {
dispatcher.sendToolResult(payload);
},
onBlockReply: (payload) => {
dispatcher.sendBlockReply(payload);
},
},
cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
let queuedFinal = false;
for (const reply of replies) {
queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal;
}
await dispatcher.waitForIdle();
});
typingController?.markDispatchIdle();
if (!queuedFinal) {
if (
@@ -629,7 +614,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
}
didSendReply = true;
if (shouldLogVerbose()) {
const finalCount = dispatcher.getQueuedCounts().final;
const finalCount = counts.final;
logVerbose(
`discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
);

View File

@@ -1,12 +1,12 @@
import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { hasControlCommand } from "../auto-reply/command-detection.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import {
buildMentionRegexes,
matchesMentionPatterns,
} from "../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { loadConfig } from "../config/config.js";
import {
@@ -285,28 +285,11 @@ export async function monitorIMessageProvider(
},
});
const replyResult = await getReplyFromConfig(
ctxPayload,
{
onToolResult: (payload) => {
dispatcher.sendToolResult(payload);
},
onBlockReply: (payload) => {
dispatcher.sendBlockReply(payload);
},
},
const { queuedFinal } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
let queuedFinal = false;
for (const reply of replies) {
queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal;
}
await dispatcher.waitForIdle();
dispatcher,
});
if (!queuedFinal) return;
};

View File

@@ -1,7 +1,7 @@
import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { loadConfig } from "../config/config.js";
import { resolveStorePath, updateLastRoute } from "../config/sessions.js";
@@ -400,28 +400,11 @@ export async function monitorSignalProvider(
},
});
const replyResult = await getReplyFromConfig(
ctxPayload,
{
onToolResult: (payload) => {
dispatcher.sendToolResult(payload);
},
onBlockReply: (payload) => {
dispatcher.sendBlockReply(payload);
},
},
const { queuedFinal } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
let queuedFinal = false;
for (const reply of replies) {
queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal;
}
await dispatcher.waitForIdle();
dispatcher,
});
if (!queuedFinal) return;
};

View File

@@ -6,6 +6,7 @@ import bolt from "@slack/bolt";
import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { hasControlCommand } from "../auto-reply/command-detection.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import {
buildMentionRegexes,
matchesMentionPatterns,
@@ -757,31 +758,14 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
},
});
const replyResult = await getReplyFromConfig(
ctxPayload,
{
onToolResult: (payload) => {
dispatcher.sendToolResult(payload);
},
onBlockReply: (payload) => {
dispatcher.sendBlockReply(payload);
},
},
const { queuedFinal, counts } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
let queuedFinal = false;
for (const reply of replies) {
queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal;
}
await dispatcher.waitForIdle();
dispatcher,
});
if (!queuedFinal) return;
if (shouldLogVerbose()) {
const finalCount = dispatcher.getQueuedCounts().final;
const finalCount = counts.final;
logVerbose(
`slack: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
);

View File

@@ -7,13 +7,13 @@ import { Bot, InputFile, webhookCallback } from "grammy";
import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { hasControlCommand } from "../auto-reply/command-detection.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import {
buildMentionRegexes,
matchesMentionPatterns,
} from "../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import type { TypingController } from "../auto-reply/reply/typing.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import type { ReplyToMode } from "../config/config.js";
import { loadConfig } from "../config/config.js";
@@ -314,28 +314,17 @@ export function createTelegramBot(opts: TelegramBotOptions) {
},
});
const replyResult = await getReplyFromConfig(
ctxPayload,
{
const { queuedFinal } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
onReplyStart: sendTyping,
onTypingController: (typing) => {
typingController = typing;
},
onToolResult: dispatcher.sendToolResult,
onBlockReply: dispatcher.sendBlockReply,
},
cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
let queuedFinal = false;
for (const reply of replies) {
queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal;
}
await dispatcher.waitForIdle();
});
typingController?.markDispatchIdle();
if (!queuedFinal) return;
} catch (err) {

View File

@@ -9,6 +9,7 @@ import {
HEARTBEAT_PROMPT,
stripHeartbeatToken,
} from "../auto-reply/heartbeat.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import {
buildMentionRegexes,
normalizeMentionText,
@@ -1193,8 +1194,8 @@ export async function monitorWebProvider(
},
});
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
const { queuedFinal } = await dispatchReplyFromConfig({
ctx: {
Body: combinedBody,
From: msg.from,
To: msg.to,
@@ -1217,31 +1218,16 @@ export async function monitorWebProvider(
WasMentioned: msg.wasMentioned,
Surface: "whatsapp",
},
{
cfg,
dispatcher,
replyResolver,
replyOptions: {
onReplyStart: msg.sendComposing,
onTypingController: (typing) => {
typingController = typing;
},
onToolResult: (payload) => {
dispatcher.sendToolResult(payload);
},
onBlockReply: (payload) => {
dispatcher.sendBlockReply(payload);
},
},
);
const replyList = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
let queuedFinal = false;
for (const replyPayload of replyList) {
queuedFinal = dispatcher.sendFinalReply(replyPayload) || queuedFinal;
}
await dispatcher.waitForIdle();
});
typingController?.markDispatchIdle();
if (!queuedFinal) {
if (shouldClearGroupHistory && didSendReply) {