Merge pull request #614 from zknicker/fix/block-stream-think-leak

fix: prevent <think> leakage in block streaming
This commit is contained in:
Peter Steinberger
2026-01-09 23:10:17 +00:00
committed by GitHub
6 changed files with 159 additions and 52 deletions

View File

@@ -77,6 +77,7 @@
- Auto-reply: preserve spacing when stripping inline directives. (#539) — thanks @joshp123 - Auto-reply: preserve spacing when stripping inline directives. (#539) — thanks @joshp123
- Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj - Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj
- Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj - Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj
- Auto-reply: suppress `<think>` leakage in block streaming and emit `/reasoning` as a separate `Reasoning:` message. (#614) — thanks @zknicker
- Auto-reply: default block streaming off for non-Telegram providers unless explicitly enabled, and avoid splitting on forced flushes below max. - Auto-reply: default block streaming off for non-Telegram providers unless explicitly enabled, and avoid splitting on forced flushes below max.
- Auto-reply: raise default coalesce minChars for Signal/Slack/Discord and clarify streaming vs draft streaming in docs. - Auto-reply: raise default coalesce minChars for Signal/Slack/Discord and clarify streaming vs draft streaming in docs.
- Auto-reply: default block streaming coalesce idle to 1s to reduce tiny chunks. — thanks @steipete - Auto-reply: default block streaming coalesce idle to 1s to reduce tiny chunks. — thanks @steipete

View File

@@ -38,8 +38,8 @@ read_when:
## Reasoning visibility (/reasoning) ## Reasoning visibility (/reasoning)
- Levels: `on|off|stream`. - Levels: `on|off|stream`.
- Directive-only message toggles whether thinking blocks are shown as italic text in replies. - Directive-only message toggles whether thinking blocks are shown in replies.
- When enabled, any model-provided reasoning content is appended as a separate italic block. - When enabled, reasoning is sent as a **separate message** prefixed with `Reasoning:`.
- `stream` (Telegram only): streams reasoning into the Telegram draft bubble while the reply is generating, then sends the final answer without reasoning. - `stream` (Telegram only): streams reasoning into the Telegram draft bubble while the reply is generating, then sends the final answer without reasoning.
- Alias: `/reason`. - Alias: `/reason`.
- Send `/reasoning` (or `/reasoning:`) with no argument to see the current reasoning level. - Send `/reasoning` (or `/reasoning:`) with no argument to see the current reasoning level.

View File

@@ -1604,23 +1604,21 @@ export async function runEmbeddedPiAgent(params: {
} }
} }
const fallbackText = lastAssistant const reasoningText =
? (() => { lastAssistant && params.reasoningLevel === "on"
const base = extractAssistantText(lastAssistant); ? formatReasoningMarkdown(extractAssistantThinking(lastAssistant))
if (params.reasoningLevel !== "on") return base; : "";
const thinking = extractAssistantThinking(lastAssistant); if (reasoningText) replyItems.push({ text: reasoningText });
const formatted = thinking
? formatReasoningMarkdown(thinking) const fallbackAnswerText = lastAssistant
: ""; ? extractAssistantText(lastAssistant)
if (!formatted) return base;
return base ? `${formatted}\n\n${base}` : formatted;
})()
: ""; : "";
for (const text of assistantTexts.length const answerTexts = assistantTexts.length
? assistantTexts ? assistantTexts
: fallbackText : fallbackAnswerText
? [fallbackText] ? [fallbackAnswerText]
: []) { : [];
for (const text of answerTexts) {
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) if (!cleanedText && (!mediaUrls || mediaUrls.length === 0))
continue; continue;

View File

@@ -129,7 +129,7 @@ describe("subscribeEmbeddedPiSession", () => {
expect(payload.text).toBe("Hello block"); expect(payload.text).toBe("Hello block");
}); });
it("prepends reasoning before text when enabled", () => { it("emits reasoning as a separate message when enabled", () => {
let handler: ((evt: unknown) => void) | undefined; let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = { const session: StubSession = {
subscribe: (fn) => { subscribe: (fn) => {
@@ -160,11 +160,11 @@ describe("subscribeEmbeddedPiSession", () => {
handler?.({ type: "message_end", message: assistantMessage }); handler?.({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalledTimes(1); expect(onBlockReply).toHaveBeenCalledTimes(2);
const payload = onBlockReply.mock.calls[0][0]; expect(onBlockReply.mock.calls[0][0].text).toBe(
expect(payload.text).toBe( "Reasoning:\nBecause it helps",
"_Reasoning:_\n_Because it helps_\n\nFinal answer",
); );
expect(onBlockReply.mock.calls[1][0].text).toBe("Final answer");
}); });
it("promotes <think> tags to thinking blocks at write-time", () => { it("promotes <think> tags to thinking blocks at write-time", () => {
@@ -200,10 +200,11 @@ describe("subscribeEmbeddedPiSession", () => {
handler?.({ type: "message_end", message: assistantMessage }); handler?.({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalledTimes(1); expect(onBlockReply).toHaveBeenCalledTimes(2);
expect(onBlockReply.mock.calls[0][0].text).toBe( expect(onBlockReply.mock.calls[0][0].text).toBe(
"_Reasoning:_\n_Because it helps_\n\nFinal answer", "Reasoning:\nBecause it helps",
); );
expect(onBlockReply.mock.calls[1][0].text).toBe("Final answer");
expect(assistantMessage.content).toEqual([ expect(assistantMessage.content).toEqual([
{ type: "thinking", thinking: "Because it helps" }, { type: "thinking", thinking: "Because it helps" },
@@ -278,6 +279,74 @@ describe("subscribeEmbeddedPiSession", () => {
]); ]);
}); });
it.each([
  { tag: "think", open: "<think>", close: "</think>" },
  { tag: "thinking", open: "<thinking>", close: "</thinking>" },
])("suppresses <%s> blocks across chunk boundaries", ({ open, close }) => {
  // Capture the session event handler so the test can drive events manually.
  let emit: ((evt: unknown) => void) | undefined;
  const session: StubSession = {
    subscribe: (fn) => {
      emit = fn;
      return () => {};
    },
  };
  const onBlockReply = vi.fn();
  subscribeEmbeddedPiSession({
    session: session as unknown as Parameters<
      typeof subscribeEmbeddedPiSession
    >[0]["session"],
    runId: "run",
    onBlockReply,
    blockReplyBreak: "text_end",
    blockReplyChunking: {
      minChars: 5,
      maxChars: 50,
      breakPreference: "newline",
    },
  });
  emit?.({ type: "message_start", message: { role: "assistant" } });
  // First chunk opens the think tag but never closes it — nothing may be emitted.
  emit?.({
    type: "message_update",
    message: { role: "assistant" },
    assistantMessageEvent: {
      type: "text_delta",
      delta: `${open}Reasoning chunk that should not leak`,
    },
  });
  expect(onBlockReply).not.toHaveBeenCalled();
  // Second chunk closes the tag and carries the real answer.
  emit?.({
    type: "message_update",
    message: { role: "assistant" },
    assistantMessageEvent: {
      type: "text_delta",
      delta: `${close}\n\nFinal answer`,
    },
  });
  emit?.({
    type: "message_update",
    message: { role: "assistant" },
    assistantMessageEvent: { type: "text_end" },
  });
  // Collect every string payload that reached onBlockReply.
  const texts: string[] = [];
  for (const call of onBlockReply.mock.calls) {
    const value = call[0]?.text;
    if (typeof value === "string") texts.push(value);
  }
  expect(texts.length).toBeGreaterThan(0);
  // No emitted chunk may contain the reasoning content or the open tag.
  for (const emitted of texts) {
    expect(emitted).not.toContain("Reasoning");
    expect(emitted).not.toContain(open);
  }
  const combined = texts.join(" ").replace(/\s+/g, " ").trim();
  expect(combined).toBe("Final answer");
});
it("emits block replies on text_end and does not duplicate on message_end", () => { it("emits block replies on text_end and does not duplicate on message_end", () => {
let handler: ((evt: unknown) => void) | undefined; let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = { const session: StubSession = {
@@ -1058,10 +1127,17 @@ describe("subscribeEmbeddedPiSession", () => {
handler?.({ type: "message_end", message: assistantMessage }); handler?.({ type: "message_end", message: assistantMessage });
expect(onBlockReply).toHaveBeenCalledTimes(3); const payloadTexts = onBlockReply.mock.calls
expect(onBlockReply.mock.calls[1][0].text).toBe( .map((call) => call[0]?.text)
"````md\nline1\nline2\n````", .filter((value): value is string => typeof value === "string");
); expect(payloadTexts.length).toBeGreaterThan(0);
const combined = payloadTexts.join(" ").replace(/\s+/g, " ").trim();
expect(combined).toContain("````md");
expect(combined).toContain("line1");
expect(combined).toContain("line2");
expect(combined).toContain("````");
expect(combined).toContain("Intro");
expect(combined).toContain("Outro");
}); });
it("splits long single-line fenced blocks with reopen/close", () => { it("splits long single-line fenced blocks with reopen/close", () => {

View File

@@ -376,6 +376,8 @@ export function subscribeEmbeddedPiSession(params: {
typeof params.onReasoningStream === "function"; typeof params.onReasoningStream === "function";
let deltaBuffer = ""; let deltaBuffer = "";
let blockBuffer = ""; let blockBuffer = "";
// Track if a streamed chunk opened a <think> block (stateful across chunks).
let blockThinkingActive = false;
let lastStreamedAssistant: string | undefined; let lastStreamedAssistant: string | undefined;
let lastStreamedReasoning: string | undefined; let lastStreamedReasoning: string | undefined;
let lastBlockReplyText: string | undefined; let lastBlockReplyText: string | undefined;
@@ -481,9 +483,32 @@ export function subscribeEmbeddedPiSession(params: {
} }
}; };
// Remove <think>/<thinking>…</…> spans from a streamed chunk, carrying the
// open/closed state across chunks via the `blockThinkingActive` closure flag
// so a tag opened in one delta suppresses text until a later delta closes it.
const stripBlockThinkingSegments = (text: string): string => {
  if (!text) return text;
  // Fast path: nothing open and no tag present — return the chunk untouched.
  if (!blockThinkingActive && !THINKING_TAG_SCAN_RE.test(text)) return text;
  // test() on a /g regex advances lastIndex; reset before scanning again.
  THINKING_TAG_SCAN_RE.lastIndex = 0;
  const kept: string[] = [];
  let cursor = 0;
  let insideThinking = blockThinkingActive;
  for (const tag of text.matchAll(THINKING_TAG_SCAN_RE)) {
    const start = tag.index ?? 0;
    // Text before an opening tag is visible; text before a closing tag is not.
    if (!insideThinking) kept.push(text.slice(cursor, start));
    insideThinking = tag[1] !== "/";
    cursor = start + tag[0].length;
  }
  // Tail after the last tag is visible only when no think block is still open.
  if (!insideThinking) kept.push(text.slice(cursor));
  blockThinkingActive = insideThinking;
  return kept.join("");
};
const emitBlockChunk = (text: string) => { const emitBlockChunk = (text: string) => {
// Strip any <thinking> tags that may have leaked into the output (e.g., from Gemini mimicking history) // Strip <think> blocks across chunk boundaries to avoid leaking reasoning.
const strippedText = stripThinkingSegments(stripUnpairedThinkingTags(text)); const strippedText = stripBlockThinkingSegments(text);
const chunk = strippedText.trimEnd(); const chunk = strippedText.trimEnd();
if (!chunk) return; if (!chunk) return;
if (chunk === lastBlockReplyText) return; if (chunk === lastBlockReplyText) return;
@@ -571,6 +596,7 @@ export function subscribeEmbeddedPiSession(params: {
deltaBuffer = ""; deltaBuffer = "";
blockBuffer = ""; blockBuffer = "";
blockChunker?.reset(); blockChunker?.reset();
blockThinkingActive = false;
lastStreamedAssistant = undefined; lastStreamedAssistant = undefined;
lastStreamedReasoning = undefined; lastStreamedReasoning = undefined;
lastBlockReplyText = undefined; lastBlockReplyText = undefined;
@@ -590,6 +616,7 @@ export function subscribeEmbeddedPiSession(params: {
deltaBuffer = ""; deltaBuffer = "";
blockBuffer = ""; blockBuffer = "";
blockChunker?.reset(); blockChunker?.reset();
blockThinkingActive = false;
lastStreamedAssistant = undefined; lastStreamedAssistant = undefined;
lastBlockReplyText = undefined; lastBlockReplyText = undefined;
lastStreamedReasoning = undefined; lastStreamedReasoning = undefined;
@@ -909,11 +936,7 @@ export function subscribeEmbeddedPiSession(params: {
const formattedReasoning = rawThinking const formattedReasoning = rawThinking
? formatReasoningMarkdown(rawThinking) ? formatReasoningMarkdown(rawThinking)
: ""; : "";
const text = includeReasoning const text = baseText;
? baseText && formattedReasoning
? `${formattedReasoning}\n\n${baseText}`
: formattedReasoning || baseText
: baseText;
const addedDuringMessage = const addedDuringMessage =
assistantTexts.length > assistantTextBaseline; assistantTexts.length > assistantTextBaseline;
@@ -926,13 +949,28 @@ export function subscribeEmbeddedPiSession(params: {
} }
assistantTextBaseline = assistantTexts.length; assistantTextBaseline = assistantTexts.length;
const onBlockReply = params.onBlockReply;
const shouldEmitReasoning =
includeReasoning &&
Boolean(formattedReasoning) &&
Boolean(onBlockReply) &&
formattedReasoning !== lastReasoningSent;
const shouldEmitReasoningBeforeAnswer =
shouldEmitReasoning &&
blockReplyBreak === "message_end" &&
!addedDuringMessage;
if (shouldEmitReasoningBeforeAnswer && formattedReasoning) {
lastReasoningSent = formattedReasoning;
void onBlockReply?.({ text: formattedReasoning });
}
if ( if (
(blockReplyBreak === "message_end" || (blockReplyBreak === "message_end" ||
(blockChunker (blockChunker
? blockChunker.hasBuffered() ? blockChunker.hasBuffered()
: blockBuffer.length > 0)) && : blockBuffer.length > 0)) &&
text && text &&
params.onBlockReply onBlockReply
) { ) {
if (blockChunker?.hasBuffered()) { if (blockChunker?.hasBuffered()) {
blockChunker.drain({ force: true, emit: emitBlockChunk }); blockChunker.drain({ force: true, emit: emitBlockChunk });
@@ -948,7 +986,7 @@ export function subscribeEmbeddedPiSession(params: {
const { text: cleanedText, mediaUrls } = const { text: cleanedText, mediaUrls } =
splitMediaFromOutput(text); splitMediaFromOutput(text);
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
void params.onBlockReply({ void onBlockReply({
text: cleanedText, text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined, mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
}); });
@@ -956,16 +994,13 @@ export function subscribeEmbeddedPiSession(params: {
} }
} }
} }
const onBlockReply = params.onBlockReply; if (
const shouldEmitReasoningBlock = shouldEmitReasoning &&
includeReasoning && !shouldEmitReasoningBeforeAnswer &&
Boolean(formattedReasoning) && formattedReasoning
Boolean(onBlockReply) && ) {
formattedReasoning !== lastReasoningSent &&
(blockReplyBreak === "text_end" || Boolean(blockChunker));
if (shouldEmitReasoningBlock && formattedReasoning && onBlockReply) {
lastReasoningSent = formattedReasoning; lastReasoningSent = formattedReasoning;
void onBlockReply({ text: formattedReasoning }); void onBlockReply?.({ text: formattedReasoning });
} }
if (streamReasoning && rawThinking) { if (streamReasoning && rawThinking) {
emitReasoningStream(rawThinking); emitReasoningStream(rawThinking);
@@ -973,6 +1008,7 @@ export function subscribeEmbeddedPiSession(params: {
deltaBuffer = ""; deltaBuffer = "";
blockBuffer = ""; blockBuffer = "";
blockChunker?.reset(); blockChunker?.reset();
blockThinkingActive = false;
lastStreamedAssistant = undefined; lastStreamedAssistant = undefined;
} }
} }
@@ -1054,6 +1090,7 @@ export function subscribeEmbeddedPiSession(params: {
blockBuffer = ""; blockBuffer = "";
} }
} }
blockThinkingActive = false;
if (pendingCompactionRetry > 0) { if (pendingCompactionRetry > 0) {
resolveCompactionRetry(); resolveCompactionRetry();
} else { } else {

View File

@@ -37,12 +37,7 @@ export function extractAssistantThinking(msg: AssistantMessage): string {
export function formatReasoningMarkdown(text: string): string { export function formatReasoningMarkdown(text: string): string {
const trimmed = text.trim(); const trimmed = text.trim();
if (!trimmed) return ""; if (!trimmed) return "";
const lines = trimmed.split(/\r?\n/); return `Reasoning:\n${trimmed}`;
const wrapped = lines
.map((line) => line.trim())
.map((line) => (line ? `_${line}_` : ""))
.filter((line) => line.length > 0);
return wrapped.length > 0 ? [`_Reasoning:_`, ...wrapped].join("\n") : "";
} }
export function inferToolMetaFromArgs( export function inferToolMetaFromArgs(