feat: stream reply blocks immediately

This commit is contained in:
Peter Steinberger
2026-01-03 00:28:33 +01:00
parent 9dd613edf7
commit 9616f4b2b1
14 changed files with 323 additions and 8 deletions

View File

@@ -327,6 +327,10 @@ export async function runEmbeddedPiAgent(params: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
onBlockReply?: (payload: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
onToolResult?: (payload: {
text?: string;
mediaUrls?: string[];
@@ -489,6 +493,7 @@ export async function runEmbeddedPiAgent(params: {
verboseLevel: params.verboseLevel,
shouldEmitToolResult: params.shouldEmitToolResult,
onToolResult: params.onToolResult,
onBlockReply: params.onBlockReply,
onPartialReply: params.onPartialReply,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,

View File

@@ -97,6 +97,37 @@ describe("subscribeEmbeddedPiSession", () => {
expect(payload.text).toBe("Hello world");
});
it("emits block replies on message_end", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onBlockReply = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run",
onBlockReply,
});
const assistantMessage = {
role: "assistant",
content: [{ type: "text", text: "Hello block" }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalled();
const payload = onBlockReply.mock.calls[0][0];
expect(payload.text).toBe("Hello block");
});
it("waits for auto-compaction retry and clears buffered text", async () => {
const listeners: SessionEventHandler[] = [];
const session = {

View File

@@ -57,6 +57,10 @@ export function subscribeEmbeddedPiSession(params: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
onBlockReply?: (payload: {
text?: string;
mediaUrls?: string[];
}) => void | Promise<void>;
onPartialReply?: (payload: {
text?: string;
mediaUrls?: string[];
@@ -314,6 +318,15 @@ export function subscribeEmbeddedPiSession(params: {
? (extractFinalText(cleaned)?.trim() ?? cleaned)
: cleaned;
if (text) assistantTexts.push(text);
if (text && params.onBlockReply) {
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
void params.onBlockReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
}
}
deltaBuffer = "";
}
}

View File

@@ -599,6 +599,24 @@ export async function getReplyFromConfig(
inlineVerbose ??
(sessionEntry?.verboseLevel as VerboseLevel | undefined) ??
(agentCfg?.verboseDefault as VerboseLevel | undefined);
const resolvedBlockStreaming =
agentCfg?.blockStreamingDefault === "off" ? "off" : "on";
const blockStreamingEnabled = resolvedBlockStreaming === "on";
const streamedPayloadKeys = new Set<string>();
const pendingBlockTasks = new Set<Promise<void>>();
const buildPayloadKey = (payload: ReplyPayload) => {
const text = payload.text?.trim() ?? "";
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
return JSON.stringify({
text,
mediaList,
replyToId: payload.replyToId ?? null,
});
};
const shouldEmitToolResult = () => {
if (!sessionKey || !storePath) {
return resolvedVerboseLevel === "on";
@@ -1371,6 +1389,48 @@ export async function getReplyFromConfig(
});
}
: undefined,
onBlockReply:
blockStreamingEnabled && opts?.onBlockReply
? async (payload) => {
let text = payload.text;
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
});
if (stripped.didStrip && !didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
}
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return;
text = stripped.text;
}
const tagResult = extractReplyToTag(
text,
sessionCtx.MessageSid,
);
const cleaned = tagResult.cleaned || undefined;
const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (!cleaned && !hasMedia) return;
if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia) return;
await startTypingOnText(cleaned);
const blockPayload: ReplyPayload = {
text: cleaned,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
replyToId: tagResult.replyToId,
};
const task = Promise.resolve(opts.onBlockReply?.(blockPayload))
.then(() => {
streamedPayloadKeys.add(buildPayloadKey(blockPayload));
})
.catch((err) => {
logVerbose(`block reply delivery failed: ${String(err)}`);
});
pendingBlockTasks.add(task);
void task.finally(() => pendingBlockTasks.delete(task));
}
: undefined,
shouldEmitToolResult,
onToolResult: opts?.onToolResult
? async (payload) => {
@@ -1421,6 +1481,9 @@ export async function getReplyFromConfig(
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return undefined;
if (pendingBlockTasks.size > 0) {
await Promise.allSettled(pendingBlockTasks);
}
const sanitizedPayloads = opts?.isHeartbeat
? payloadArray
@@ -1457,9 +1520,15 @@ export async function getReplyFromConfig(
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
if (replyTaggedPayloads.length === 0) return undefined;
const filteredPayloads = blockStreamingEnabled
? replyTaggedPayloads.filter(
(payload) => !streamedPayloadKeys.has(buildPayloadKey(payload)),
)
: replyTaggedPayloads;
const shouldSignalTyping = replyTaggedPayloads.some((payload) => {
if (filteredPayloads.length === 0) return undefined;
const shouldSignalTyping = filteredPayloads.some((payload) => {
const trimmed = payload.text?.trim();
if (trimmed && trimmed !== SILENT_REPLY_TOKEN) return true;
if (payload.mediaUrl) return true;
@@ -1514,7 +1583,7 @@ export async function getReplyFromConfig(
}
// If verbose is enabled and this is a new session, prepend a session hint.
let finalPayloads = replyTaggedPayloads;
let finalPayloads = filteredPayloads;
if (resolvedVerboseLevel === "on" && isNewSession) {
finalPayloads = [
{ text: `🧭 New session: ${sessionIdFinal}` },

View File

@@ -2,6 +2,7 @@ export type GetReplyOptions = {
onReplyStart?: () => Promise<void> | void;
isHeartbeat?: boolean;
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
onBlockReply?: (payload: ReplyPayload) => Promise<void> | void;
onToolResult?: (payload: ReplyPayload) => Promise<void> | void;
};

View File

@@ -524,6 +524,8 @@ export type ClawdisConfig = {
thinkingDefault?: "off" | "minimal" | "low" | "medium" | "high";
/** Default verbose level when no /verbose directive is present. */
verboseDefault?: "off" | "on";
/** Default block streaming level when no override is present. */
blockStreamingDefault?: "off" | "on";
timeoutSeconds?: number;
/** Max inbound media size in MB for agent-visible attachments (text note or future image attach). */
mediaMaxMb?: number;
@@ -901,6 +903,9 @@ const ClawdisSchema = z.object({
])
.optional(),
verboseDefault: z.union([z.literal("off"), z.literal("on")]).optional(),
blockStreamingDefault: z
.union([z.literal("off"), z.literal("on")])
.optional(),
timeoutSeconds: z.number().int().positive().optional(),
mediaMaxMb: z.number().positive().optional(),
typingIntervalSeconds: z.number().int().positive().optional(),

View File

@@ -415,10 +415,39 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
);
}
let didSendReply = false;
let blockSendChain: Promise<void> = Promise.resolve();
const sendBlockReply = (payload: ReplyPayload) => {
if (
!payload?.text &&
!payload?.mediaUrl &&
!(payload?.mediaUrls?.length ?? 0)
) {
return;
}
blockSendChain = blockSendChain
.then(async () => {
await deliverReplies({
replies: [payload],
target: ctxPayload.To,
token,
runtime,
replyToMode,
});
didSendReply = true;
})
.catch((err) => {
runtime.error?.(
danger(`discord block reply failed: ${String(err)}`),
);
});
};
const replyResult = await getReplyFromConfig(
ctxPayload,
{
onReplyStart: () => sendTyping(message),
onBlockReply: sendBlockReply,
},
cfg,
);
@@ -427,7 +456,18 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
? replyResult
: [replyResult]
: [];
if (replies.length === 0) return;
await blockSendChain;
if (replies.length === 0) {
if (
isGuildMessage &&
shouldClearHistory &&
historyLimit > 0 &&
didSendReply
) {
guildHistories.set(message.channelId, []);
}
return;
}
await deliverReplies({
replies,
@@ -436,12 +476,18 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
runtime,
replyToMode,
});
didSendReply = true;
if (isVerbose()) {
logVerbose(
`discord: delivered ${replies.length} reply${replies.length === 1 ? "" : "ies"} to ${ctxPayload.To}`,
);
}
if (isGuildMessage && shouldClearHistory && historyLimit > 0) {
if (
isGuildMessage &&
shouldClearHistory &&
historyLimit > 0 &&
didSendReply
) {
guildHistories.set(message.channelId, []);
}
} catch (err) {

View File

@@ -257,12 +257,43 @@ export async function monitorIMessageProvider(
);
}
const replyResult = await getReplyFromConfig(ctxPayload, undefined, cfg);
let blockSendChain: Promise<void> = Promise.resolve();
const sendBlockReply = (payload: ReplyPayload) => {
if (
!payload?.text &&
!payload?.mediaUrl &&
!(payload?.mediaUrls?.length ?? 0)
) {
return;
}
blockSendChain = blockSendChain
.then(async () => {
await deliverReplies({
replies: [payload],
target: ctxPayload.To,
client,
runtime,
maxBytes: mediaMaxBytes,
});
})
.catch((err) => {
runtime.error?.(
danger(`imessage block reply failed: ${String(err)}`),
);
});
};
const replyResult = await getReplyFromConfig(
ctxPayload,
{ onBlockReply: sendBlockReply },
cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
await blockSendChain;
if (replies.length === 0) return;
await deliverReplies({

View File

@@ -373,12 +373,44 @@ export async function monitorSignalProvider(
);
}
const replyResult = await getReplyFromConfig(ctxPayload, undefined, cfg);
let blockSendChain: Promise<void> = Promise.resolve();
const sendBlockReply = (payload: ReplyPayload) => {
if (
!payload?.text &&
!payload?.mediaUrl &&
!(payload?.mediaUrls?.length ?? 0)
) {
return;
}
blockSendChain = blockSendChain
.then(async () => {
await deliverReplies({
replies: [payload],
target: ctxPayload.To,
baseUrl,
account,
runtime,
maxBytes: mediaMaxBytes,
});
})
.catch((err) => {
runtime.error?.(
danger(`signal block reply failed: ${String(err)}`),
);
});
};
const replyResult = await getReplyFromConfig(
ctxPayload,
{ onBlockReply: sendBlockReply },
cfg,
);
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
await blockSendChain;
if (replies.length === 0) return;
await deliverReplies({

View File

@@ -198,9 +198,36 @@ export function createTelegramBot(opts: TelegramBotOptions) {
);
}
let blockSendChain: Promise<void> = Promise.resolve();
const sendBlockReply = (payload: ReplyPayload) => {
if (
!payload?.text &&
!payload?.mediaUrl &&
!(payload?.mediaUrls?.length ?? 0)
) {
return;
}
blockSendChain = blockSendChain
.then(async () => {
await deliverReplies({
replies: [payload],
chatId: String(chatId),
token: opts.token,
runtime,
bot,
replyToMode,
});
})
.catch((err) => {
runtime.error?.(
danger(`telegram block reply failed: ${String(err)}`),
);
});
};
const replyResult = await getReplyFromConfig(
ctxPayload,
{ onReplyStart: sendTyping },
{ onReplyStart: sendTyping, onBlockReply: sendBlockReply },
cfg,
);
const replies = replyResult
@@ -208,6 +235,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
? replyResult
: [replyResult]
: [];
await blockSendChain;
if (replies.length === 0) return;
await deliverReplies({

View File

@@ -1110,6 +1110,50 @@ export async function monitorWebProvider(
);
});
};
const sendBlockReply = (payload: ReplyPayload) => {
if (
!payload?.text &&
!payload?.mediaUrl &&
!(payload?.mediaUrls?.length ?? 0)
) {
return;
}
if (isSilentReply(payload)) return;
const blockPayload: ReplyPayload = { ...payload };
if (
responsePrefix &&
blockPayload.text &&
blockPayload.text.trim() !== HEARTBEAT_TOKEN &&
!blockPayload.text.startsWith(responsePrefix)
) {
blockPayload.text = `${responsePrefix} ${blockPayload.text}`;
}
toolSendChain = toolSendChain
.then(async () => {
await deliverWebReply({
replyResult: blockPayload,
msg,
maxMediaBytes,
replyLogger,
connectionId,
skipLog: true,
});
didSendReply = true;
if (blockPayload.text) {
recentlySent.add(blockPayload.text);
recentlySent.add(combinedBody);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
})
.catch((err) => {
whatsappOutboundLog.error(
`Failed sending web block update to ${msg.from ?? conversationId}: ${formatError(err)}`,
);
});
};
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
@@ -1138,6 +1182,7 @@ export async function monitorWebProvider(
{
onReplyStart: msg.sendComposing,
onToolResult: sendToolResult,
onBlockReply: sendBlockReply,
},
);