Previously, when block streaming was disabled (the default), text generated between tool calls would only appear after all tools completed. This was because onBlockReply wasn't passed to the subscription when block streaming was off, so flushBlockReplyBuffer() before tool execution did nothing. Now onBlockReply is always passed, and when block streaming is disabled, block replies are sent directly during tool flush. Directly sent payloads are tracked to avoid duplicates in final payloads. Also fixes a race condition where tool summaries could be emitted before the typing indicator started by awaiting onAgentEvent in tool handlers.
87 lines
2.5 KiB
TypeScript
87 lines
2.5 KiB
TypeScript
import type { AgentEvent } from "@mariozechner/pi-agent-core";
|
|
|
|
import { emitAgentEvent } from "../infra/agent-events.js";
|
|
import { createInlineCodeState } from "../markdown/code-spans.js";
|
|
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
|
|
|
/**
 * Handles the start of an embedded agent run.
 *
 * Logs the start, publishes a lifecycle "start" event (including a
 * `startedAt` timestamp) through `emitAgentEvent`, and forwards a lifecycle
 * "start" notification to the optional `onAgentEvent` callback.
 *
 * @param ctx - Subscription context supplying the run parameters and logger.
 */
export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
  ctx.log.debug(`embedded run agent start: runId=${ctx.params.runId}`);
  emitAgentEvent({
    runId: ctx.params.runId,
    stream: "lifecycle",
    data: {
      phase: "start",
      startedAt: Date.now(),
    },
  });
  // Fire-and-forget: the callback is still invoked synchronously and never
  // awaited, but a rejection from an async listener is caught and logged so it
  // cannot surface as an unhandled promise rejection.
  void Promise.resolve(
    ctx.params.onAgentEvent?.({
      stream: "lifecycle",
      data: { phase: "start" },
    }),
  ).catch((err: unknown) => {
    const reason = err instanceof Error ? err.message : String(err);
    ctx.log.debug(
      `embedded run agent start: onAgentEvent failed: runId=${ctx.params.runId} error=${reason}`,
    );
  });
}
|
|
|
|
/**
 * Handles the start of an automatic compaction.
 *
 * Marks compaction as in flight on the shared state, asks the context to
 * ensure a compaction promise exists, logs the start, and forwards a
 * compaction "start" notification to the optional `onAgentEvent` callback.
 *
 * @param ctx - Subscription context supplying state, run parameters and logger.
 */
export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) {
  ctx.state.compactionInFlight = true;
  ctx.ensureCompactionPromise();
  ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`);
  // Fire-and-forget: the callback is still invoked synchronously and never
  // awaited, but a rejection from an async listener is caught and logged so it
  // cannot surface as an unhandled promise rejection.
  void Promise.resolve(
    ctx.params.onAgentEvent?.({
      stream: "compaction",
      data: { phase: "start" },
    }),
  ).catch((err: unknown) => {
    const reason = err instanceof Error ? err.message : String(err);
    ctx.log.debug(
      `embedded run compaction start: onAgentEvent failed: runId=${ctx.params.runId} error=${reason}`,
    );
  });
}
|
|
|
|
/**
 * Handles the end of an automatic compaction.
 *
 * Clears the in-flight flag, then either records and prepares a retry (when
 * the event's `willRetry` is truthy) or asks the context to resolve any
 * pending compaction wait. Finally forwards a compaction "end" notification,
 * carrying the boolean `willRetry`, to the optional `onAgentEvent` callback.
 *
 * @param ctx - Subscription context supplying state, run parameters and logger.
 * @param evt - Agent event; `willRetry` is coerced to a boolean.
 */
export function handleAutoCompactionEnd(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { willRetry?: unknown },
) {
  ctx.state.compactionInFlight = false;
  const willRetry = Boolean(evt.willRetry);
  if (willRetry) {
    ctx.noteCompactionRetry();
    ctx.resetForCompactionRetry();
    ctx.log.debug(`embedded run compaction retry: runId=${ctx.params.runId}`);
  } else {
    ctx.maybeResolveCompactionWait();
  }
  // Fire-and-forget: the callback is still invoked synchronously and never
  // awaited, but a rejection from an async listener is caught and logged so it
  // cannot surface as an unhandled promise rejection.
  void Promise.resolve(
    ctx.params.onAgentEvent?.({
      stream: "compaction",
      data: { phase: "end", willRetry },
    }),
  ).catch((err: unknown) => {
    const reason = err instanceof Error ? err.message : String(err);
    ctx.log.debug(
      `embedded run compaction end: onAgentEvent failed: runId=${ctx.params.runId} error=${reason}`,
    );
  });
}
|
|
|
|
/**
 * Handles the end of an embedded agent run.
 *
 * In order: logs the end, publishes a lifecycle "end" event (including an
 * `endedAt` timestamp) through `emitAgentEvent`, forwards a lifecycle "end"
 * notification to the optional `onAgentEvent` callback, flushes any buffered
 * block text, resets the block parsing state, and resolves pending compaction
 * bookkeeping.
 *
 * @param ctx - Subscription context supplying state, run parameters and logger.
 */
export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
  ctx.log.debug(`embedded run agent end: runId=${ctx.params.runId}`);
  emitAgentEvent({
    runId: ctx.params.runId,
    stream: "lifecycle",
    data: {
      phase: "end",
      endedAt: Date.now(),
    },
  });
  // Fire-and-forget: the callback is still invoked synchronously and never
  // awaited, but a rejection from an async listener is caught and logged so it
  // cannot surface as an unhandled promise rejection.
  void Promise.resolve(
    ctx.params.onAgentEvent?.({
      stream: "lifecycle",
      data: { phase: "end" },
    }),
  ).catch((err: unknown) => {
    const reason = err instanceof Error ? err.message : String(err);
    ctx.log.debug(
      `embedded run agent end: onAgentEvent failed: runId=${ctx.params.runId} error=${reason}`,
    );
  });

  // Buffered block text is flushed only when a block-reply callback is set.
  if (ctx.params.onBlockReply) {
    if (ctx.blockChunker?.hasBuffered()) {
      // A chunker with buffered content takes precedence: force-drain it
      // through emitBlockChunk, then reset it.
      ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
      ctx.blockChunker.reset();
    } else if (ctx.state.blockBuffer.length > 0) {
      // Otherwise emit the raw buffer as a single chunk and clear it.
      ctx.emitBlockChunk(ctx.state.blockBuffer);
      ctx.state.blockBuffer = "";
    }
  }

  // Reset the block parsing flags and start with a fresh inline-code state.
  ctx.state.blockState.thinking = false;
  ctx.state.blockState.final = false;
  ctx.state.blockState.inlineCode = createInlineCodeState();

  // With compaction retries pending, resolve a retry; otherwise let the
  // context decide whether the compaction wait can be resolved.
  if (ctx.state.pendingCompactionRetry > 0) {
    ctx.resolveCompactionRetry();
  } else {
    ctx.maybeResolveCompactionWait();
  }
}
|