fix: avoid duplicate block streaming
This commit is contained in:
@@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
- Telegram: chunk block-stream replies to avoid “message is too long” errors (#124) — thanks @mukhtharcm.
|
- Telegram: chunk block-stream replies to avoid “message is too long” errors (#124) — thanks @mukhtharcm.
|
||||||
|
- Block streaming: default to text_end and suppress duplicate block sends while in-flight.
|
||||||
- Gmail hooks: resolve gcloud Python to a real executable when PATH uses mise shims — thanks @joargp.
|
- Gmail hooks: resolve gcloud Python to a real executable when PATH uses mise shims — thanks @joargp.
|
||||||
- Control UI: generate UUIDs when `crypto.randomUUID()` is unavailable over HTTP — thanks @ratulsarna.
|
- Control UI: generate UUIDs when `crypto.randomUUID()` is unavailable over HTTP — thanks @ratulsarna.
|
||||||
- Agent: add soft block-stream chunking (800–1200 chars default) with paragraph/newline preference.
|
- Agent: add soft block-stream chunking (800–1200 chars default) with paragraph/newline preference.
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ current turn ends, then a new agent turn starts with the queued payloads. See
|
|||||||
|
|
||||||
Block streaming sends completed assistant blocks as soon as they finish; disable
|
Block streaming sends completed assistant blocks as soon as they finish; disable
|
||||||
via `agent.blockStreamingDefault: "off"` if you only want the final response.
|
via `agent.blockStreamingDefault: "off"` if you only want the final response.
|
||||||
Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`).
|
Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`; defaults to text_end).
|
||||||
Control soft block chunking with `agent.blockStreamingChunk` (defaults to
|
Control soft block chunking with `agent.blockStreamingChunk` (defaults to
|
||||||
800–1200 chars; prefers paragraph breaks, then newlines; sentences last).
|
800–1200 chars; prefers paragraph breaks, then newlines; sentences last).
|
||||||
|
|
||||||
|
|||||||
@@ -393,7 +393,7 @@ Controls the embedded agent runtime (model/thinking/verbose/timeouts).
|
|||||||
|
|
||||||
Block streaming:
|
Block streaming:
|
||||||
- `agent.blockStreamingDefault`: `"on"`/`"off"` (default on).
|
- `agent.blockStreamingDefault`: `"on"`/`"off"` (default on).
|
||||||
- `agent.blockStreamingBreak`: `"text_end"` or `"message_end"`.
|
- `agent.blockStreamingBreak`: `"text_end"` or `"message_end"` (default: text_end).
|
||||||
- `agent.blockStreamingChunk`: soft chunking for streamed blocks. Defaults to
|
- `agent.blockStreamingChunk`: soft chunking for streamed blocks. Defaults to
|
||||||
800–1200 chars, prefers paragraph breaks (`\n\n`), then newlines, then sentences.
|
800–1200 chars, prefers paragraph breaks (`\n\n`), then newlines, then sentences.
|
||||||
Example:
|
Example:
|
||||||
|
|||||||
@@ -1124,14 +1124,14 @@ export async function getReplyFromConfig(
|
|||||||
(agentCfg?.verboseDefault as VerboseLevel | undefined);
|
(agentCfg?.verboseDefault as VerboseLevel | undefined);
|
||||||
const resolvedBlockStreaming =
|
const resolvedBlockStreaming =
|
||||||
agentCfg?.blockStreamingDefault === "off" ? "off" : "on";
|
agentCfg?.blockStreamingDefault === "off" ? "off" : "on";
|
||||||
// TODO(steipete): Default to message_end for now; figure out why text_end breaks and whether we can revert.
|
|
||||||
const resolvedBlockStreamingBreak =
|
const resolvedBlockStreamingBreak =
|
||||||
agentCfg?.blockStreamingBreak === "text_end" ? "text_end" : "message_end";
|
agentCfg?.blockStreamingBreak === "message_end" ? "message_end" : "text_end";
|
||||||
const blockStreamingEnabled = resolvedBlockStreaming === "on";
|
const blockStreamingEnabled = resolvedBlockStreaming === "on";
|
||||||
const blockReplyChunking = blockStreamingEnabled
|
const blockReplyChunking = blockStreamingEnabled
|
||||||
? resolveBlockStreamingChunking(cfg, sessionCtx.Surface)
|
? resolveBlockStreamingChunking(cfg, sessionCtx.Surface)
|
||||||
: undefined;
|
: undefined;
|
||||||
const streamedPayloadKeys = new Set<string>();
|
const streamedPayloadKeys = new Set<string>();
|
||||||
|
const pendingStreamedPayloadKeys = new Set<string>();
|
||||||
const pendingBlockTasks = new Set<Promise<void>>();
|
const pendingBlockTasks = new Set<Promise<void>>();
|
||||||
const buildPayloadKey = (payload: ReplyPayload) => {
|
const buildPayloadKey = (payload: ReplyPayload) => {
|
||||||
const text = payload.text?.trim() ?? "";
|
const text = payload.text?.trim() ?? "";
|
||||||
@@ -2232,6 +2232,13 @@ export async function getReplyFromConfig(
|
|||||||
replyToId: tagResult.replyToId,
|
replyToId: tagResult.replyToId,
|
||||||
};
|
};
|
||||||
const payloadKey = buildPayloadKey(blockPayload);
|
const payloadKey = buildPayloadKey(blockPayload);
|
||||||
|
if (
|
||||||
|
streamedPayloadKeys.has(payloadKey) ||
|
||||||
|
pendingStreamedPayloadKeys.has(payloadKey)
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
pendingStreamedPayloadKeys.add(payloadKey);
|
||||||
const task = (async () => {
|
const task = (async () => {
|
||||||
await startTypingOnText(cleaned);
|
await startTypingOnText(cleaned);
|
||||||
await opts.onBlockReply?.(blockPayload);
|
await opts.onBlockReply?.(blockPayload);
|
||||||
@@ -2241,6 +2248,9 @@ export async function getReplyFromConfig(
|
|||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
logVerbose(`block reply delivery failed: ${String(err)}`);
|
logVerbose(`block reply delivery failed: ${String(err)}`);
|
||||||
|
})
|
||||||
|
.finally(() => {
|
||||||
|
pendingStreamedPayloadKeys.delete(payloadKey);
|
||||||
});
|
});
|
||||||
pendingBlockTasks.add(task);
|
pendingBlockTasks.add(task);
|
||||||
void task.finally(() => pendingBlockTasks.delete(task));
|
void task.finally(() => pendingBlockTasks.delete(task));
|
||||||
|
|||||||
Reference in New Issue
Block a user