docs(agent): annotate stream invariants

This commit is contained in:
Peter Steinberger
2026-01-05 18:10:03 +00:00
parent 86ad703f53
commit cc790f2c84
4 changed files with 13 additions and 0 deletions

View File

@@ -177,6 +177,9 @@ export function subscribeEmbeddedPiSession(params: {
const blockChunker = blockChunking
? new EmbeddedBlockChunker(blockChunking)
: null;
// KNOWN: Provider streams are not strictly once-only or perfectly ordered.
// `text_end` can repeat full content; late `text_end` can arrive after `message_end`.
// Tests: `src/agents/pi-embedded-subscribe.test.ts` (e.g. late text_end cases).
const shouldEmitToolResult = () =>
typeof params.shouldEmitToolResult === "function"
? params.shouldEmitToolResult()
@@ -231,6 +234,8 @@ export function subscribeEmbeddedPiSession(params: {
if (evt.type === "message_start") {
const msg = (evt as AgentEvent & { message: AgentMessage }).message;
if (msg?.role === "assistant") {
// KNOWN: Resetting at `text_end` is unsafe (late/duplicate end events).
// ASSUME: `message_start` is the only reliable boundary for “new assistant message begins”.
// Start-of-message is a safer reset point than message_end: some providers
// may deliver late text_end updates after message_end, which would
// otherwise re-trigger block replies.
@@ -387,6 +392,8 @@ export function subscribeEmbeddedPiSession(params: {
if (delta) {
chunk = delta;
} else if (content) {
// KNOWN: Some providers resend full content on `text_end`.
// We only append a suffix (or nothing) to keep output monotonic.
// Providers may resend full content on text_end; append only the suffix.
if (content.startsWith(deltaBuffer)) {
chunk = content.slice(deltaBuffer.length);