From b6c45485bc1f170478d27e929f6932bebc022615 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 3 Dec 2025 00:25:01 +0000 Subject: [PATCH] Auto-reply: smarter chunking breaks --- CHANGELOG.md | 11 ++++++ src/agents/agents.test.ts | 6 ++-- src/agents/pi.ts | 38 +++++++++++---------- src/agents/types.ts | 1 + src/auto-reply/chunk.test.ts | 46 +++++++++++++++++++++++++ src/auto-reply/chunk.ts | 48 ++++++++++++++++++++++++++ src/auto-reply/command-reply.ts | 8 ++--- src/auto-reply/reply.chunking.test.ts | 49 +++++++++++++++++++++++++++ src/auto-reply/reply.ts | 25 +++++++++----- src/process/tau-rpc.ts | 36 ++++++++++++++++---- src/web/auto-reply.ts | 21 ++++++------ 11 files changed, 239 insertions(+), 50 deletions(-) create mode 100644 src/auto-reply/chunk.test.ts create mode 100644 src/auto-reply/chunk.ts create mode 100644 src/auto-reply/reply.chunking.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c1c5fab7..486caeb29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## 1.3.2 — 2025-12-03 + +### Bug Fixes +- Tau/Pi RPC replies are now buffered until the assistant turn finishes and only completed assistant `message_end` events are emitted, preventing duplicate or partial WhatsApp messages. +- Command auto-replies return the parsed assistant texts array only (no deprecated `text` field), while preserving single-payload callers and keeping multi-message replies intact. +- WhatsApp Web auto-replies now fall back to sending the caption text if media delivery fails, so users still see a reply instead of silence. +- Outbound chunking now prefers newlines and word boundaries and only splits when exceeding platform limits, keeping multi-paragraph replies in a single message unless necessary. + +### Testing +- Updated agent and auto-reply parsers plus web media send fallbacks; test suite adjusted and now passing after the RPC/message handling refactors. 
+ ## 1.3.1 — 2025-12-02 ### Security diff --git a/src/agents/agents.test.ts b/src/agents/agents.test.ts index 6f4605eb8..da40b2109 100644 --- a/src/agents/agents.test.ts +++ b/src/agents/agents.test.ts @@ -62,7 +62,7 @@ describe("agent buildArgs + parseOutput helpers", () => { '{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}', ].join("\n"); const parsed = piSpec.parseOutput(stdout); - expect(parsed.text).toBe("hello world"); + expect(parsed.texts?.[0]).toBe("hello world"); expect(parsed.meta?.provider).toBe("inflection"); expect((parsed.meta?.usage as { output?: number })?.output).toBe(5); }); @@ -73,7 +73,7 @@ describe("agent buildArgs + parseOutput helpers", () => { '{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}', ].join("\n"); const parsed = codexSpec.parseOutput(stdout); - expect(parsed.text).toBe("hi there"); + expect(parsed.texts?.[0]).toBe("hi there"); const usage = parsed.meta?.usage as { input?: number; output?: number; @@ -93,7 +93,7 @@ describe("agent buildArgs + parseOutput helpers", () => { '{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}', ].join("\n"); const parsed = opencodeSpec.parseOutput(stdout); - expect(parsed.text).toBe("hi"); + expect(parsed.texts?.[0]).toBe("hi"); expect(parsed.meta?.extra?.summary).toContain("duration=1200ms"); expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020"); expect(parsed.meta?.extra?.summary).toContain("tokens=100+20"); diff --git a/src/agents/pi.ts b/src/agents/pi.ts index f1e632a77..a9a75ff07 100644 --- a/src/agents/pi.ts +++ b/src/agents/pi.ts @@ -14,11 +14,10 @@ type PiAssistantMessage = { function parsePiJson(raw: string): AgentParseResult { const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); - // Collect every assistant message we see; Tau in 
RPC mode can emit multiple - // assistant payloads in one run (e.g., queued turns, heartbeats). We concatenate - // all text blocks so users see everything instead of only the last message_end. + // Collect only completed assistant messages (skip streaming updates/toolcalls). const texts: string[] = []; let lastAssistant: PiAssistantMessage | undefined; + let lastPushed: string | undefined; for (const line of lines) { try { @@ -26,15 +25,24 @@ function parsePiJson(raw: string): AgentParseResult { type?: string; message?: PiAssistantMessage; }; - const msg = ev.message; - if (msg?.role === "assistant" && Array.isArray(msg.content)) { - const msgText = msg.content - .filter((c) => c?.type === "text" && typeof c.text === "string") - .map((c) => c.text) - .join("\n") - .trim(); - if (msgText) texts.push(msgText); - // keep meta from the most recent assistant message + + const isAssistantMessage = + (ev.type === "message" || ev.type === "message_end") && + ev.message?.role === "assistant" && + Array.isArray(ev.message.content); + + if (!isAssistantMessage) continue; + + const msg = ev.message as PiAssistantMessage; + const msgText = msg.content + ?.filter((c) => c?.type === "text" && typeof c.text === "string") + .map((c) => c.text) + .join("\n") + .trim(); + + if (msgText && msgText !== lastPushed) { + texts.push(msgText); + lastPushed = msgText; lastAssistant = msg; } } catch { @@ -42,12 +50,8 @@ function parsePiJson(raw: string): AgentParseResult { } } - // Combine all assistant text messages (ignore tool calls/partials). This keeps - // multi-message replies intact while dropping non-text events. - const text = texts.length ? texts.join("\n\n").trim() : undefined; - const meta: AgentMeta | undefined = - text && lastAssistant + lastAssistant && texts.length ? 
{ model: lastAssistant.model, provider: lastAssistant.provider, diff --git a/src/agents/types.ts b/src/agents/types.ts index 5fccd7053..2c868847f 100644 --- a/src/agents/types.ts +++ b/src/agents/types.ts @@ -16,6 +16,7 @@ export type AgentMeta = { }; export type AgentParseResult = { + // Plural to support agents that emit multiple assistant turns per prompt. texts?: string[]; mediaUrls?: string[]; meta?: AgentMeta; diff --git a/src/auto-reply/chunk.test.ts b/src/auto-reply/chunk.test.ts new file mode 100644 index 000000000..d19cdf972 --- /dev/null +++ b/src/auto-reply/chunk.test.ts @@ -0,0 +1,46 @@ +import { describe, expect, it } from "vitest"; + +import { chunkText } from "./chunk.js"; + +describe("chunkText", () => { + it("keeps multi-line text in one chunk when under limit", () => { + const text = "Line one\n\nLine two\n\nLine three"; + const chunks = chunkText(text, 1600); + expect(chunks).toEqual([text]); + }); + + it("splits only when text exceeds the limit", () => { + const part = "a".repeat(20); + const text = part.repeat(5); // 100 chars + const chunks = chunkText(text, 60); + expect(chunks.length).toBe(2); + expect(chunks[0].length).toBe(60); + expect(chunks[1].length).toBe(40); + expect(chunks.join("")).toBe(text); + }); + + it("prefers breaking at a newline before the limit", () => { + const text = `paragraph one line\n\nparagraph two starts here and continues`; + const chunks = chunkText(text, 40); + expect(chunks).toEqual([ + "paragraph one line", + "paragraph two starts here and continues", + ]); + }); + + it("otherwise breaks at the last whitespace under the limit", () => { + const text = "This is a message that should break nicely near a word boundary."; + const chunks = chunkText(text, 30); + expect(chunks[0].length).toBeLessThanOrEqual(30); + expect(chunks[1].length).toBeLessThanOrEqual(30); + expect(chunks.join(" ").replace(/\s+/g, " ").trim()).toBe( + text.replace(/\s+/g, " ").trim(), + ); + }); + + it("falls back to a hard break when no 
whitespace is present", () => { + const text = "Supercalifragilisticexpialidocious"; // 34 chars + const chunks = chunkText(text, 10); + expect(chunks).toEqual(["Supercalif", "ragilistic", "expialidoc", "ious"]); + }); +}); diff --git a/src/auto-reply/chunk.ts b/src/auto-reply/chunk.ts new file mode 100644 index 000000000..90140742e --- /dev/null +++ b/src/auto-reply/chunk.ts @@ -0,0 +1,48 @@ +// Utilities for splitting outbound text into platform-sized chunks without +// unintentionally breaking on newlines. Text within the limit stays in one chunk; +// longer text is split at a newline or whitespace, else hard-cut at the limit. + +export function chunkText(text: string, limit: number): string[] { + if (!text) return []; + if (limit <= 0) return [text]; + if (text.length <= limit) return [text]; + + const chunks: string[] = []; + let remaining = text; + + while (remaining.length > limit) { + const window = remaining.slice(0, limit); + + // 1) Prefer a newline break inside the window. + let breakIdx = window.lastIndexOf("\n"); + + // 2) Otherwise prefer the last whitespace (word boundary) inside the window. + if (breakIdx <= 0) { + for (let i = window.length - 1; i >= 0; i--) { + if (/\s/.test(window[i])) { + breakIdx = i; + break; + } + } + } + + // 3) Fallback: hard break exactly at the limit. + if (breakIdx <= 0) breakIdx = limit; + + const rawChunk = remaining.slice(0, breakIdx); + const chunk = rawChunk.trimEnd(); + if (chunk.length > 0) { + chunks.push(chunk); + } + + // If we broke on whitespace/newline, skip that separator; for hard breaks keep it. + const brokeOnSeparator = + breakIdx < remaining.length && /\s/.test(remaining[breakIdx]); + const nextStart = Math.min(remaining.length, breakIdx + (brokeOnSeparator ?
1 : 0)); + remaining = remaining.slice(nextStart).trimStart(); + } + + if (remaining.length) chunks.push(remaining); + + return chunks; +} diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index cf914ed6a..759931a4e 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -255,11 +255,11 @@ export async function runCommandReply( } const parsed = trimmed ? agent.parseOutput(trimmed) : undefined; + const parserProvided = !!parsed; // Collect one message per assistant text from parseOutput (tau RPC can emit many). const parsedTexts = - parsed?.texts?.map((t) => t.trim()).filter(Boolean) ?? - (parsed?.text ? [parsed.text.trim()] : []); + parsed?.texts?.map((t) => t.trim()).filter(Boolean) ?? []; type ReplyItem = { text: string; media?: string[] }; const replyItems: ReplyItem[] = []; @@ -274,7 +274,7 @@ export async function runCommandReply( } // If parser gave nothing, fall back to raw stdout as a single message. - if (replyItems.length === 0 && trimmed) { + if (replyItems.length === 0 && trimmed && !parserProvided) { const { text: cleanedText, mediaUrls: mediaFound } = splitMediaFromOutput(trimmed); if (cleanedText || mediaFound?.length) { @@ -401,7 +401,7 @@ export async function runCommandReply( } verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`); - return { payloads, meta }; + return { payloads, payload: payloads[0], meta }; } catch (err) { const elapsed = Date.now() - started; logger.info( diff --git a/src/auto-reply/reply.chunking.test.ts b/src/auto-reply/reply.chunking.test.ts new file mode 100644 index 000000000..bf3c98fef --- /dev/null +++ b/src/auto-reply/reply.chunking.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { WarelayConfig } from "../config/config.js"; +import { autoReplyIfConfigured } from "./reply.js"; + +describe("autoReplyIfConfigured chunking", () => { + it("sends a single Twilio message for multi-line text under limit", async () 
=> { + const body = [ + "Oh! Hi Peter! 🦞", + "", + "Sorry, I got a bit trigger-happy with the heartbeat response there. What's up?", + "", + "Everything working on your end?", + ].join("\n"); + + const config: WarelayConfig = { + inbound: { + reply: { + mode: "text", + text: body, + }, + }, + }; + + const create = vi.fn().mockResolvedValue({}); + const client = { messages: { create } } as unknown as Parameters< + typeof autoReplyIfConfigured + >[0]; + + const message = { + body: "ping", + from: "+15551234567", + to: "+15557654321", + sid: "SM123", + } as Parameters<typeof autoReplyIfConfigured>[1]; + + await autoReplyIfConfigured(client, message, config); + + expect(create).toHaveBeenCalledTimes(1); + expect(create).toHaveBeenCalledWith( + expect.objectContaining({ + body, + from: message.to, + to: message.from, + }), + ); + }); +}); + diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 4889887ec..843a7254d 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -1,5 +1,4 @@ import crypto from "node:crypto"; - import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js"; import { loadConfig, type WarelayConfig } from "../config/config.js"; import { @@ -18,6 +17,7 @@ import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import type { TwilioRequester } from "../twilio/types.js"; import { sendTypingIndicator } from "../twilio/typing.js"; import { runCommandReply } from "./command-reply.js"; +import { chunkText } from "./chunk.js"; import { applyTemplate, type MsgContext, @@ -307,7 +307,7 @@ export async function getReplyFromConfig( mediaUrl: reply.mediaUrl, }; cleanupTyping(); - return [result]; + return result; } if (reply && reply.mode === "command" && reply.command?.length) { @@ -318,7 +318,7 @@ export async function getReplyFromConfig( mode: "command" as const, }; try { - const { payloads, meta } = await runCommandReply({ + const runResult = await runCommandReply({ reply: commandReply, templatingCtx, sendSystemOnce, @@ -329,6
+329,17 @@ export async function getReplyFromConfig( timeoutSeconds, commandRunner, }); + const payloadArray = + runResult.payloads ?? (runResult.payload ? [runResult.payload] : []); + const meta = runResult.meta; + const normalizedPayloads = + payloadArray.length === 1 ? payloadArray[0] : payloadArray; + if ( + !normalizedPayloads || + (Array.isArray(normalizedPayloads) && normalizedPayloads.length === 0) + ) { + return undefined; + } if (sessionCfg && sessionStore && sessionKey) { const returnedSessionId = meta.agentMeta?.sessionId; if (returnedSessionId && returnedSessionId !== sessionId) { @@ -357,7 +368,7 @@ export async function getReplyFromConfig( if (meta.agentMeta && isVerbose()) { logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`); } - return payloads; + return normalizedPayloads; } finally { cleanupTyping(); } @@ -459,10 +470,8 @@ export async function autoReplyIfConfigured( : []; const text = replyPayload.text ?? ""; - const chunks = - text.length > 0 - ? (text.match(new RegExp(`.{1,${TWILIO_TEXT_LIMIT}}`, "g")) ?? 
[]) - : [""]; + const chunks = chunkText(text, TWILIO_TEXT_LIMIT); + if (chunks.length === 0) chunks.push(""); for (let i = 0; i < chunks.length; i++) { const body = chunks[i]; diff --git a/src/process/tau-rpc.ts b/src/process/tau-rpc.ts index 5c8730539..83b44c416 100644 --- a/src/process/tau-rpc.ts +++ b/src/process/tau-rpc.ts @@ -1,6 +1,8 @@ import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import readline from "node:readline"; +import { piSpec } from "../agents/pi.js"; + type TauRpcOptions = { argv: string[]; cwd?: string; @@ -20,6 +22,9 @@ class TauRpcClient { private rl: readline.Interface | null = null; private stderr = ""; private buffer: string[] = []; + private idleTimer: NodeJS.Timeout | null = null; + private seenAssistantEnd = false; + private readonly idleMs = 120; private pending: | { resolve: (r: TauRpcResult) => void; @@ -59,17 +64,34 @@ class TauRpcClient { private handleLine(line: string) { if (!this.pending) return; this.buffer.push(line); - // Finish on assistant message_end event to mirror parse logic in piSpec + // Streamed JSON arrives line-by-line; mark when an assistant message finishes + // and resolve after a short idle to capture any follow-up events (e.g. tools) + // that belong to the same turn. if ( line.includes('"type":"message_end"') && line.includes('"role":"assistant"') ) { - const out = this.buffer.join("\n"); - clearTimeout(this.pending.timer); - const pending = this.pending; - this.pending = undefined; - this.buffer = []; - pending.resolve({ stdout: out, stderr: this.stderr, code: 0 }); + this.seenAssistantEnd = true; + } + + if (this.seenAssistantEnd) { + if (this.idleTimer) clearTimeout(this.idleTimer); + this.idleTimer = setTimeout(() => { + if (!this.pending) return; + const out = this.buffer.join("\n"); + // Only resolve once we have at least one assistant text payload; otherwise keep waiting. 
+ const parsed = piSpec.parseOutput(out); + if (parsed.texts && parsed.texts.length > 0) { + const pending = this.pending; + this.pending = undefined; + this.buffer = []; + this.seenAssistantEnd = false; + clearTimeout(pending.timer); + pending.resolve({ stdout: out, stderr: this.stderr, code: 0 }); + return; + } + // No assistant text yet; wait for more lines. + }, this.idleMs); // small idle window to group streaming blocks } } diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 7556a7954..3c0d323e1 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1,3 +1,4 @@ +import { chunkText } from "../auto-reply/chunk.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { waitForever } from "../cli/wait.js"; @@ -373,12 +374,7 @@ async function deliverWebReply(params: { skipLog, } = params; const replyStarted = Date.now(); - const textChunks = - (replyResult.text || "").length > 0 - ? ((replyResult.text || "").match( - new RegExp(`.{1,${WEB_TEXT_LIMIT}}`, "g"), - ) ?? []) - : []; + const textChunks = chunkText(replyResult.text || "", WEB_TEXT_LIMIT); const mediaList = replyResult.mediaUrls?.length ? replyResult.mediaUrls : replyResult.mediaUrl @@ -417,6 +413,8 @@ async function deliverWebReply(params: { // Media (with optional caption on first item) for (const [index, mediaUrl] of mediaList.entries()) { + const caption = + index === 0 ? remainingText.shift() || undefined : undefined; try { const media = await loadWebMedia(mediaUrl, maxMediaBytes); if (isVerbose()) { @@ -427,8 +425,6 @@ async function deliverWebReply(params: { `Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`, ); } - const caption = - index === 0 ? 
remainingText.shift() || undefined : undefined; if (media.kind === "image") { await msg.sendMedia({ image: media.buffer, @@ -481,9 +477,12 @@ async function deliverWebReply(params: { danger(`Failed sending web media to ${msg.from}: ${String(err)}`), ); replyLogger.warn({ err, mediaUrl }, "failed to send web media reply"); - if (index === 0 && remainingText.length) { - console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`); - await msg.reply(remainingText.shift() || ""); + if (index === 0) { + const fallbackText = remainingText.shift() ?? caption ?? ""; + if (fallbackText) { + console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`); + await msg.reply(fallbackText); + } } } }