fix: send text between tool calls to channel immediately

Previously, when block streaming was disabled (the default), text generated
between tool calls would only appear after all tools completed. This was
because onBlockReply wasn't passed to the subscription when block streaming
was off, so flushBlockReplyBuffer() before tool execution did nothing.

Now onBlockReply is always passed, and when block streaming is disabled,
block replies are sent directly during tool flush. Directly sent payloads
are tracked to avoid duplicates in final payloads.

Also fixes a race condition where tool summaries could be emitted before
the typing indicator started by awaiting onAgentEvent in tool handlers.
This commit is contained in:
Tyler Yust
2026-01-15 20:55:52 -08:00
parent e6364d031d
commit 2ee71e4154
11 changed files with 118 additions and 71 deletions

View File

@@ -26,7 +26,7 @@ import type { VerboseLevel } from "../thinking.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { buildThreadingToolContext, resolveEnforceFinalTag } from "./agent-runner-utils.js";
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import type { FollowupRun } from "./queue.js";
import { parseReplyDirectives } from "./reply-directives.js";
import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js";
@@ -40,6 +40,8 @@ export type AgentRunLoopResult =
fallbackModel?: string;
didLogHeartbeatStrip: boolean;
autoCompactionCompleted: boolean;
/** Payload keys sent directly (not via pipeline) during tool flush. */
directlySentBlockKeys?: Set<string>;
}
| { kind: "final"; payload: ReplyPayload };
@@ -70,6 +72,8 @@ export async function runAgentTurnWithFallback(params: {
}): Promise<AgentRunLoopResult> {
let didLogHeartbeatStrip = false;
let autoCompactionCompleted = false;
// Track payloads sent directly (not via pipeline) during tool flush to avoid duplicates.
const directlySentBlockKeys = new Set<string>();
const runId = crypto.randomUUID();
if (params.sessionKey) {
@@ -244,12 +248,13 @@ export async function runAgentTurnWithFallback(params: {
});
}
: undefined,
onAgentEvent: (evt) => {
// Trigger typing when tools start executing
onAgentEvent: async (evt) => {
// Trigger typing when tools start executing.
// Must await to ensure typing indicator starts before tool summaries are emitted.
if (evt.stream === "tool") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "start" || phase === "update") {
void params.typingSignals.signalToolStart();
await params.typingSignals.signalToolStart();
}
}
// Track auto-compaction completion
@@ -261,57 +266,67 @@ export async function runAgentTurnWithFallback(params: {
}
}
},
onBlockReply:
params.blockStreamingEnabled && params.opts?.onBlockReply
? async (payload) => {
const { text, skip } = normalizeStreamingText(payload);
const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (skip && !hasPayloadMedia) return;
const taggedPayload = applyReplyTagsToPayload(
{
text,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
},
params.sessionCtx.MessageSid,
);
// Let through payloads with audioAsVoice flag even if empty (need to track it)
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return;
const parsed = parseReplyDirectives(taggedPayload.text ?? "", {
currentMessageId: params.sessionCtx.MessageSid,
silentToken: SILENT_REPLY_TOKEN,
});
const cleaned = parsed.text || undefined;
const hasRenderableMedia =
Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0;
// Skip empty payloads unless they have audioAsVoice flag (need to track it)
if (
!cleaned &&
!hasRenderableMedia &&
!payload.audioAsVoice &&
!parsed.audioAsVoice
)
return;
if (parsed.isSilent && !hasRenderableMedia) return;
// Always pass onBlockReply so flushBlockReplyBuffer works before tool execution,
// even when regular block streaming is disabled. The handler sends directly
// via opts.onBlockReply when the pipeline isn't available.
onBlockReply: params.opts?.onBlockReply
? async (payload) => {
const { text, skip } = normalizeStreamingText(payload);
const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (skip && !hasPayloadMedia) return;
const taggedPayload = applyReplyTagsToPayload(
{
text,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
},
params.sessionCtx.MessageSid,
);
// Let through payloads with audioAsVoice flag even if empty (need to track it)
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return;
const parsed = parseReplyDirectives(taggedPayload.text ?? "", {
currentMessageId: params.sessionCtx.MessageSid,
silentToken: SILENT_REPLY_TOKEN,
});
const cleaned = parsed.text || undefined;
const hasRenderableMedia =
Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0;
// Skip empty payloads unless they have audioAsVoice flag (need to track it)
if (
!cleaned &&
!hasRenderableMedia &&
!payload.audioAsVoice &&
!parsed.audioAsVoice
)
return;
if (parsed.isSilent && !hasRenderableMedia) return;
const blockPayload: ReplyPayload = params.applyReplyToMode({
...taggedPayload,
text: cleaned,
audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice),
replyToId: taggedPayload.replyToId ?? parsed.replyToId,
replyToTag: taggedPayload.replyToTag || parsed.replyToTag,
replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent,
const blockPayload: ReplyPayload = params.applyReplyToMode({
...taggedPayload,
text: cleaned,
audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice),
replyToId: taggedPayload.replyToId ?? parsed.replyToId,
replyToTag: taggedPayload.replyToTag || parsed.replyToTag,
replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent,
});
void params.typingSignals
.signalTextDelta(cleaned ?? taggedPayload.text)
.catch((err) => {
logVerbose(`block reply typing signal failed: ${String(err)}`);
});
void params.typingSignals
.signalTextDelta(cleaned ?? taggedPayload.text)
.catch((err) => {
logVerbose(`block reply typing signal failed: ${String(err)}`);
});
params.blockReplyPipeline?.enqueue(blockPayload);
// Use pipeline if available (block streaming enabled), otherwise send directly
if (params.blockStreamingEnabled && params.blockReplyPipeline) {
params.blockReplyPipeline.enqueue(blockPayload);
} else {
// Send directly when flushing before tool execution (no streaming).
// Track sent key to avoid duplicate in final payloads.
directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
await params.opts?.onBlockReply?.(blockPayload);
}
: undefined,
}
: undefined,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline
? async () => {
@@ -447,5 +462,6 @@ export async function runAgentTurnWithFallback(params: {
fallbackModel,
didLogHeartbeatStrip,
autoCompactionCompleted,
directlySentBlockKeys: directlySentBlockKeys.size > 0 ? directlySentBlockKeys : undefined,
};
}

View File

@@ -5,7 +5,7 @@ import type { OriginatingChannelType } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyPayload } from "../types.js";
import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js";
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import { parseReplyDirectives } from "./reply-directives.js";
import {
applyReplyThreading,
@@ -20,6 +20,8 @@ export function buildReplyPayloads(params: {
didLogHeartbeatStrip: boolean;
blockStreamingEnabled: boolean;
blockReplyPipeline: BlockReplyPipeline | null;
/** Payload keys sent directly (not via pipeline) during tool flush. */
directlySentBlockKeys?: Set<string>;
replyToMode: ReplyToMode;
replyToChannel?: OriginatingChannelType;
currentMessageId?: string;
@@ -98,11 +100,16 @@ export function buildReplyPayloads(params: {
payloads: replyTaggedPayloads,
sentTexts: messagingToolSentTexts,
});
// Filter out payloads already sent via pipeline or directly during tool flush.
const filteredPayloads = shouldDropFinalPayloads
? []
: params.blockStreamingEnabled
? dedupedPayloads.filter((payload) => !params.blockReplyPipeline?.hasSentPayload(payload))
: dedupedPayloads;
: params.directlySentBlockKeys?.size
? dedupedPayloads.filter(
(payload) => !params.directlySentBlockKeys!.has(createBlockReplyPayloadKey(payload)),
)
: dedupedPayloads;
const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads;
return {

View File

@@ -272,7 +272,7 @@ export async function runReplyAgent(params: {
return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn);
}
const { runResult, fallbackProvider, fallbackModel } = runOutcome;
const { runResult, fallbackProvider, fallbackModel, directlySentBlockKeys } = runOutcome;
let { didLogHeartbeatStrip, autoCompactionCompleted } = runOutcome;
if (
@@ -314,6 +314,7 @@ export async function runReplyAgent(params: {
didLogHeartbeatStrip,
blockStreamingEnabled,
blockReplyPipeline,
directlySentBlockKeys,
replyToMode,
replyToChannel,
currentMessageId: sessionCtx.MessageSid,