From ddfb76e9e0725c8fda4f4c0ea8aecde54762ac78 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 6 Dec 2025 00:49:46 +0100 Subject: [PATCH] fix: bundle pi dependency and directive handling --- package.json | 3 +- src/auto-reply/command-reply.test.ts | 6 +- src/auto-reply/command-reply.ts | 79 +++++++++-------- src/auto-reply/reply.directive.test.ts | 6 +- src/auto-reply/reply.ts | 116 +++++++++++-------------- src/auto-reply/templating.ts | 3 +- src/commands/sessions.ts | 21 +++-- src/process/tau-rpc.ts | 8 +- 8 files changed, 127 insertions(+), 115 deletions(-) diff --git a/package.json b/package.json index adc79dc84..193ed8eb6 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,8 @@ "packageManager": "pnpm@10.23.0", "dependencies": { "@whiskeysockets/baileys": "7.0.0-rc.9", - "@mariozechner/pi-coding-agent": "^0.12.4", + "@mariozechner/pi-ai": "^0.12.11", + "@mariozechner/pi-coding-agent": "^0.12.11", "body-parser": "^2.2.1", "chalk": "^5.6.2", "commander": "^14.0.2", diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index bc7c49868..7c7da2c25 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -131,7 +131,11 @@ describe("runCommandReply (pi)", () => { command: ["pi", "{{Body}}"], agent: { kind: "pi" }, }, - templatingCtx: { ...noopTemplateCtx, Body: "hello", BodyStripped: "hello" }, + templatingCtx: { + ...noopTemplateCtx, + Body: "hello", + BodyStripped: "hello", + }, sendSystemOnce: false, isNewSession: true, isFirstTurnInSession: true, diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 892ebcb51..d67eae7e8 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -51,7 +51,8 @@ function stripRpcNoise(raw: string): string { // Keep only assistant/tool messages; drop agent_start/turn_start/user/etc. const isAssistant = role === "assistant"; - const isToolRole = typeof role === "string" && role.toLowerCase().includes("tool"); + const isToolRole = + typeof role === "string" && role.toLowerCase().includes("tool"); if (!isAssistant && !isToolRole) continue; // Ignore assistant messages that have no text content (pure toolcall scaffolding). @@ -77,8 +78,15 @@ function extractRpcAssistantText(raw: string): string | undefined { try { const evt = JSON.parse(line) as { type?: string; - message?: { role?: string; content?: Array<{ type?: string; text?: string }> }; - assistantMessageEvent?: { type?: string; delta?: string; content?: string }; + message?: { + role?: string; + content?: Array<{ type?: string; text?: string }>; + }; + assistantMessageEvent?: { + type?: string; + delta?: string; + content?: string; + }; }; if ( evt.type === "message_end" && @@ -329,7 +337,7 @@ export async function runCommandReply( systemSent, timeoutMs, timeoutSeconds, - commandRunner, + commandRunner: _commandRunner, enqueue = enqueueCommand, thinkLevel, verboseLevel, @@ -466,9 +474,7 @@ export async function runCommandReply( }); // Drive pi via RPC stdin so auto-compaction and streaming run server-side. - let rpcInput: string | undefined; let rpcArgv = finalArgv; - rpcInput = `${JSON.stringify({ type: "prompt", message: promptArg })}\n`; const bodyIdx = promptIndex >= 0 ? promptIndex : Math.max(finalArgv.length - 1, 0); rpcArgv = finalArgv.filter((_, idx) => idx !== bodyIdx); @@ -522,7 +528,10 @@ export async function runCommandReply( } }; let lastStreamedAssistant: string | undefined; - const streamAssistantFinal = (msg?: { role?: string; content?: unknown[] }) => { + const streamAssistantFinal = (msg?: { + role?: string; + content?: unknown[]; + }) => { if (!onPartialReply || msg?.role !== "assistant") return; const textBlocks = Array.isArray(msg.content) ? (msg.content as Array<{ type?: string; text?: string }>) @@ -677,18 +686,18 @@ export async function runCommandReply( } }, }); - const rawStdout = stdout.trim(); - const rpcAssistantText = extractRpcAssistantText(stdout); - let mediaFromCommand: string[] | undefined; - const trimmed = stripRpcNoise(rawStdout); - if (stderr?.trim()) { - logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); - } + const rawStdout = stdout.trim(); + const rpcAssistantText = extractRpcAssistantText(stdout); + let mediaFromCommand: string[] | undefined; + const trimmed = stripRpcNoise(rawStdout); + if (stderr?.trim()) { + logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); + } - const logFailure = () => { - const truncate = (s?: string) => - s ? (s.length > 4000 ? `${s.slice(0, 4000)}…` : s) : undefined; - logger.warn( + const logFailure = () => { + const truncate = (s?: string) => + s ? (s.length > 4000 ? `${s.slice(0, 4000)}…` : s) : undefined; + logger.warn( { code, signal, @@ -779,23 +788,25 @@ export async function runCommandReply( } // If parser gave nothing, fall back to best-effort assistant text (prefers RPC deltas). - const fallbackText = - rpcAssistantText ?? - extractRpcAssistantText(trimmed) ?? - extractAssistantTextLoosely(trimmed) ?? - trimmed; - const normalize = (s?: string) => - stripStructuralPrefixes((s ?? "").trim()).toLowerCase(); - const bodyNorm = normalize(templatingCtx.Body ?? templatingCtx.BodyStripped); - const fallbackNorm = normalize(fallbackText); - const promptEcho = - fallbackText && - (fallbackText === (templatingCtx.Body ?? "") || - fallbackText === (templatingCtx.BodyStripped ?? "") || - (bodyNorm.length > 0 && bodyNorm === fallbackNorm)); - const safeFallbackText = promptEcho ? undefined : fallbackText; + const fallbackText = + rpcAssistantText ?? + extractRpcAssistantText(trimmed) ?? + extractAssistantTextLoosely(trimmed) ?? + trimmed; + const normalize = (s?: string) => + stripStructuralPrefixes((s ?? "").trim()).toLowerCase(); + const bodyNorm = normalize( + templatingCtx.Body ?? templatingCtx.BodyStripped, + ); + const fallbackNorm = normalize(fallbackText); + const promptEcho = + fallbackText && + (fallbackText === (templatingCtx.Body ?? "") || + fallbackText === (templatingCtx.BodyStripped ?? "") || + (bodyNorm.length > 0 && bodyNorm === fallbackNorm)); + const safeFallbackText = promptEcho ? undefined : fallbackText; - if (replyItems.length === 0 && safeFallbackText && !hasParsedContent) { + if (replyItems.length === 0 && safeFallbackText && !hasParsedContent) { const { text: cleanedText, mediaUrls: mediaFound } = splitMediaFromOutput(safeFallbackText); if (cleanedText || mediaFound?.length) { diff --git a/src/auto-reply/reply.directive.test.ts b/src/auto-reply/reply.directive.test.ts index 3a89a0825..d6fa26539 100644 --- a/src/auto-reply/reply.directive.test.ts +++ b/src/auto-reply/reply.directive.test.ts @@ -1,6 +1,10 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import * as tauRpc from "../process/tau-rpc.js"; -import { getReplyFromConfig, extractVerboseDirective, extractThinkDirective } from "./reply.js"; +import { + extractThinkDirective, + extractVerboseDirective, + getReplyFromConfig, +} from "./reply.js"; describe("directive parsing", () => { afterEach(() => { diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 535e74830..dd05c9e66 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -37,6 +37,8 @@ const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]); const ABORT_MEMORY = new Map(); const SYSTEM_MARK = "⚙️"; +type ReplyConfig = NonNullable["reply"]; + export function extractThinkDirective(body?: string): { cleaned: string; thinkLevel?: ThinkLevel; @@ -44,8 +46,7 @@ export function extractThinkDirective(body?: string): { hasDirective: boolean; } { if (!body) return { cleaned: "", hasDirective: false }; - // Match the longest keyword first to avoid partial captures (e.g. "/think:high"). - // Require start of string or whitespace before "/" to avoid catching URLs. + // Match the longest keyword first to avoid partial captures (e.g. "/think:high") const match = body.match( /(?:^|\s)\/(?:thinking|think|t)\s*:?\s*([a-zA-Z-]+)\b/i, ); @@ -68,8 +69,9 @@ export function extractVerboseDirective(body?: string): { hasDirective: boolean; } { if (!body) return { cleaned: "", hasDirective: false }; - // Require start or whitespace before "/verbose" and reject "/ver*" typos. - const match = body.match(/(?:^|\s)\/v(?:erbose)?\b\s*:?\s*([a-zA-Z-]+)\b/i); + const match = body.match( + /(?:^|\s)\/(?:verbose|v)(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i, + ); const verboseLevel = normalizeVerboseLevel(match?.[1]); const cleaned = match ? body.replace(match[0], "").replace(/\s+/g, " ").trim() @@ -129,7 +131,7 @@ function stripMentions( return result.replace(/\s+/g, " ").trim(); } -function makeDefaultPiReply() { +function makeDefaultPiReply(): ReplyConfig { const piBin = resolveBundledPiBinary() ?? "pi"; const defaultContext = lookupContextTokens(DEFAULT_MODEL) ?? DEFAULT_CONTEXT_TOKENS; @@ -159,7 +161,7 @@ export async function getReplyFromConfig( ): Promise { // Choose reply from config: static text or external command stdout. const cfg = configOverride ?? loadConfig(); - const reply = cfg.inbound?.reply ?? makeDefaultPiReply(); + const reply: ReplyConfig = cfg.inbound?.reply ?? makeDefaultPiReply(); const timeoutSeconds = Math.max(reply?.timeoutSeconds ?? 600, 1); const timeoutMs = timeoutSeconds * 1000; let started = false; @@ -216,7 +218,7 @@ export async function getReplyFromConfig( 1, ); const sessionScope = sessionCfg?.scope ?? "per-sender"; - const storePath = sessionCfg ? resolveStorePath(sessionCfg.store) : undefined; + const storePath = resolveStorePath(sessionCfg?.store); let sessionStore: ReturnType | undefined; let sessionKey: string | undefined; let sessionEntry: SessionEntry | undefined; @@ -230,9 +232,7 @@ export async function getReplyFromConfig( let persistedThinking: string | undefined; let persistedVerbose: string | undefined; - const triggerBodyNormalized = stripStructuralPrefixes( - ctx.Body ?? "", - ) + const triggerBodyNormalized = stripStructuralPrefixes(ctx.Body ?? "") .trim() .toLowerCase(); @@ -299,38 +299,24 @@ export async function getReplyFromConfig( IsNewSession: isNewSession ? "true" : "false", }; - const directiveSource = stripStructuralPrefixes( - sessionCtx.BodyStripped ?? sessionCtx.Body ?? "", - ); const { - cleaned: thinkCleanedDirective, + cleaned: thinkCleaned, thinkLevel: inlineThink, rawLevel: rawThinkLevel, hasDirective: hasThinkDirective, - } = extractThinkDirective(directiveSource); + } = extractThinkDirective(sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""); const { - cleaned: verboseCleanedDirective, + cleaned: verboseCleaned, verboseLevel: inlineVerbose, rawLevel: rawVerboseLevel, hasDirective: hasVerboseDirective, - } = extractVerboseDirective(thinkCleanedDirective); - - // Keep the full body (including context wrapper) for the agent, but strip - // directives from it separately so history remains intact. - const { cleaned: thinkCleanedFull } = extractThinkDirective( - sessionCtx.Body ?? "", - ); - const { cleaned: verboseCleanedFull } = extractVerboseDirective( - thinkCleanedFull, - ); - - sessionCtx.Body = verboseCleanedFull; - sessionCtx.BodyStripped = verboseCleanedFull; + } = extractVerboseDirective(thinkCleaned); + sessionCtx.Body = verboseCleaned; + sessionCtx.BodyStripped = verboseCleaned; const isGroup = typeof ctx.From === "string" && (ctx.From.includes("@g.us") || ctx.From.startsWith("group:")); - const isHeartbeat = opts?.isHeartbeat === true; let resolvedThinkLevel = inlineThink ?? @@ -346,26 +332,26 @@ export async function getReplyFromConfig( hasThinkDirective && hasVerboseDirective && (() => { - const stripped = stripStructuralPrefixes(verboseCleanedDirective ?? ""); + const stripped = stripStructuralPrefixes(verboseCleaned ?? ""); const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped; return noMentions.length === 0; })(); const directiveOnly = (() => { if (!hasThinkDirective) return false; - if (!thinkCleanedDirective) return true; + if (!thinkCleaned) return true; // Check after stripping both think and verbose so combined directives count. - const stripped = stripStructuralPrefixes(verboseCleanedDirective); + const stripped = stripStructuralPrefixes(verboseCleaned); const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped; return noMentions.length === 0; })(); // Directive-only message => persist session thinking level and return ack - if (!isHeartbeat && (directiveOnly || combinedDirectiveOnly)) { + if (directiveOnly || combinedDirectiveOnly) { if (!inlineThink) { cleanupTyping(); return { - text: `${SYSTEM_MARK} Unrecognized thinking level "${rawThinkLevel ?? ""}". Valid levels: off, minimal, low, medium, high.`, + text: `Unrecognized thinking level "${rawThinkLevel ?? ""}". Valid levels: off, minimal, low, medium, high.`, }; } if (sessionEntry && sessionStore && sessionKey) { @@ -414,24 +400,24 @@ export async function getReplyFromConfig( ); } } - const ack = `${SYSTEM_MARK} ${parts.join(" ")}`; + const ack = parts.join(" "); cleanupTyping(); return { text: ack }; } const verboseDirectiveOnly = (() => { if (!hasVerboseDirective) return false; - if (!verboseCleanedDirective) return true; - const stripped = stripStructuralPrefixes(verboseCleanedDirective); + if (!verboseCleaned) return true; + const stripped = stripStructuralPrefixes(verboseCleaned); const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped; return noMentions.length === 0; })(); - if (!isHeartbeat && verboseDirectiveOnly) { + if (verboseDirectiveOnly) { if (!inlineVerbose) { cleanupTyping(); return { - text: `${SYSTEM_MARK} Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`, + text: `Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`, }; } if (sessionEntry && sessionStore && sessionKey) { @@ -452,29 +438,29 @@ export async function getReplyFromConfig( return { text: ack }; } - // If directives are inline with other text: persist levels, then continue to agent (no early ack). - if (hasThinkDirective || hasVerboseDirective) { - if (sessionEntry && sessionStore && sessionKey) { - if (hasThinkDirective && inlineThink) { - if (inlineThink === "off") { - delete sessionEntry.thinkingLevel; - } else { - sessionEntry.thinkingLevel = inlineThink; - } - sessionEntry.updatedAt = Date.now(); + // Persist inline think/verbose settings even when additional content follows. + if (sessionEntry && sessionStore && sessionKey) { + let updated = false; + if (hasThinkDirective && inlineThink) { + if (inlineThink === "off") { + delete sessionEntry.thinkingLevel; + } else { + sessionEntry.thinkingLevel = inlineThink; } - if (hasVerboseDirective && inlineVerbose) { - if (inlineVerbose === "off") { - delete sessionEntry.verboseLevel; - } else { - sessionEntry.verboseLevel = inlineVerbose; - } - sessionEntry.updatedAt = Date.now(); - } - if (sessionEntry.updatedAt) { - sessionStore[sessionKey] = sessionEntry; - await saveSessionStore(storePath, sessionStore); + updated = true; + } + if (hasVerboseDirective && inlineVerbose) { + if (inlineVerbose === "off") { + delete sessionEntry.verboseLevel; + } else { + sessionEntry.verboseLevel = inlineVerbose; } + updated = true; + } + if (updated) { + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); } } @@ -539,7 +525,7 @@ export async function getReplyFromConfig( const isFirstTurnInSession = isNewSession || !systemSent; const sessionIntro = isFirstTurnInSession && sessionCfg?.sessionIntro - ? applyTemplate(sessionCfg.sessionIntro, sessionCtx) + ? applyTemplate(sessionCfg.sessionIntro ?? "", sessionCtx) : ""; const groupIntro = isFirstTurnInSession && sessionCtx.ChatType === "group" @@ -561,7 +547,7 @@ export async function getReplyFromConfig( })() : ""; const bodyPrefix = reply?.bodyPrefix - ? applyTemplate(reply.bodyPrefix, sessionCtx) + ? applyTemplate(reply.bodyPrefix ?? "", sessionCtx) : ""; const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; const abortedHint = @@ -659,13 +645,15 @@ export async function getReplyFromConfig( await onReplyStart(); logVerbose("Using text auto-reply from config"); const result = { - text: applyTemplate(reply.text, templatingCtx), + text: applyTemplate(reply.text ?? "", templatingCtx), mediaUrl: reply.mediaUrl, }; cleanupTyping(); return result; } + const isHeartbeat = opts?.isHeartbeat === true; + if (reply && reply.mode === "command") { const heartbeatCommand = isHeartbeat ? (reply as { heartbeatCommand?: string[] }).heartbeatCommand diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index 8d0b487ed..ece3add05 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -21,7 +21,8 @@ export type TemplateContext = MsgContext & { }; // Simple {{Placeholder}} interpolation using inbound message context. -export function applyTemplate(str: string, ctx: TemplateContext) { +export function applyTemplate(str: string | undefined, ctx: TemplateContext) { + if (!str) return ""; return str.replace(/{{\s*(\w+)\s*}}/g, (_, key) => { const value = (ctx as Record)[key]; return value == null ? "" : String(value); diff --git a/src/commands/sessions.ts b/src/commands/sessions.ts index 8f1569635..aa8218f28 100644 --- a/src/commands/sessions.ts +++ b/src/commands/sessions.ts @@ -83,20 +83,19 @@ export async function sessionsCommand( ); const store = loadSessionStore(storePath); - const activeMinutes = opts.active - ? Number.parseInt(String(opts.active), 10) - : undefined; - if ( - opts.active !== undefined && - (Number.isNaN(activeMinutes) || activeMinutes <= 0) - ) { - runtime.error("--active must be a positive integer (minutes)"); - runtime.exit(1); - return; + let activeMinutes: number | undefined; + if (opts.active !== undefined) { + const parsed = Number.parseInt(String(opts.active), 10); + if (Number.isNaN(parsed) || parsed <= 0) { + runtime.error("--active must be a positive integer (minutes)"); + runtime.exit(1); + return; + } + activeMinutes = parsed; } const rows = toRows(store).filter((row) => { - if (!activeMinutes) return true; + if (activeMinutes === undefined) return true; if (!row.updatedAt) return false; return Date.now() - row.updatedAt <= activeMinutes * 60_000; }); diff --git a/src/process/tau-rpc.ts b/src/process/tau-rpc.ts index ccfce7f10..0c14cdbb3 100644 --- a/src/process/tau-rpc.ts +++ b/src/process/tau-rpc.ts @@ -102,7 +102,9 @@ class TauRpcClient { this.pending.timer = setTimeout(() => { const pending = this.pending; this.pending = undefined; - pending?.reject(new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`)); + pending?.reject( + new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`), + ); this.child?.kill("SIGKILL"); }, capMs); } @@ -133,7 +135,9 @@ class TauRpcClient { const capMs = Math.min(timeoutMs, 5 * 60 * 1000); const timer = setTimeout(() => { this.pending = undefined; - reject(new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`)); + reject( + new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`), + ); child.kill("SIGKILL"); }, capMs); this.pending = { resolve, reject, timer, onEvent, capMs };