fix: unify inbound dispatch pipeline

This commit is contained in:
Peter Steinberger
2026-01-23 22:51:37 +00:00
parent da26954dd0
commit 2e0a835e07
29 changed files with 543 additions and 297 deletions

View File

@@ -82,7 +82,8 @@ export async function runAgentTurnWithFallback(params: {
// Track payloads sent directly (not via pipeline) during tool flush to avoid duplicates.
const directlySentBlockKeys = new Set<string>();
const runId = crypto.randomUUID();
const runId = params.opts?.runId ?? crypto.randomUUID();
params.opts?.onAgentRunStart?.(runId);
if (params.sessionKey) {
registerAgentRunContext(runId, {
sessionKey: params.sessionKey,
@@ -174,6 +175,7 @@ export async function runAgentTurnWithFallback(params: {
extraSystemPrompt: params.followupRun.run.extraSystemPrompt,
ownerNumbers: params.followupRun.run.ownerNumbers,
cliSessionId,
images: params.opts?.images,
})
.then((result) => {
emitAgentEvent({
@@ -248,6 +250,8 @@ export async function runAgentTurnWithFallback(params: {
bashElevated: params.followupRun.run.bashElevated,
timeoutMs: params.followupRun.run.timeoutMs,
runId,
images: params.opts?.images,
abortSignal: params.opts?.abortSignal,
blockReplyBreak: params.resolvedBlockStreamingBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: allowPartialStream

View File

@@ -1,58 +1,44 @@
import type { ClawdbotConfig } from "../../config/config.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { FinalizedMsgContext, MsgContext } from "../templating.js";
import type { GetReplyOptions } from "../types.js";
import type { DispatchFromConfigResult } from "./dispatch-from-config.js";
import { dispatchReplyFromConfig } from "./dispatch-from-config.js";
import type { DispatchInboundResult } from "../dispatch.js";
import {
createReplyDispatcher,
createReplyDispatcherWithTyping,
type ReplyDispatcherOptions,
type ReplyDispatcherWithTypingOptions,
dispatchInboundMessageWithBufferedDispatcher,
dispatchInboundMessageWithDispatcher,
} from "../dispatch.js";
import type {
ReplyDispatcherOptions,
ReplyDispatcherWithTypingOptions,
} from "./reply-dispatcher.js";
export async function dispatchReplyWithBufferedBlockDispatcher(params: {
ctx: FinalizedMsgContext;
ctx: MsgContext | FinalizedMsgContext;
cfg: ClawdbotConfig;
dispatcherOptions: ReplyDispatcherWithTypingOptions;
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
replyResolver?: typeof import("../reply.js").getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping(
params.dispatcherOptions,
);
const result = await dispatchReplyFromConfig({
}): Promise<DispatchInboundResult> {
return await dispatchInboundMessageWithBufferedDispatcher({
ctx: params.ctx,
cfg: params.cfg,
dispatcher,
dispatcherOptions: params.dispatcherOptions,
replyResolver: params.replyResolver,
replyOptions: {
...params.replyOptions,
...replyOptions,
},
replyOptions: params.replyOptions,
});
markDispatchIdle();
return result;
}
export async function dispatchReplyWithDispatcher(params: {
ctx: FinalizedMsgContext;
ctx: MsgContext | FinalizedMsgContext;
cfg: ClawdbotConfig;
dispatcherOptions: ReplyDispatcherOptions;
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
replyResolver?: typeof import("../reply.js").getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
const dispatcher = createReplyDispatcher(params.dispatcherOptions);
const result = await dispatchReplyFromConfig({
}): Promise<DispatchInboundResult> {
return await dispatchInboundMessageWithDispatcher({
ctx: params.ctx,
cfg: params.cfg,
dispatcher,
dispatcherOptions: params.dispatcherOptions,
replyResolver: params.replyResolver,
replyOptions: params.replyOptions,
});
await dispatcher.waitForIdle();
return result;
}

View File

@@ -1,3 +1,4 @@
import type { ImageContent } from "@mariozechner/pi-ai";
import type { TypingController } from "./reply/typing.js";
export type BlockReplyContext = {
@@ -13,6 +14,14 @@ export type ModelSelectedContext = {
};
export type GetReplyOptions = {
/** Override run id for agent events (defaults to random UUID). */
runId?: string;
/** Abort signal for the underlying agent run. */
abortSignal?: AbortSignal;
/** Optional inbound images (used for webchat attachments). */
images?: ImageContent[];
/** Notifies when an agent run actually starts (useful for webchat command handling). */
onAgentRunStart?: (runId: string) => void;
onReplyStart?: () => Promise<void> | void;
onTypingController?: (typing: TypingController) => void;
isHeartbeat?: boolean;