Previously, when block streaming was disabled (the default), text generated between tool calls would only appear after all tools completed. This happened because onBlockReply wasn't passed to the subscription when block streaming was off, so the flushBlockReplyBuffer() call before tool execution did nothing. Now onBlockReply is always passed, and when block streaming is disabled, block replies are sent directly during tool flush. Directly sent payloads are tracked to avoid duplicates in the final payloads. Also fixes a race condition where tool summaries could be emitted before the typing indicator started; tool handlers now await onAgentEvent.
120 lines
4.7 KiB
TypeScript
120 lines
4.7 KiB
TypeScript
import type { ReplyToMode } from "../../config/types.js";
|
|
import { logVerbose } from "../../globals.js";
|
|
import { stripHeartbeatToken } from "../heartbeat.js";
|
|
import type { OriginatingChannelType } from "../templating.js";
|
|
import { SILENT_REPLY_TOKEN } from "../tokens.js";
|
|
import type { ReplyPayload } from "../types.js";
|
|
import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js";
|
|
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
|
|
import { parseReplyDirectives } from "./reply-directives.js";
|
|
import {
|
|
applyReplyThreading,
|
|
filterMessagingToolDuplicates,
|
|
isRenderablePayload,
|
|
shouldSuppressMessagingToolReplies,
|
|
} from "./reply-payloads.js";
|
|
|
|
/**
 * Reduces the final payloads of an agent run to the replies that still need to be delivered.
 *
 * Processing order:
 *  1. Unless this is a heartbeat run, rewrite Bun fetch socket errors on error payloads and
 *     strip stray `HEARTBEAT_OK` tokens from payload text.
 *  2. Apply reply threading, merge directives parsed from each payload's text (values already
 *     present on the payload take precedence), and keep only renderable payloads.
 *  3. Remove payloads that `filterMessagingToolDuplicates` rejects against the messaging-tool
 *     sent texts.
 *  4. Remove payloads that were already delivered, either through the block reply pipeline or
 *     directly (tracked in `directlySentBlockKeys`).
 *  5. Return no payloads at all when `shouldSuppressMessagingToolReplies` says so.
 *
 * @param params - The payloads plus the streaming, threading and messaging-tool context.
 * @returns The payloads left to send and the (possibly updated) `didLogHeartbeatStrip` flag.
 */
export function buildReplyPayloads(params: {
  payloads: ReplyPayload[];
  isHeartbeat: boolean;
  didLogHeartbeatStrip: boolean;
  blockStreamingEnabled: boolean;
  blockReplyPipeline: BlockReplyPipeline | null;
  /** Payload keys sent directly (not via pipeline) during tool flush. */
  directlySentBlockKeys?: Set<string>;
  replyToMode: ReplyToMode;
  replyToChannel?: OriginatingChannelType;
  currentMessageId?: string;
  messageProvider?: string;
  messagingToolSentTexts?: string[];
  messagingToolSentTargets?: Parameters<
    typeof shouldSuppressMessagingToolReplies
  >[0]["messagingToolSentTargets"];
  originatingTo?: string;
  accountId?: string;
}): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } {
  const {
    blockReplyPipeline: pipeline,
    blockStreamingEnabled,
    directlySentBlockKeys: directKeys,
  } = params;
  let heartbeatStripLogged = params.didLogHeartbeatStrip;

  // Cleans one payload; yields an empty array when the payload should be dropped entirely.
  const scrubPayload = (payload: ReplyPayload): ReplyPayload[] => {
    let text = payload.text;
    if (payload.isError && text && isBunFetchSocketError(text)) {
      text = formatBunFetchSocketError(text);
    }

    // Nothing to strip: pass the payload through (with the possibly reformatted text).
    if (!text || !text.includes("HEARTBEAT_OK")) {
      return [{ ...payload, text }];
    }

    const strip = stripHeartbeatToken(text, { mode: "message" });
    if (strip.didStrip && !heartbeatStripLogged) {
      // Log only once; the flag is handed back to the caller in the result.
      heartbeatStripLogged = true;
      logVerbose("Stripped stray HEARTBEAT_OK token from reply");
    }

    // A payload flagged as skippable is still kept when it carries media.
    const carriesMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
    return strip.shouldSkip && !carriesMedia ? [] : [{ ...payload, text: strip.text }];
  };

  // Folds directives parsed from the payload text into the payload itself.
  const mergeDirectives = (payload: ReplyPayload) => {
    const directives = parseReplyDirectives(payload.text ?? "", {
      currentMessageId: params.currentMessageId,
      silentToken: SILENT_REPLY_TOKEN,
    });
    const mediaUrls = payload.mediaUrls ?? directives.mediaUrls;
    return {
      ...payload,
      // Empty parsed text is normalised to undefined.
      text: directives.text || undefined,
      mediaUrls,
      mediaUrl: payload.mediaUrl ?? directives.mediaUrl ?? mediaUrls?.[0],
      replyToId: payload.replyToId ?? directives.replyToId,
      replyToTag: payload.replyToTag || directives.replyToTag,
      replyToCurrent: payload.replyToCurrent || directives.replyToCurrent,
      audioAsVoice: Boolean(payload.audioAsVoice || directives.audioAsVoice),
    };
  };

  // Heartbeat runs skip the scrubbing step and use the payloads as given.
  const cleaned = params.isHeartbeat ? params.payloads : params.payloads.flatMap(scrubPayload);

  const threaded = applyReplyThreading({
    payloads: cleaned,
    replyToMode: params.replyToMode,
    replyToChannel: params.replyToChannel,
    currentMessageId: params.currentMessageId,
  });
  const renderable: ReplyPayload[] = threaded.map(mergeDirectives).filter(isRenderablePayload);

  // Final payloads are dropped wholesale only when block streaming ran end-to-end;
  // if it was aborted (e.g., timeout) the final payloads act as the fallback.
  const streamedEverything =
    blockStreamingEnabled && Boolean(pipeline?.didStream()) && !pipeline?.isAborted();

  const sentTexts = params.messagingToolSentTexts ?? [];
  const sentTargets = params.messagingToolSentTargets ?? [];
  const suppressAll = shouldSuppressMessagingToolReplies({
    messageProvider: params.messageProvider,
    messagingToolSentTargets: sentTargets,
    originatingTo: params.originatingTo,
    accountId: params.accountId,
  });
  const deduped = filterMessagingToolDuplicates({
    payloads: renderable,
    sentTexts,
  });

  // Exclude anything already delivered through the pipeline or directly during tool flush.
  let remaining: ReplyPayload[];
  if (streamedEverything) {
    remaining = [];
  } else if (blockStreamingEnabled) {
    remaining = deduped.filter((payload) => !pipeline?.hasSentPayload(payload));
  } else if (directKeys?.size) {
    remaining = deduped.filter((payload) => !directKeys.has(createBlockReplyPayloadKey(payload)));
  } else {
    remaining = deduped;
  }

  return {
    replyPayloads: suppressAll ? [] : remaining,
    didLogHeartbeatStrip: heartbeatStripLogged,
  };
}
|