fix: harden block stream dedupe

This commit is contained in:
Peter Steinberger
2026-01-03 18:44:07 +01:00
parent 73fa2e10bc
commit 72b34f7d03
8 changed files with 162 additions and 38 deletions

View File

@@ -17,6 +17,7 @@
- Auto-reply: drop final payloads when block streaming to avoid duplicate Discord sends.
- Telegram: chunk block-stream replies to avoid “message is too long” errors (#124) — thanks @mukhtharcm.
- Block streaming: default to text_end and suppress duplicate block sends while in-flight.
- Block streaming: avoid duplicate block chunks when providers repeat full content on text_end.
- Block streaming: drop final payloads after soft chunking to keep Discord order intact.
- Gmail hooks: resolve gcloud Python to a real executable when PATH uses mise shims — thanks @joargp.
- Control UI: generate UUIDs when `crypto.randomUUID()` is unavailable over HTTP — thanks @ratulsarna.
@@ -34,6 +35,7 @@
- Build: require AVX2 Bun for x86_64 relay packaging (reject baseline builds).
- Auto-reply: add run-level telemetry + typing TTL guardrails to diagnose stuck replies.
- WhatsApp: honor per-group mention gating overrides when group ids are stored as session keys.
- Dependencies: bump pi-mono packages to 0.32.3.
### Docs
- Skills: add Sheets/Docs examples to gog skill (#128) — thanks @mbelinky.
@@ -42,6 +44,7 @@
- Skills: add tmux skill + interactive coding guidance in coding-agent.
- Gateway: document port configuration + multi-instance isolation.
- Onboarding/Config: add protocol notes for wizard + schema RPC.
- Queue: clarify steer-backlog behavior and update examples for streaming surfaces.
## 2.0.0-beta5 — 2026-01-03

View File

@@ -171,7 +171,7 @@ Controls how inbound messages behave when an agent run is already active.
bySurface: {
whatsapp: "collect",
telegram: "collect",
discord: "steer-backlog",
discord: "collect",
imessage: "collect",
webchat: "collect"
}

View File

@@ -27,6 +27,10 @@ Inbound messages can steer the current run, wait for a followup turn, or do both
- `interrupt` (legacy): abort the active run for that session, then run the newest message.
- `queue` (legacy alias): same as `steer`.
Steer-backlog can produce a followup response after the steered run completes, so
streaming surfaces may appear to send duplicate messages. Prefer `collect`/`steer` if
you want exactly one response per inbound message.
Defaults (when unset in config):
- All surfaces → `collect`
@@ -40,7 +44,7 @@ Configure globally or per surface via `routing.queue`:
debounceMs: 1000,
cap: 20,
drop: "summarize",
bySurface: { discord: "steer-backlog" }
bySurface: { discord: "collect" }
}
}
}

View File

@@ -73,10 +73,10 @@
"@clack/prompts": "^0.11.0",
"@grammyjs/transformer-throttler": "^1.2.1",
"@homebridge/ciao": "^1.3.4",
"@mariozechner/pi-agent-core": "^0.31.1",
"@mariozechner/pi-ai": "^0.31.1",
"@mariozechner/pi-coding-agent": "^0.31.1",
"@mariozechner/pi-tui": "^0.31.1",
"@mariozechner/pi-agent-core": "^0.32.3",
"@mariozechner/pi-ai": "^0.32.3",
"@mariozechner/pi-coding-agent": "^0.32.3",
"@mariozechner/pi-tui": "^0.32.3",
"@sinclair/typebox": "0.34.46",
"@whiskeysockets/baileys": "7.0.0-rc.9",
"ajv": "^8.17.1",
@@ -138,7 +138,7 @@
},
"patchedDependencies": {
"@mariozechner/pi-ai": "patches/@mariozechner__pi-ai.patch",
"@mariozechner/pi-coding-agent@0.31.1": "patches/@mariozechner__pi-coding-agent@0.31.1.patch"
"@mariozechner/pi-coding-agent@0.32.3": "patches/@mariozechner__pi-coding-agent@0.32.3.patch"
}
},
"vitest": {

55
pnpm-lock.yaml generated
View File

@@ -11,9 +11,9 @@ patchedDependencies:
'@mariozechner/pi-ai':
hash: 969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae
path: patches/@mariozechner__pi-ai.patch
'@mariozechner/pi-coding-agent@0.31.1':
'@mariozechner/pi-coding-agent@0.32.3':
hash: d0d5ffa1bfda8a0f9d14a5e73a074014346d3edbdb2ffc91444d3be5119f5745
path: patches/@mariozechner__pi-coding-agent@0.31.1.patch
path: patches/@mariozechner__pi-coding-agent@0.32.3.patch
importers:
@@ -29,17 +29,17 @@ importers:
specifier: ^1.3.4
version: 1.3.4
'@mariozechner/pi-agent-core':
specifier: ^0.31.1
version: 0.31.1(ws@8.18.3)(zod@4.3.4)
specifier: ^0.32.3
version: 0.32.3(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-ai':
specifier: ^0.31.1
version: 0.31.1(patch_hash=969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae)(ws@8.18.3)(zod@4.3.4)
specifier: ^0.32.3
version: 0.32.3(patch_hash=969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae)(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-coding-agent':
specifier: ^0.31.1
version: 0.31.1(patch_hash=d0d5ffa1bfda8a0f9d14a5e73a074014346d3edbdb2ffc91444d3be5119f5745)(ws@8.18.3)(zod@4.3.4)
specifier: ^0.32.3
version: 0.32.3(patch_hash=d0d5ffa1bfda8a0f9d14a5e73a074014346d3edbdb2ffc91444d3be5119f5745)(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-tui':
specifier: ^0.31.1
version: 0.31.1
specifier: ^0.32.3
version: 0.32.3
'@sinclair/typebox':
specifier: 0.34.46
version: 0.34.46
@@ -744,22 +744,22 @@ packages:
peerDependencies:
lit: ^3.3.1
'@mariozechner/pi-agent-core@0.31.1':
resolution: {integrity: sha512-skY2ZGrVbTkbTpdHql3mR0//BbLeqwKWQ0bnoI2H7YHbR5bTDSaYwtRtSdEgfPVgTk+WBQaZOZOv6be4qCAsww==}
'@mariozechner/pi-agent-core@0.32.3':
resolution: {integrity: sha512-NN/fd8eBISnRbQYblLjiaRbcFjz+SEFOGhZDZCXYgHJikTV8VyEon0nZFh7agBZJ7sXZaumMTjw/pGFrBFKcmA==}
engines: {node: '>=20.0.0'}
'@mariozechner/pi-ai@0.31.1':
resolution: {integrity: sha512-mqqitu/69ofLPmQEj7m04SvPQZEX+uacLHU9oQxz1c1khclsjw2S7G/v5P/3jK4hZjoZfHkPJRwPMFvEbo3wAA==}
'@mariozechner/pi-ai@0.32.3':
resolution: {integrity: sha512-njaa4/pN7U1TbjI1PKqII3M/W1CQ/obPAu5A4KHsX74e8sKjDEiEELUWzLTa5xOj+l1OXp/FtzI0hfJDq682rQ==}
engines: {node: '>=20.0.0'}
hasBin: true
'@mariozechner/pi-coding-agent@0.31.1':
resolution: {integrity: sha512-S+IQMYJssNFXQcdk8iB3tY0b1Idi0gAZCdwGCkTSG+vGghO0rdo+vQ+/v5KM6BhW2XsKiYz/2XCG8237iVwINA==}
'@mariozechner/pi-coding-agent@0.32.3':
resolution: {integrity: sha512-kOuzflbWc8GWHGOu/7KJQ1P3IodDk4AdOvibkJDcAn2aw5kG+LZAjLGt72tIMQRQna51i9XzL4nSvbxqV+mgWw==}
engines: {node: '>=20.0.0'}
hasBin: true
'@mariozechner/pi-tui@0.31.1':
resolution: {integrity: sha512-79hDQPAMPxKO0HKoXiFLp1HFbRDwweYL31l2wlsEzhPYgIYvx0Ii7X+KK4yMjjfDc4SjX6ZmSnJ4MUFAanuVzA==}
'@mariozechner/pi-tui@0.32.3':
resolution: {integrity: sha512-aq7e6qXcwSHvaTUKj5Ut9BY+ZezQ2rL+V4Sd2R5SnBW6JMCt5GnR/8uYBC2PcdxBQzGL9TnzjrC4av5jR+ilow==}
engines: {node: '>=20.0.0'}
'@mistralai/mistralai@1.10.0':
@@ -3351,10 +3351,10 @@ snapshots:
transitivePeerDependencies:
- tailwindcss
'@mariozechner/pi-agent-core@0.31.1(ws@8.18.3)(zod@4.3.4)':
'@mariozechner/pi-agent-core@0.32.3(ws@8.18.3)(zod@4.3.4)':
dependencies:
'@mariozechner/pi-ai': 0.31.1(patch_hash=969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae)(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-tui': 0.31.1
'@mariozechner/pi-ai': 0.32.3(patch_hash=969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae)(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-tui': 0.32.3
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
- bufferutil
@@ -3363,7 +3363,7 @@ snapshots:
- ws
- zod
'@mariozechner/pi-ai@0.31.1(patch_hash=969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae)(ws@8.18.3)(zod@4.3.4)':
'@mariozechner/pi-ai@0.32.3(patch_hash=969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae)(ws@8.18.3)(zod@4.3.4)':
dependencies:
'@anthropic-ai/sdk': 0.71.2(zod@4.3.4)
'@google/genai': 1.34.0
@@ -3383,11 +3383,11 @@ snapshots:
- ws
- zod
'@mariozechner/pi-coding-agent@0.31.1(patch_hash=d0d5ffa1bfda8a0f9d14a5e73a074014346d3edbdb2ffc91444d3be5119f5745)(ws@8.18.3)(zod@4.3.4)':
'@mariozechner/pi-coding-agent@0.32.3(patch_hash=d0d5ffa1bfda8a0f9d14a5e73a074014346d3edbdb2ffc91444d3be5119f5745)(ws@8.18.3)(zod@4.3.4)':
dependencies:
'@mariozechner/pi-agent-core': 0.31.1(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-ai': 0.31.1(patch_hash=969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae)(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-tui': 0.31.1
'@mariozechner/pi-agent-core': 0.32.3(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-ai': 0.32.3(patch_hash=969db6f3f4cc91fec48124e1f5e515b386b1f1bed807769d0a80c28abadbaaae)(ws@8.18.3)(zod@4.3.4)
'@mariozechner/pi-tui': 0.32.3
chalk: 5.6.2
cli-highlight: 2.1.11
diff: 8.0.2
@@ -3395,6 +3395,7 @@ snapshots:
glob: 11.1.0
jiti: 2.6.1
marked: 15.0.12
sharp: 0.34.5
transitivePeerDependencies:
- '@modelcontextprotocol/sdk'
- bufferutil
@@ -3403,7 +3404,7 @@ snapshots:
- ws
- zod
'@mariozechner/pi-tui@0.31.1':
'@mariozechner/pi-tui@0.32.3':
dependencies:
'@types/mime-types': 2.1.4
chalk: 5.6.2

View File

@@ -231,6 +231,99 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.assistantTexts).toEqual(["Hello block"]);
});
it("does not duplicate when text_end repeats full content", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onBlockReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Good morning!",
},
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content: "Good morning!",
},
});
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(subscription.assistantTexts).toEqual(["Good morning!"]);
});
it("does not duplicate block chunks when text_end repeats full content", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onBlockReply = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
blockReplyChunking: {
minChars: 5,
maxChars: 40,
breakPreference: "newline",
},
});
const fullText = "First line\nSecond line\nThird line\n";
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: fullText,
},
});
const callsAfterDelta = onBlockReply.mock.calls.length;
expect(callsAfterDelta).toBeGreaterThan(0);
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content: fullText,
},
});
expect(onBlockReply).toHaveBeenCalledTimes(callsAfterDelta);
});
it("streams soft chunks with paragraph preference", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -259,6 +259,11 @@ export function subscribeEmbeddedPiSession(params: {
if (!blockChunking) return;
const minChars = Math.max(1, Math.floor(blockChunking.minChars));
const maxChars = Math.max(minChars, Math.floor(blockChunking.maxChars));
if (force && blockBuffer.length > 0 && blockBuffer.length <= maxChars) {
emitBlockChunk(blockBuffer);
blockBuffer = "";
return;
}
if (blockBuffer.length < minChars && !force) return;
while (blockBuffer.length >= minChars || (force && blockBuffer.length > 0)) {
const breakIdx = pickBreakIndex(blockBuffer);
@@ -437,12 +442,30 @@ export function subscribeEmbeddedPiSession(params: {
evtType === "text_start" ||
evtType === "text_end"
) {
const chunk =
const delta =
typeof assistantRecord?.delta === "string"
? assistantRecord.delta
: typeof assistantRecord?.content === "string"
? assistantRecord.content
: "";
: "";
const content =
typeof assistantRecord?.content === "string"
? assistantRecord.content
: "";
let chunk = "";
if (evtType === "text_delta") {
chunk = delta;
} else if (evtType === "text_start" || evtType === "text_end") {
if (delta) {
chunk = delta;
} else if (content) {
if (content.startsWith(deltaBuffer)) {
chunk = content.slice(deltaBuffer.length);
} else if (deltaBuffer.startsWith(content)) {
chunk = "";
} else if (!deltaBuffer.includes(content)) {
chunk = content;
}
}
}
if (chunk) {
deltaBuffer += chunk;
blockBuffer += chunk;