Files
clawdbot/src/auto-reply/reply/reply-dispatcher.ts
2026-01-10 02:41:16 +01:00

125 lines
3.4 KiB
TypeScript

import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { normalizeReplyPayload } from "./normalize-reply.js";
import type { TypingController } from "./typing.js";
export type ReplyDispatchKind = "tool" | "block" | "final";
type ReplyDispatchErrorHandler = (
err: unknown,
info: { kind: ReplyDispatchKind },
) => void;
type ReplyDispatchDeliverer = (
payload: ReplyPayload,
info: { kind: ReplyDispatchKind },
) => Promise<void>;
export type ReplyDispatcherOptions = {
deliver: ReplyDispatchDeliverer;
responsePrefix?: string;
onHeartbeatStrip?: () => void;
onIdle?: () => void;
onError?: ReplyDispatchErrorHandler;
};
export type ReplyDispatcherWithTypingOptions = Omit<
ReplyDispatcherOptions,
"onIdle"
> & {
onReplyStart?: () => Promise<void> | void;
onIdle?: () => void;
};
type ReplyDispatcherWithTypingResult = {
dispatcher: ReplyDispatcher;
replyOptions: Pick<GetReplyOptions, "onReplyStart" | "onTypingController">;
markDispatchIdle: () => void;
};
export type ReplyDispatcher = {
sendToolResult: (payload: ReplyPayload) => boolean;
sendBlockReply: (payload: ReplyPayload) => boolean;
sendFinalReply: (payload: ReplyPayload) => boolean;
waitForIdle: () => Promise<void>;
getQueuedCounts: () => Record<ReplyDispatchKind, number>;
};
function normalizeReplyPayloadInternal(
payload: ReplyPayload,
opts: Pick<ReplyDispatcherOptions, "responsePrefix" | "onHeartbeatStrip">,
): ReplyPayload | null {
return normalizeReplyPayload(payload, {
responsePrefix: opts.responsePrefix,
onHeartbeatStrip: opts.onHeartbeatStrip,
});
}
export function createReplyDispatcher(
options: ReplyDispatcherOptions,
): ReplyDispatcher {
let sendChain: Promise<void> = Promise.resolve();
// Track in-flight deliveries so we can emit a reliable "idle" signal.
let pending = 0;
// Serialize outbound replies to preserve tool/block/final order.
const queuedCounts: Record<ReplyDispatchKind, number> = {
tool: 0,
block: 0,
final: 0,
};
const enqueue = (kind: ReplyDispatchKind, payload: ReplyPayload) => {
const normalized = normalizeReplyPayloadInternal(payload, options);
if (!normalized) return false;
queuedCounts[kind] += 1;
pending += 1;
sendChain = sendChain
.then(() => options.deliver(normalized, { kind }))
.catch((err) => {
options.onError?.(err, { kind });
})
.finally(() => {
pending -= 1;
if (pending === 0) {
options.onIdle?.();
}
});
return true;
};
return {
sendToolResult: (payload) => enqueue("tool", payload),
sendBlockReply: (payload) => enqueue("block", payload),
sendFinalReply: (payload) => enqueue("final", payload),
waitForIdle: () => sendChain,
getQueuedCounts: () => ({ ...queuedCounts }),
};
}
export function createReplyDispatcherWithTyping(
options: ReplyDispatcherWithTypingOptions,
): ReplyDispatcherWithTypingResult {
const { onReplyStart, onIdle, ...dispatcherOptions } = options;
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
...dispatcherOptions,
onIdle: () => {
typingController?.markDispatchIdle();
onIdle?.();
},
});
return {
dispatcher,
replyOptions: {
onReplyStart,
onTypingController: (typing) => {
typingController = typing;
},
},
markDispatchIdle: () => {
typingController?.markDispatchIdle();
onIdle?.();
},
};
}