fix: avoid duplicate replies with block streaming
@@ -55,6 +55,7 @@
### Fixes

- Chat UI: keep the chat scrolled to the latest message after switching sessions.
- Auto-reply: stream completed reply blocks as soon as they finish (configurable default + break); skip empty tool-only blocks unless verbose.
- Discord: avoid duplicate sends when block streaming is enabled (race with the typing hook).
- Providers: make outbound text chunk limits configurable via `*.textChunkLimit` (defaults remain 4000, or 2000 for Discord); see the config sketch after this list.
- CLI onboarding: persist the gateway token in config so local CLI auth works; recommend auth Off unless you need multi-machine access.
- Control UI: accept a `?token=` URL param to auto-fill Gateway auth; onboarding now opens the dashboard with token auth when configured.
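As a concrete illustration of the `*.textChunkLimit` item, here is a minimal sketch, not the documented schema: it mirrors the object shape the new block-streaming test passes to `getReplyFromConfig`, and the `whatsapp`/`discord` section names and field placement are assumptions inferred from the `*.textChunkLimit` wording.

```ts
// Hypothetical config object, shaped after the third argument the new test
// passes to getReplyFromConfig. The textChunkLimit placement is an assumption;
// defaults remain 4000 characters (2000 on Discord).
const config = {
  agent: {
    model: "anthropic/claude-opus-4-5",
    workspace: "/home/user/clawd",
  },
  whatsapp: {
    allowFrom: ["*"],
    textChunkLimit: 3500, // shorter outbound chunks than the 4000 default
  },
  discord: {
    textChunkLimit: 1900, // keep a margin under Discord's 2000-character cap
  },
  session: { store: "/home/user/sessions.json" },
};
```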
src/auto-reply/reply.block-streaming.test.ts (new file, 103 lines)
@@ -0,0 +1,103 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";

import { beforeEach, describe, expect, it, vi } from "vitest";

import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { getReplyFromConfig } from "./reply.js";

vi.mock("../agents/pi-embedded.js", () => ({
  abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
  runEmbeddedPiAgent: vi.fn(),
  queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
  resolveEmbeddedSessionLane: (key: string) =>
    `session:${key.trim() || "main"}`,
}));
vi.mock("../agents/model-catalog.js", () => ({
  loadModelCatalog: vi.fn(),
}));

async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
  const base = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-stream-"));
  const previousHome = process.env.HOME;
  process.env.HOME = base;
  try {
    return await fn(base);
  } finally {
    process.env.HOME = previousHome;
    await fs.rm(base, { recursive: true, force: true });
  }
}

describe("block streaming", () => {
  beforeEach(() => {
    vi.mocked(runEmbeddedPiAgent).mockReset();
    vi.mocked(loadModelCatalog).mockResolvedValue([
      { id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" },
      { id: "gpt-4.1-mini", name: "GPT-4.1 Mini", provider: "openai" },
    ]);
  });

  async function waitForCalls(fn: () => number, calls: number) {
    const deadline = Date.now() + 1500;
    while (fn() < calls) {
      if (Date.now() > deadline) {
        throw new Error(`Expected ${calls} call(s), got ${fn()}`);
      }
      await new Promise((resolve) => setTimeout(resolve, 5));
    }
  }

  it("waits for block replies before returning final payloads", async () => {
    await withTempHome(async (home) => {
      let releaseTyping: (() => void) | undefined;
      const typingGate = new Promise<void>((resolve) => {
        releaseTyping = resolve;
      });
      const onReplyStart = vi.fn(() => typingGate);
      const onBlockReply = vi.fn().mockResolvedValue(undefined);

      vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
        void params.onBlockReply?.({ text: "hello" });
        return {
          payloads: [{ text: "hello" }],
          meta: {
            durationMs: 5,
            agentMeta: { sessionId: "s", provider: "p", model: "m" },
          },
        };
      });

      const replyPromise = getReplyFromConfig(
        {
          Body: "ping",
          From: "+1004",
          To: "+2000",
          MessageSid: "msg-123",
          Surface: "discord",
        },
        {
          onReplyStart,
          onBlockReply,
        },
        {
          agent: {
            model: "anthropic/claude-opus-4-5",
            workspace: path.join(home, "clawd"),
          },
          whatsapp: { allowFrom: ["*"] },
          session: { store: path.join(home, "sessions.json") },
        },
      );

      await waitForCalls(() => onReplyStart.mock.calls.length, 1);
      releaseTyping?.();

      const res = await replyPromise;
      expect(res).toBeUndefined();
      expect(onBlockReply).toHaveBeenCalledTimes(1);
    });
  });
});
@@ -1421,16 +1421,19 @@ export async function getReplyFromConfig(
 const hasMedia = (payload.mediaUrls?.length ?? 0) > 0;
 if (!cleaned && !hasMedia) return;
 if (cleaned?.trim() === SILENT_REPLY_TOKEN && !hasMedia) return;
-await startTypingOnText(cleaned);
 const blockPayload: ReplyPayload = {
   text: cleaned,
   mediaUrls: payload.mediaUrls,
   mediaUrl: payload.mediaUrls?.[0],
   replyToId: tagResult.replyToId,
 };
-const task = Promise.resolve(opts.onBlockReply?.(blockPayload))
+const payloadKey = buildPayloadKey(blockPayload);
+const task = (async () => {
+  await startTypingOnText(cleaned);
+  await opts.onBlockReply?.(blockPayload);
+})()
   .then(() => {
-    streamedPayloadKeys.add(buildPayloadKey(blockPayload));
+    streamedPayloadKeys.add(payloadKey);
   })
   .catch((err) => {
     logVerbose(`block reply delivery failed: ${String(err)}`);
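For context on the Discord duplicate-send fix: the hunk above relies on `buildPayloadKey` and `streamedPayloadKeys`, whose definitions are not shown here. The following is a minimal sketch of the idea under assumed implementations (the real key format and surrounding plumbing may differ): a block's key is recorded only after both the typing hook and `onBlockReply` have completed, so the final-payload path can skip anything already delivered, and a failed or racing delivery never marks a payload as streamed.

```ts
// Sketch only: assumed shapes for ReplyPayload, buildPayloadKey, and the
// final-payload filter; the actual implementations are not in this diff.
type ReplyPayload = { text?: string; mediaUrls?: string[]; replyToId?: string };

// Assumed key format: text plus media URLs identifies a payload.
function buildPayloadKey(payload: ReplyPayload): string {
  return JSON.stringify([payload.text ?? "", payload.mediaUrls ?? []]);
}

const streamedPayloadKeys = new Set<string>();

// Deliver one block: await the typing hook, then the block callback, and only
// then record the key. If either step throws, the key is not recorded and the
// final send still includes this payload.
async function deliverBlock(
  payload: ReplyPayload,
  startTyping: () => Promise<void>,
  onBlockReply: (p: ReplyPayload) => Promise<void>,
): Promise<void> {
  const payloadKey = buildPayloadKey(payload);
  try {
    await startTyping();
    await onBlockReply(payload);
    streamedPayloadKeys.add(payloadKey);
  } catch (err) {
    console.warn(`block reply delivery failed: ${String(err)}`);
  }
}

// When the agent run finishes, drop any final payloads that were already
// streamed as blocks so the surface does not receive them twice.
function filterFinalPayloads(payloads: ReplyPayload[]): ReplyPayload[] {
  return payloads.filter((p) => !streamedPayloadKeys.has(buildPayloadKey(p)));
}
```

The new test above exercises the ordering half of this: `onReplyStart` returns a gate promise standing in for a slow typing hook, and the block callback must still fire exactly once after the gate is released.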