Auto-reply: smarter chunking breaks
This commit is contained in:
11
CHANGELOG.md
11
CHANGELOG.md
@@ -1,5 +1,16 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 1.3.2 — 2025-12-03
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
- Tau/Pi RPC replies are now buffered until the assistant turn finishes and only completed assistant `message_end` events are emitted, preventing duplicate or partial WhatsApp messages.
|
||||||
|
- Command auto-replies return the parsed assistant texts array only (no deprecated `text` field), while preserving single-payload callers and keeping multi-message replies intact.
|
||||||
|
- WhatsApp Web auto-replies now fall back to sending the caption text if media delivery fails, so users still see a reply instead of silence.
|
||||||
|
- Outbound chunking now prefers newlines and word boundaries and only splits when exceeding platform limits, keeping multi-paragraph replies in a single message unless necessary.
|
||||||
|
|
||||||
|
### Testing
|
||||||
|
- Updated agent and auto-reply parsers plus web media send fallbacks; test suite adjusted and now passing after the RPC/message handling refactors.
|
||||||
|
|
||||||
## 1.3.1 — 2025-12-02
|
## 1.3.1 — 2025-12-02
|
||||||
|
|
||||||
### Security
|
### Security
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ describe("agent buildArgs + parseOutput helpers", () => {
|
|||||||
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}',
|
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}',
|
||||||
].join("\n");
|
].join("\n");
|
||||||
const parsed = piSpec.parseOutput(stdout);
|
const parsed = piSpec.parseOutput(stdout);
|
||||||
expect(parsed.text).toBe("hello world");
|
expect(parsed.texts?.[0]).toBe("hello world");
|
||||||
expect(parsed.meta?.provider).toBe("inflection");
|
expect(parsed.meta?.provider).toBe("inflection");
|
||||||
expect((parsed.meta?.usage as { output?: number })?.output).toBe(5);
|
expect((parsed.meta?.usage as { output?: number })?.output).toBe(5);
|
||||||
});
|
});
|
||||||
@@ -73,7 +73,7 @@ describe("agent buildArgs + parseOutput helpers", () => {
|
|||||||
'{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}',
|
'{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}',
|
||||||
].join("\n");
|
].join("\n");
|
||||||
const parsed = codexSpec.parseOutput(stdout);
|
const parsed = codexSpec.parseOutput(stdout);
|
||||||
expect(parsed.text).toBe("hi there");
|
expect(parsed.texts?.[0]).toBe("hi there");
|
||||||
const usage = parsed.meta?.usage as {
|
const usage = parsed.meta?.usage as {
|
||||||
input?: number;
|
input?: number;
|
||||||
output?: number;
|
output?: number;
|
||||||
@@ -93,7 +93,7 @@ describe("agent buildArgs + parseOutput helpers", () => {
|
|||||||
'{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}',
|
'{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}',
|
||||||
].join("\n");
|
].join("\n");
|
||||||
const parsed = opencodeSpec.parseOutput(stdout);
|
const parsed = opencodeSpec.parseOutput(stdout);
|
||||||
expect(parsed.text).toBe("hi");
|
expect(parsed.texts?.[0]).toBe("hi");
|
||||||
expect(parsed.meta?.extra?.summary).toContain("duration=1200ms");
|
expect(parsed.meta?.extra?.summary).toContain("duration=1200ms");
|
||||||
expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020");
|
expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020");
|
||||||
expect(parsed.meta?.extra?.summary).toContain("tokens=100+20");
|
expect(parsed.meta?.extra?.summary).toContain("tokens=100+20");
|
||||||
|
|||||||
@@ -14,11 +14,10 @@ type PiAssistantMessage = {
|
|||||||
function parsePiJson(raw: string): AgentParseResult {
|
function parsePiJson(raw: string): AgentParseResult {
|
||||||
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
|
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
|
||||||
|
|
||||||
// Collect every assistant message we see; Tau in RPC mode can emit multiple
|
// Collect only completed assistant messages (skip streaming updates/toolcalls).
|
||||||
// assistant payloads in one run (e.g., queued turns, heartbeats). We concatenate
|
|
||||||
// all text blocks so users see everything instead of only the last message_end.
|
|
||||||
const texts: string[] = [];
|
const texts: string[] = [];
|
||||||
let lastAssistant: PiAssistantMessage | undefined;
|
let lastAssistant: PiAssistantMessage | undefined;
|
||||||
|
let lastPushed: string | undefined;
|
||||||
|
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
try {
|
try {
|
||||||
@@ -26,15 +25,24 @@ function parsePiJson(raw: string): AgentParseResult {
|
|||||||
type?: string;
|
type?: string;
|
||||||
message?: PiAssistantMessage;
|
message?: PiAssistantMessage;
|
||||||
};
|
};
|
||||||
const msg = ev.message;
|
|
||||||
if (msg?.role === "assistant" && Array.isArray(msg.content)) {
|
const isAssistantMessage =
|
||||||
const msgText = msg.content
|
(ev.type === "message" || ev.type === "message_end") &&
|
||||||
.filter((c) => c?.type === "text" && typeof c.text === "string")
|
ev.message?.role === "assistant" &&
|
||||||
.map((c) => c.text)
|
Array.isArray(ev.message.content);
|
||||||
.join("\n")
|
|
||||||
.trim();
|
if (!isAssistantMessage) continue;
|
||||||
if (msgText) texts.push(msgText);
|
|
||||||
// keep meta from the most recent assistant message
|
const msg = ev.message as PiAssistantMessage;
|
||||||
|
const msgText = msg.content
|
||||||
|
?.filter((c) => c?.type === "text" && typeof c.text === "string")
|
||||||
|
.map((c) => c.text)
|
||||||
|
.join("\n")
|
||||||
|
.trim();
|
||||||
|
|
||||||
|
if (msgText && msgText !== lastPushed) {
|
||||||
|
texts.push(msgText);
|
||||||
|
lastPushed = msgText;
|
||||||
lastAssistant = msg;
|
lastAssistant = msg;
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
@@ -42,12 +50,8 @@ function parsePiJson(raw: string): AgentParseResult {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Combine all assistant text messages (ignore tool calls/partials). This keeps
|
|
||||||
// multi-message replies intact while dropping non-text events.
|
|
||||||
const text = texts.length ? texts.join("\n\n").trim() : undefined;
|
|
||||||
|
|
||||||
const meta: AgentMeta | undefined =
|
const meta: AgentMeta | undefined =
|
||||||
text && lastAssistant
|
lastAssistant && texts.length
|
||||||
? {
|
? {
|
||||||
model: lastAssistant.model,
|
model: lastAssistant.model,
|
||||||
provider: lastAssistant.provider,
|
provider: lastAssistant.provider,
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ export type AgentMeta = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
export type AgentParseResult = {
|
export type AgentParseResult = {
|
||||||
|
// Plural to support agents that emit multiple assistant turns per prompt.
|
||||||
texts?: string[];
|
texts?: string[];
|
||||||
mediaUrls?: string[];
|
mediaUrls?: string[];
|
||||||
meta?: AgentMeta;
|
meta?: AgentMeta;
|
||||||
|
|||||||
46
src/auto-reply/chunk.test.ts
Normal file
46
src/auto-reply/chunk.test.ts
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
|
||||||
|
import { chunkText } from "./chunk.js";
|
||||||
|
|
||||||
|
describe("chunkText", () => {
|
||||||
|
it("keeps multi-line text in one chunk when under limit", () => {
|
||||||
|
const text = "Line one\n\nLine two\n\nLine three";
|
||||||
|
const chunks = chunkText(text, 1600);
|
||||||
|
expect(chunks).toEqual([text]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("splits only when text exceeds the limit", () => {
|
||||||
|
const part = "a".repeat(20);
|
||||||
|
const text = part.repeat(5); // 100 chars
|
||||||
|
const chunks = chunkText(text, 60);
|
||||||
|
expect(chunks.length).toBe(2);
|
||||||
|
expect(chunks[0].length).toBe(60);
|
||||||
|
expect(chunks[1].length).toBe(40);
|
||||||
|
expect(chunks.join("")).toBe(text);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("prefers breaking at a newline before the limit", () => {
|
||||||
|
const text = `paragraph one line\n\nparagraph two starts here and continues`;
|
||||||
|
const chunks = chunkText(text, 40);
|
||||||
|
expect(chunks).toEqual([
|
||||||
|
"paragraph one line",
|
||||||
|
"paragraph two starts here and continues",
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("otherwise breaks at the last whitespace under the limit", () => {
|
||||||
|
const text = "This is a message that should break nicely near a word boundary.";
|
||||||
|
const chunks = chunkText(text, 30);
|
||||||
|
expect(chunks[0].length).toBeLessThanOrEqual(30);
|
||||||
|
expect(chunks[1].length).toBeLessThanOrEqual(30);
|
||||||
|
expect(chunks.join(" ").replace(/\s+/g, " ").trim()).toBe(
|
||||||
|
text.replace(/\s+/g, " ").trim(),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("falls back to a hard break when no whitespace is present", () => {
|
||||||
|
const text = "Supercalifragilisticexpialidocious"; // 34 chars
|
||||||
|
const chunks = chunkText(text, 10);
|
||||||
|
expect(chunks).toEqual(["Supercalif", "ragilistic", "expialidoc", "ious"]);
|
||||||
|
});
|
||||||
|
});
|
||||||
48
src/auto-reply/chunk.ts
Normal file
48
src/auto-reply/chunk.ts
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
// Utilities for splitting outbound text into platform-sized chunks without
|
||||||
|
// unintentionally breaking on newlines. Using [\s\S] keeps newlines inside
|
||||||
|
// the chunk so messages are only split when they truly exceed the limit.
|
||||||
|
|
||||||
|
export function chunkText(text: string, limit: number): string[] {
|
||||||
|
if (!text) return [];
|
||||||
|
if (limit <= 0) return [text];
|
||||||
|
if (text.length <= limit) return [text];
|
||||||
|
|
||||||
|
const chunks: string[] = [];
|
||||||
|
let remaining = text;
|
||||||
|
|
||||||
|
while (remaining.length > limit) {
|
||||||
|
const window = remaining.slice(0, limit);
|
||||||
|
|
||||||
|
// 1) Prefer a newline break inside the window.
|
||||||
|
let breakIdx = window.lastIndexOf("\n");
|
||||||
|
|
||||||
|
// 2) Otherwise prefer the last whitespace (word boundary) inside the window.
|
||||||
|
if (breakIdx <= 0) {
|
||||||
|
for (let i = window.length - 1; i >= 0; i--) {
|
||||||
|
if (/\s/.test(window[i])) {
|
||||||
|
breakIdx = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3) Fallback: hard break exactly at the limit.
|
||||||
|
if (breakIdx <= 0) breakIdx = limit;
|
||||||
|
|
||||||
|
const rawChunk = remaining.slice(0, breakIdx);
|
||||||
|
const chunk = rawChunk.trimEnd();
|
||||||
|
if (chunk.length > 0) {
|
||||||
|
chunks.push(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we broke on whitespace/newline, skip that separator; for hard breaks keep it.
|
||||||
|
const brokeOnSeparator =
|
||||||
|
breakIdx < remaining.length && /\s/.test(remaining[breakIdx]);
|
||||||
|
const nextStart = Math.min(remaining.length, breakIdx + (brokeOnSeparator ? 1 : 0));
|
||||||
|
remaining = remaining.slice(nextStart).trimStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (remaining.length) chunks.push(remaining);
|
||||||
|
|
||||||
|
return chunks;
|
||||||
|
}
|
||||||
@@ -255,11 +255,11 @@ export async function runCommandReply(
|
|||||||
}
|
}
|
||||||
|
|
||||||
const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
|
const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
|
||||||
|
const parserProvided = !!parsed;
|
||||||
|
|
||||||
// Collect one message per assistant text from parseOutput (tau RPC can emit many).
|
// Collect one message per assistant text from parseOutput (tau RPC can emit many).
|
||||||
const parsedTexts =
|
const parsedTexts =
|
||||||
parsed?.texts?.map((t) => t.trim()).filter(Boolean) ??
|
parsed?.texts?.map((t) => t.trim()).filter(Boolean) ?? [];
|
||||||
(parsed?.text ? [parsed.text.trim()] : []);
|
|
||||||
|
|
||||||
type ReplyItem = { text: string; media?: string[] };
|
type ReplyItem = { text: string; media?: string[] };
|
||||||
const replyItems: ReplyItem[] = [];
|
const replyItems: ReplyItem[] = [];
|
||||||
@@ -274,7 +274,7 @@ export async function runCommandReply(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// If parser gave nothing, fall back to raw stdout as a single message.
|
// If parser gave nothing, fall back to raw stdout as a single message.
|
||||||
if (replyItems.length === 0 && trimmed) {
|
if (replyItems.length === 0 && trimmed && !parserProvided) {
|
||||||
const { text: cleanedText, mediaUrls: mediaFound } =
|
const { text: cleanedText, mediaUrls: mediaFound } =
|
||||||
splitMediaFromOutput(trimmed);
|
splitMediaFromOutput(trimmed);
|
||||||
if (cleanedText || mediaFound?.length) {
|
if (cleanedText || mediaFound?.length) {
|
||||||
@@ -401,7 +401,7 @@ export async function runCommandReply(
|
|||||||
}
|
}
|
||||||
|
|
||||||
verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`);
|
verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`);
|
||||||
return { payloads, meta };
|
return { payloads, payload: payloads[0], meta };
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const elapsed = Date.now() - started;
|
const elapsed = Date.now() - started;
|
||||||
logger.info(
|
logger.info(
|
||||||
|
|||||||
49
src/auto-reply/reply.chunking.test.ts
Normal file
49
src/auto-reply/reply.chunking.test.ts
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
import { describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
import type { WarelayConfig } from "../config/config.js";
|
||||||
|
import { autoReplyIfConfigured } from "./reply.js";
|
||||||
|
|
||||||
|
describe("autoReplyIfConfigured chunking", () => {
|
||||||
|
it("sends a single Twilio message for multi-line text under limit", async () => {
|
||||||
|
const body = [
|
||||||
|
"Oh! Hi Peter! 🦞",
|
||||||
|
"",
|
||||||
|
"Sorry, I got a bit trigger-happy with the heartbeat response there. What's up?",
|
||||||
|
"",
|
||||||
|
"Everything working on your end?",
|
||||||
|
].join("\n");
|
||||||
|
|
||||||
|
const config: WarelayConfig = {
|
||||||
|
inbound: {
|
||||||
|
reply: {
|
||||||
|
mode: "text",
|
||||||
|
text: body,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const create = vi.fn().mockResolvedValue({});
|
||||||
|
const client = { messages: { create } } as unknown as Parameters<
|
||||||
|
typeof autoReplyIfConfigured
|
||||||
|
>[0];
|
||||||
|
|
||||||
|
const message = {
|
||||||
|
body: "ping",
|
||||||
|
from: "+15551234567",
|
||||||
|
to: "+15557654321",
|
||||||
|
sid: "SM123",
|
||||||
|
} as Parameters<typeof autoReplyIfConfigured>[1];
|
||||||
|
|
||||||
|
await autoReplyIfConfigured(client, message, config);
|
||||||
|
|
||||||
|
expect(create).toHaveBeenCalledTimes(1);
|
||||||
|
expect(create).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
body,
|
||||||
|
from: message.to,
|
||||||
|
to: message.from,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
@@ -1,5 +1,4 @@
|
|||||||
import crypto from "node:crypto";
|
import crypto from "node:crypto";
|
||||||
|
|
||||||
import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js";
|
import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js";
|
||||||
import { loadConfig, type WarelayConfig } from "../config/config.js";
|
import { loadConfig, type WarelayConfig } from "../config/config.js";
|
||||||
import {
|
import {
|
||||||
@@ -18,6 +17,7 @@ import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
|||||||
import type { TwilioRequester } from "../twilio/types.js";
|
import type { TwilioRequester } from "../twilio/types.js";
|
||||||
import { sendTypingIndicator } from "../twilio/typing.js";
|
import { sendTypingIndicator } from "../twilio/typing.js";
|
||||||
import { runCommandReply } from "./command-reply.js";
|
import { runCommandReply } from "./command-reply.js";
|
||||||
|
import { chunkText } from "./chunk.js";
|
||||||
import {
|
import {
|
||||||
applyTemplate,
|
applyTemplate,
|
||||||
type MsgContext,
|
type MsgContext,
|
||||||
@@ -307,7 +307,7 @@ export async function getReplyFromConfig(
|
|||||||
mediaUrl: reply.mediaUrl,
|
mediaUrl: reply.mediaUrl,
|
||||||
};
|
};
|
||||||
cleanupTyping();
|
cleanupTyping();
|
||||||
return [result];
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reply && reply.mode === "command" && reply.command?.length) {
|
if (reply && reply.mode === "command" && reply.command?.length) {
|
||||||
@@ -318,7 +318,7 @@ export async function getReplyFromConfig(
|
|||||||
mode: "command" as const,
|
mode: "command" as const,
|
||||||
};
|
};
|
||||||
try {
|
try {
|
||||||
const { payloads, meta } = await runCommandReply({
|
const runResult = await runCommandReply({
|
||||||
reply: commandReply,
|
reply: commandReply,
|
||||||
templatingCtx,
|
templatingCtx,
|
||||||
sendSystemOnce,
|
sendSystemOnce,
|
||||||
@@ -329,6 +329,17 @@ export async function getReplyFromConfig(
|
|||||||
timeoutSeconds,
|
timeoutSeconds,
|
||||||
commandRunner,
|
commandRunner,
|
||||||
});
|
});
|
||||||
|
const payloadArray =
|
||||||
|
runResult.payloads ?? (runResult.payload ? [runResult.payload] : []);
|
||||||
|
const meta = runResult.meta;
|
||||||
|
const normalizedPayloads =
|
||||||
|
payloadArray.length === 1 ? payloadArray[0] : payloadArray;
|
||||||
|
if (
|
||||||
|
!normalizedPayloads ||
|
||||||
|
(Array.isArray(normalizedPayloads) && normalizedPayloads.length === 0)
|
||||||
|
) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
if (sessionCfg && sessionStore && sessionKey) {
|
if (sessionCfg && sessionStore && sessionKey) {
|
||||||
const returnedSessionId = meta.agentMeta?.sessionId;
|
const returnedSessionId = meta.agentMeta?.sessionId;
|
||||||
if (returnedSessionId && returnedSessionId !== sessionId) {
|
if (returnedSessionId && returnedSessionId !== sessionId) {
|
||||||
@@ -357,7 +368,7 @@ export async function getReplyFromConfig(
|
|||||||
if (meta.agentMeta && isVerbose()) {
|
if (meta.agentMeta && isVerbose()) {
|
||||||
logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`);
|
logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`);
|
||||||
}
|
}
|
||||||
return payloads;
|
return normalizedPayloads;
|
||||||
} finally {
|
} finally {
|
||||||
cleanupTyping();
|
cleanupTyping();
|
||||||
}
|
}
|
||||||
@@ -459,10 +470,8 @@ export async function autoReplyIfConfigured(
|
|||||||
: [];
|
: [];
|
||||||
|
|
||||||
const text = replyPayload.text ?? "";
|
const text = replyPayload.text ?? "";
|
||||||
const chunks =
|
const chunks = chunkText(text, TWILIO_TEXT_LIMIT);
|
||||||
text.length > 0
|
if (chunks.length === 0) chunks.push("");
|
||||||
? (text.match(new RegExp(`.{1,${TWILIO_TEXT_LIMIT}}`, "g")) ?? [])
|
|
||||||
: [""];
|
|
||||||
|
|
||||||
for (let i = 0; i < chunks.length; i++) {
|
for (let i = 0; i < chunks.length; i++) {
|
||||||
const body = chunks[i];
|
const body = chunks[i];
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
|
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
|
||||||
import readline from "node:readline";
|
import readline from "node:readline";
|
||||||
|
|
||||||
|
import { piSpec } from "../agents/pi.js";
|
||||||
|
|
||||||
type TauRpcOptions = {
|
type TauRpcOptions = {
|
||||||
argv: string[];
|
argv: string[];
|
||||||
cwd?: string;
|
cwd?: string;
|
||||||
@@ -20,6 +22,9 @@ class TauRpcClient {
|
|||||||
private rl: readline.Interface | null = null;
|
private rl: readline.Interface | null = null;
|
||||||
private stderr = "";
|
private stderr = "";
|
||||||
private buffer: string[] = [];
|
private buffer: string[] = [];
|
||||||
|
private idleTimer: NodeJS.Timeout | null = null;
|
||||||
|
private seenAssistantEnd = false;
|
||||||
|
private readonly idleMs = 120;
|
||||||
private pending:
|
private pending:
|
||||||
| {
|
| {
|
||||||
resolve: (r: TauRpcResult) => void;
|
resolve: (r: TauRpcResult) => void;
|
||||||
@@ -59,17 +64,34 @@ class TauRpcClient {
|
|||||||
private handleLine(line: string) {
|
private handleLine(line: string) {
|
||||||
if (!this.pending) return;
|
if (!this.pending) return;
|
||||||
this.buffer.push(line);
|
this.buffer.push(line);
|
||||||
// Finish on assistant message_end event to mirror parse logic in piSpec
|
// Streamed JSON arrives line-by-line; mark when an assistant message finishes
|
||||||
|
// and resolve after a short idle to capture any follow-up events (e.g. tools)
|
||||||
|
// that belong to the same turn.
|
||||||
if (
|
if (
|
||||||
line.includes('"type":"message_end"') &&
|
line.includes('"type":"message_end"') &&
|
||||||
line.includes('"role":"assistant"')
|
line.includes('"role":"assistant"')
|
||||||
) {
|
) {
|
||||||
const out = this.buffer.join("\n");
|
this.seenAssistantEnd = true;
|
||||||
clearTimeout(this.pending.timer);
|
}
|
||||||
const pending = this.pending;
|
|
||||||
this.pending = undefined;
|
if (this.seenAssistantEnd) {
|
||||||
this.buffer = [];
|
if (this.idleTimer) clearTimeout(this.idleTimer);
|
||||||
pending.resolve({ stdout: out, stderr: this.stderr, code: 0 });
|
this.idleTimer = setTimeout(() => {
|
||||||
|
if (!this.pending) return;
|
||||||
|
const out = this.buffer.join("\n");
|
||||||
|
// Only resolve once we have at least one assistant text payload; otherwise keep waiting.
|
||||||
|
const parsed = piSpec.parseOutput(out);
|
||||||
|
if (parsed.texts && parsed.texts.length > 0) {
|
||||||
|
const pending = this.pending;
|
||||||
|
this.pending = undefined;
|
||||||
|
this.buffer = [];
|
||||||
|
this.seenAssistantEnd = false;
|
||||||
|
clearTimeout(pending.timer);
|
||||||
|
pending.resolve({ stdout: out, stderr: this.stderr, code: 0 });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// No assistant text yet; wait for more lines.
|
||||||
|
}, this.idleMs); // small idle window to group streaming blocks
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import { chunkText } from "../auto-reply/chunk.js";
|
||||||
import { getReplyFromConfig } from "../auto-reply/reply.js";
|
import { getReplyFromConfig } from "../auto-reply/reply.js";
|
||||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||||
import { waitForever } from "../cli/wait.js";
|
import { waitForever } from "../cli/wait.js";
|
||||||
@@ -373,12 +374,7 @@ async function deliverWebReply(params: {
|
|||||||
skipLog,
|
skipLog,
|
||||||
} = params;
|
} = params;
|
||||||
const replyStarted = Date.now();
|
const replyStarted = Date.now();
|
||||||
const textChunks =
|
const textChunks = chunkText(replyResult.text || "", WEB_TEXT_LIMIT);
|
||||||
(replyResult.text || "").length > 0
|
|
||||||
? ((replyResult.text || "").match(
|
|
||||||
new RegExp(`.{1,${WEB_TEXT_LIMIT}}`, "g"),
|
|
||||||
) ?? [])
|
|
||||||
: [];
|
|
||||||
const mediaList = replyResult.mediaUrls?.length
|
const mediaList = replyResult.mediaUrls?.length
|
||||||
? replyResult.mediaUrls
|
? replyResult.mediaUrls
|
||||||
: replyResult.mediaUrl
|
: replyResult.mediaUrl
|
||||||
@@ -417,6 +413,8 @@ async function deliverWebReply(params: {
|
|||||||
|
|
||||||
// Media (with optional caption on first item)
|
// Media (with optional caption on first item)
|
||||||
for (const [index, mediaUrl] of mediaList.entries()) {
|
for (const [index, mediaUrl] of mediaList.entries()) {
|
||||||
|
const caption =
|
||||||
|
index === 0 ? remainingText.shift() || undefined : undefined;
|
||||||
try {
|
try {
|
||||||
const media = await loadWebMedia(mediaUrl, maxMediaBytes);
|
const media = await loadWebMedia(mediaUrl, maxMediaBytes);
|
||||||
if (isVerbose()) {
|
if (isVerbose()) {
|
||||||
@@ -427,8 +425,6 @@ async function deliverWebReply(params: {
|
|||||||
`Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`,
|
`Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
const caption =
|
|
||||||
index === 0 ? remainingText.shift() || undefined : undefined;
|
|
||||||
if (media.kind === "image") {
|
if (media.kind === "image") {
|
||||||
await msg.sendMedia({
|
await msg.sendMedia({
|
||||||
image: media.buffer,
|
image: media.buffer,
|
||||||
@@ -481,9 +477,12 @@ async function deliverWebReply(params: {
|
|||||||
danger(`Failed sending web media to ${msg.from}: ${String(err)}`),
|
danger(`Failed sending web media to ${msg.from}: ${String(err)}`),
|
||||||
);
|
);
|
||||||
replyLogger.warn({ err, mediaUrl }, "failed to send web media reply");
|
replyLogger.warn({ err, mediaUrl }, "failed to send web media reply");
|
||||||
if (index === 0 && remainingText.length) {
|
if (index === 0) {
|
||||||
console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`);
|
const fallbackText = remainingText.shift() ?? caption ?? "";
|
||||||
await msg.reply(remainingText.shift() || "");
|
if (fallbackText) {
|
||||||
|
console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`);
|
||||||
|
await msg.reply(fallbackText);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user