From 5bfecc6152150018dbd3692caceed631bec68664 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 9 Dec 2025 01:40:51 +0100 Subject: [PATCH] fix: stop partial replies for whatsapp/telegram surfaces --- src/auto-reply/command-reply.test.ts | 6 +- src/auto-reply/command-reply.ts | 116 +++++++++++++++------------ src/index.ts | 2 +- src/infra/runtime-guard.test.ts | 28 +++++-- src/infra/runtime-guard.ts | 18 +++-- src/web/auto-reply.ts | 25 ------ 6 files changed, 107 insertions(+), 88 deletions(-) diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index c6fb0b59e..a7811ed30 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -247,7 +247,11 @@ describe("runCommandReply (pi)", () => { expect(events).toContainEqual({ stream: "tool", - data: expect.objectContaining({ phase: "start", name: "bash", toolCallId: "call-1" }), + data: expect.objectContaining({ + phase: "start", + name: "bash", + toolCallId: "call-1", + }), }); expect(events).toContainEqual({ stream: "tool", diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 11cb2b7a3..ef119cbb2 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -1,7 +1,11 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; - +import type { + AgentEvent, + AssistantMessage, + Message, +} from "@mariozechner/pi-ai"; import { piSpec } from "../agents/pi.js"; import type { AgentMeta, AgentToolResult } from "../agents/types.js"; import type { WarelayConfig } from "../config/config.js"; @@ -13,7 +17,6 @@ import { splitMediaFromOutput } from "../media/parse.js"; import { enqueueCommand } from "../process/command-queue.js"; import type { runCommandWithTimeout } from "../process/exec.js"; import { runPiRpc } from "../process/tau-rpc.js"; -import type { AgentEvent, AssistantMessage, Message } from "@mariozechner/pi-ai"; import { applyTemplate, type TemplateContext } from "./templating.js"; import { formatToolAggregate, @@ -614,63 +617,72 @@ export async function runCommandReply( } if ( - ("message" in ev && ev.message) && + "message" in ev && + ev.message && (ev.type === "message" || ev.type === "message_end") ) { - const msg = ev.message as Message; - const role = (msg as any).role; - const isToolResult = role === "toolResult" || role === "tool_result"; + const msg = ev.message as Message & { + toolCallId?: string; + tool_call_id?: string; + }; + const role = msg.role; + const isToolResult = + role === "toolResult" || role === "tool_result"; if (!isToolResult || !Array.isArray(msg.content)) { // not a tool result message we care about } else { - const toolName = inferToolName(msg); - const toolCallId = - (msg as any).toolCallId ?? (msg as any).tool_call_id; - const meta = - inferToolMeta(msg) ?? - (toolCallId ? toolMetaById.get(toolCallId) : undefined); + const toolName = inferToolName(msg); + const toolCallId = msg.toolCallId ?? msg.tool_call_id; + const meta = + inferToolMeta(msg) ?? + (toolCallId ? toolMetaById.get(toolCallId) : undefined); - emitAgentEvent({ - runId, - stream: "tool", - data: { - phase: "result", - name: toolName, - toolCallId, - meta, - }, - }); - params.onAgentEvent?.({ - stream: "tool", - data: { - phase: "result", - name: toolName, - toolCallId, - meta, - }, - }); + emitAgentEvent({ + runId, + stream: "tool", + data: { + phase: "result", + name: toolName, + toolCallId, + meta, + }, + }); + params.onAgentEvent?.({ + stream: "tool", + data: { + phase: "result", + name: toolName, + toolCallId, + meta, + }, + }); - if (pendingToolName && toolName && toolName !== pendingToolName) { - flushPendingTool(); - } - if (!pendingToolName) pendingToolName = toolName; - if (meta) pendingMetas.push(meta); - if ( - TOOL_RESULT_FLUSH_COUNT > 0 && - pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT - ) { - flushPendingTool(); - return; - } - if (pendingTimer) clearTimeout(pendingTimer); - pendingTimer = setTimeout( - flushPendingTool, - TOOL_RESULT_DEBOUNCE_MS, - ); + if (pendingToolName && toolName && toolName !== pendingToolName) { + flushPendingTool(); + } + if (!pendingToolName) pendingToolName = toolName; + if (meta) pendingMetas.push(meta); + if ( + TOOL_RESULT_FLUSH_COUNT > 0 && + pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT + ) { + flushPendingTool(); + return; + } + if (pendingTimer) clearTimeout(pendingTimer); + pendingTimer = setTimeout( + flushPendingTool, + TOOL_RESULT_DEBOUNCE_MS, + ); } } - if (ev.type === "message_end" && "message" in ev && ev.message && ev.message.role === "assistant") { + if ( + ev.type === "message_end" && + "message" in ev && + ev.message && + ev.message.role === "assistant" + ) { streamAssistantFinal(ev.message as AssistantMessage); const text = extractRpcAssistantText(line); if (text) { @@ -682,7 +694,11 @@ export async function runCommandReply( } // Preserve existing partial reply hook when provided. - if (onPartialReply && "message" in ev && ev.message?.role === "assistant") { + if ( + onPartialReply && + "message" in ev && + ev.message?.role === "assistant" + ) { // Let the existing logic reuse the already-parsed message. try { streamAssistantFinal(ev.message as AssistantMessage); diff --git a/src/index.ts b/src/index.ts index 65d7ec932..3b7ad6362 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,7 +9,6 @@ import { createDefaultDeps } from "./cli/deps.js"; import { promptYesNo } from "./cli/prompt.js"; import { waitForever } from "./cli/wait.js"; import { loadConfig } from "./config/config.js"; -import { assertSupportedRuntime } from "./infra/runtime-guard.js"; import { deriveSessionKey, loadSessionStore, @@ -24,6 +23,7 @@ import { handlePortError, PortInUseError, } from "./infra/ports.js"; +import { assertSupportedRuntime } from "./infra/runtime-guard.js"; import { enableConsoleCapture } from "./logging.js"; import { runCommandWithTimeout, runExec } from "./process/exec.js"; import { monitorWebProvider } from "./provider-web.js"; diff --git a/src/infra/runtime-guard.test.ts b/src/infra/runtime-guard.test.ts index f34b53ac2..9343d77dd 100644 --- a/src/infra/runtime-guard.test.ts +++ b/src/infra/runtime-guard.test.ts @@ -5,8 +5,8 @@ import { detectRuntime, isAtLeast, parseSemver, - runtimeSatisfies, type RuntimeDetails, + runtimeSatisfies, } from "./runtime-guard.js"; describe("runtime-guard", () => { @@ -17,9 +17,24 @@ describe("runtime-guard", () => { }); it("compares versions correctly", () => { - expect(isAtLeast({ major: 22, minor: 0, patch: 0 }, { major: 22, minor: 0, patch: 0 })).toBe(true); - expect(isAtLeast({ major: 22, minor: 1, patch: 0 }, { major: 22, minor: 0, patch: 0 })).toBe(true); - expect(isAtLeast({ major: 21, minor: 9, patch: 0 }, { major: 22, minor: 0, patch: 0 })).toBe(false); + expect( + isAtLeast( + { major: 22, minor: 0, patch: 0 }, + { major: 22, minor: 0, patch: 0 }, + ), + ).toBe(true); + expect( + isAtLeast( + { major: 22, minor: 1, patch: 0 }, + { major: 22, minor: 0, patch: 0 }, + ), + ).toBe(true); + expect( + isAtLeast( + { major: 21, minor: 9, patch: 0 }, + { major: 22, minor: 0, patch: 0 }, + ), + ).toBe(false); }); it("validates runtime thresholds", () => { @@ -49,7 +64,9 @@ describe("runtime-guard", () => { pathEnv: "/usr/bin", }; expect(() => assertSupportedRuntime(runtime, details)).toThrow("exit"); - expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("requires Node")); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("requires Node"), + ); }); it("returns silently when runtime meets requirements", () => { @@ -68,4 +85,3 @@ describe("runtime-guard", () => { expect(runtime.exit).not.toHaveBeenCalled(); }); }); - diff --git a/src/infra/runtime-guard.ts b/src/infra/runtime-guard.ts index 167fda868..e7383c83b 100644 --- a/src/infra/runtime-guard.ts +++ b/src/infra/runtime-guard.ts @@ -43,10 +43,16 @@ export function isAtLeast(version: Semver | null, minimum: Semver): boolean { export function detectRuntime(): RuntimeDetails { const isBun = Boolean(process.versions?.bun); - const kind: RuntimeKind = isBun ? "bun" : process.versions?.node ? "node" : "unknown"; + const kind: RuntimeKind = isBun + ? "bun" + : process.versions?.node + ? "node" + : "unknown"; + const bunVersion = + (globalThis as { Bun?: { version?: string } })?.Bun?.version ?? null; const version = isBun - ? process.versions?.bun ?? (globalThis as any)?.Bun?.version ?? null - : process.versions?.node ?? null; + ? (process.versions?.bun ?? bunVersion) + : (process.versions?.node ?? null); return { kind, @@ -70,7 +76,10 @@ export function assertSupportedRuntime( if (runtimeSatisfies(details)) return; const versionLabel = details.version ?? "unknown"; - const runtimeLabel = details.kind === "unknown" ? "unknown runtime" : `${details.kind} ${versionLabel}`; + const runtimeLabel = + details.kind === "unknown" + ? "unknown runtime" + : `${details.kind} ${versionLabel}`; const execLabel = details.execPath ?? "unknown"; runtime.error( @@ -87,4 +96,3 @@ export function assertSupportedRuntime( ); runtime.exit(1); } - diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 90d40cb09..292298ad0 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -823,31 +823,6 @@ export async function monitorWebProvider( }, { onReplyStart: latest.sendComposing, - onPartialReply: async (partial) => { - try { - await deliverWebReply({ - replyResult: partial, - msg: latest, - maxMediaBytes, - replyLogger, - runtime, - connectionId, - }); - if (partial.text) { - recentlySent.add(partial.text); - if (recentlySent.size > MAX_RECENT_MESSAGES) { - const firstKey = recentlySent.values().next().value; - if (firstKey) recentlySent.delete(firstKey); - } - } - } catch (err) { - console.error( - danger( - `Failed sending partial web auto-reply to ${latest.from ?? conversationId}: ${String(err)}`, - ), - ); - } - }, }, );