fix: stop partial replies for whatsapp/telegram surfaces

This commit is contained in:
Peter Steinberger
2025-12-09 01:40:51 +01:00
parent e44ed2681f
commit 5bfecc6152
6 changed files with 107 additions and 88 deletions

View File

@@ -247,7 +247,11 @@ describe("runCommandReply (pi)", () => {
expect(events).toContainEqual({ expect(events).toContainEqual({
stream: "tool", stream: "tool",
data: expect.objectContaining({ phase: "start", name: "bash", toolCallId: "call-1" }), data: expect.objectContaining({
phase: "start",
name: "bash",
toolCallId: "call-1",
}),
}); });
expect(events).toContainEqual({ expect(events).toContainEqual({
stream: "tool", stream: "tool",

View File

@@ -1,7 +1,11 @@
import fs from "node:fs/promises"; import fs from "node:fs/promises";
import os from "node:os"; import os from "node:os";
import path from "node:path"; import path from "node:path";
import type {
AgentEvent,
AssistantMessage,
Message,
} from "@mariozechner/pi-ai";
import { piSpec } from "../agents/pi.js"; import { piSpec } from "../agents/pi.js";
import type { AgentMeta, AgentToolResult } from "../agents/types.js"; import type { AgentMeta, AgentToolResult } from "../agents/types.js";
import type { WarelayConfig } from "../config/config.js"; import type { WarelayConfig } from "../config/config.js";
@@ -13,7 +17,6 @@ import { splitMediaFromOutput } from "../media/parse.js";
import { enqueueCommand } from "../process/command-queue.js"; import { enqueueCommand } from "../process/command-queue.js";
import type { runCommandWithTimeout } from "../process/exec.js"; import type { runCommandWithTimeout } from "../process/exec.js";
import { runPiRpc } from "../process/tau-rpc.js"; import { runPiRpc } from "../process/tau-rpc.js";
import type { AgentEvent, AssistantMessage, Message } from "@mariozechner/pi-ai";
import { applyTemplate, type TemplateContext } from "./templating.js"; import { applyTemplate, type TemplateContext } from "./templating.js";
import { import {
formatToolAggregate, formatToolAggregate,
@@ -614,63 +617,72 @@ export async function runCommandReply(
} }
if ( if (
("message" in ev && ev.message) && "message" in ev &&
ev.message &&
(ev.type === "message" || ev.type === "message_end") (ev.type === "message" || ev.type === "message_end")
) { ) {
const msg = ev.message as Message; const msg = ev.message as Message & {
const role = (msg as any).role; toolCallId?: string;
const isToolResult = role === "toolResult" || role === "tool_result"; tool_call_id?: string;
};
const role = msg.role;
const isToolResult =
role === "toolResult" || role === "tool_result";
if (!isToolResult || !Array.isArray(msg.content)) { if (!isToolResult || !Array.isArray(msg.content)) {
// not a tool result message we care about // not a tool result message we care about
} else { } else {
const toolName = inferToolName(msg); const toolName = inferToolName(msg);
const toolCallId = const toolCallId = msg.toolCallId ?? msg.tool_call_id;
(msg as any).toolCallId ?? (msg as any).tool_call_id; const meta =
const meta = inferToolMeta(msg) ??
inferToolMeta(msg) ?? (toolCallId ? toolMetaById.get(toolCallId) : undefined);
(toolCallId ? toolMetaById.get(toolCallId) : undefined);
emitAgentEvent({ emitAgentEvent({
runId, runId,
stream: "tool", stream: "tool",
data: { data: {
phase: "result", phase: "result",
name: toolName, name: toolName,
toolCallId, toolCallId,
meta, meta,
}, },
}); });
params.onAgentEvent?.({ params.onAgentEvent?.({
stream: "tool", stream: "tool",
data: { data: {
phase: "result", phase: "result",
name: toolName, name: toolName,
toolCallId, toolCallId,
meta, meta,
}, },
}); });
if (pendingToolName && toolName && toolName !== pendingToolName) { if (pendingToolName && toolName && toolName !== pendingToolName) {
flushPendingTool(); flushPendingTool();
} }
if (!pendingToolName) pendingToolName = toolName; if (!pendingToolName) pendingToolName = toolName;
if (meta) pendingMetas.push(meta); if (meta) pendingMetas.push(meta);
if ( if (
TOOL_RESULT_FLUSH_COUNT > 0 && TOOL_RESULT_FLUSH_COUNT > 0 &&
pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT
) { ) {
flushPendingTool(); flushPendingTool();
return; return;
} }
if (pendingTimer) clearTimeout(pendingTimer); if (pendingTimer) clearTimeout(pendingTimer);
pendingTimer = setTimeout( pendingTimer = setTimeout(
flushPendingTool, flushPendingTool,
TOOL_RESULT_DEBOUNCE_MS, TOOL_RESULT_DEBOUNCE_MS,
); );
} }
} }
if (ev.type === "message_end" && "message" in ev && ev.message && ev.message.role === "assistant") { if (
ev.type === "message_end" &&
"message" in ev &&
ev.message &&
ev.message.role === "assistant"
) {
streamAssistantFinal(ev.message as AssistantMessage); streamAssistantFinal(ev.message as AssistantMessage);
const text = extractRpcAssistantText(line); const text = extractRpcAssistantText(line);
if (text) { if (text) {
@@ -682,7 +694,11 @@ export async function runCommandReply(
} }
// Preserve existing partial reply hook when provided. // Preserve existing partial reply hook when provided.
if (onPartialReply && "message" in ev && ev.message?.role === "assistant") { if (
onPartialReply &&
"message" in ev &&
ev.message?.role === "assistant"
) {
// Let the existing logic reuse the already-parsed message. // Let the existing logic reuse the already-parsed message.
try { try {
streamAssistantFinal(ev.message as AssistantMessage); streamAssistantFinal(ev.message as AssistantMessage);

View File

@@ -9,7 +9,6 @@ import { createDefaultDeps } from "./cli/deps.js";
import { promptYesNo } from "./cli/prompt.js"; import { promptYesNo } from "./cli/prompt.js";
import { waitForever } from "./cli/wait.js"; import { waitForever } from "./cli/wait.js";
import { loadConfig } from "./config/config.js"; import { loadConfig } from "./config/config.js";
import { assertSupportedRuntime } from "./infra/runtime-guard.js";
import { import {
deriveSessionKey, deriveSessionKey,
loadSessionStore, loadSessionStore,
@@ -24,6 +23,7 @@ import {
handlePortError, handlePortError,
PortInUseError, PortInUseError,
} from "./infra/ports.js"; } from "./infra/ports.js";
import { assertSupportedRuntime } from "./infra/runtime-guard.js";
import { enableConsoleCapture } from "./logging.js"; import { enableConsoleCapture } from "./logging.js";
import { runCommandWithTimeout, runExec } from "./process/exec.js"; import { runCommandWithTimeout, runExec } from "./process/exec.js";
import { monitorWebProvider } from "./provider-web.js"; import { monitorWebProvider } from "./provider-web.js";

View File

@@ -5,8 +5,8 @@ import {
detectRuntime, detectRuntime,
isAtLeast, isAtLeast,
parseSemver, parseSemver,
runtimeSatisfies,
type RuntimeDetails, type RuntimeDetails,
runtimeSatisfies,
} from "./runtime-guard.js"; } from "./runtime-guard.js";
describe("runtime-guard", () => { describe("runtime-guard", () => {
@@ -17,9 +17,24 @@ describe("runtime-guard", () => {
}); });
it("compares versions correctly", () => { it("compares versions correctly", () => {
expect(isAtLeast({ major: 22, minor: 0, patch: 0 }, { major: 22, minor: 0, patch: 0 })).toBe(true); expect(
expect(isAtLeast({ major: 22, minor: 1, patch: 0 }, { major: 22, minor: 0, patch: 0 })).toBe(true); isAtLeast(
expect(isAtLeast({ major: 21, minor: 9, patch: 0 }, { major: 22, minor: 0, patch: 0 })).toBe(false); { major: 22, minor: 0, patch: 0 },
{ major: 22, minor: 0, patch: 0 },
),
).toBe(true);
expect(
isAtLeast(
{ major: 22, minor: 1, patch: 0 },
{ major: 22, minor: 0, patch: 0 },
),
).toBe(true);
expect(
isAtLeast(
{ major: 21, minor: 9, patch: 0 },
{ major: 22, minor: 0, patch: 0 },
),
).toBe(false);
}); });
it("validates runtime thresholds", () => { it("validates runtime thresholds", () => {
@@ -49,7 +64,9 @@ describe("runtime-guard", () => {
pathEnv: "/usr/bin", pathEnv: "/usr/bin",
}; };
expect(() => assertSupportedRuntime(runtime, details)).toThrow("exit"); expect(() => assertSupportedRuntime(runtime, details)).toThrow("exit");
expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("requires Node")); expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("requires Node"),
);
}); });
it("returns silently when runtime meets requirements", () => { it("returns silently when runtime meets requirements", () => {
@@ -68,4 +85,3 @@ describe("runtime-guard", () => {
expect(runtime.exit).not.toHaveBeenCalled(); expect(runtime.exit).not.toHaveBeenCalled();
}); });
}); });

View File

@@ -43,10 +43,16 @@ export function isAtLeast(version: Semver | null, minimum: Semver): boolean {
export function detectRuntime(): RuntimeDetails { export function detectRuntime(): RuntimeDetails {
const isBun = Boolean(process.versions?.bun); const isBun = Boolean(process.versions?.bun);
const kind: RuntimeKind = isBun ? "bun" : process.versions?.node ? "node" : "unknown"; const kind: RuntimeKind = isBun
? "bun"
: process.versions?.node
? "node"
: "unknown";
const bunVersion =
(globalThis as { Bun?: { version?: string } })?.Bun?.version ?? null;
const version = isBun const version = isBun
? process.versions?.bun ?? (globalThis as any)?.Bun?.version ?? null ? (process.versions?.bun ?? bunVersion)
: process.versions?.node ?? null; : (process.versions?.node ?? null);
return { return {
kind, kind,
@@ -70,7 +76,10 @@ export function assertSupportedRuntime(
if (runtimeSatisfies(details)) return; if (runtimeSatisfies(details)) return;
const versionLabel = details.version ?? "unknown"; const versionLabel = details.version ?? "unknown";
const runtimeLabel = details.kind === "unknown" ? "unknown runtime" : `${details.kind} ${versionLabel}`; const runtimeLabel =
details.kind === "unknown"
? "unknown runtime"
: `${details.kind} ${versionLabel}`;
const execLabel = details.execPath ?? "unknown"; const execLabel = details.execPath ?? "unknown";
runtime.error( runtime.error(
@@ -87,4 +96,3 @@ export function assertSupportedRuntime(
); );
runtime.exit(1); runtime.exit(1);
} }

View File

@@ -823,31 +823,6 @@ export async function monitorWebProvider(
}, },
{ {
onReplyStart: latest.sendComposing, onReplyStart: latest.sendComposing,
onPartialReply: async (partial) => {
try {
await deliverWebReply({
replyResult: partial,
msg: latest,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
if (partial.text) {
recentlySent.add(partial.text);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
} catch (err) {
console.error(
danger(
`Failed sending partial web auto-reply to ${latest.from ?? conversationId}: ${String(err)}`,
),
);
}
},
}, },
); );