fix: bundle pi dependency and directive handling

This commit is contained in:
Peter Steinberger
2025-12-06 00:49:46 +01:00
parent 6f27f742fe
commit ddfb76e9e0
8 changed files with 127 additions and 115 deletions

View File

@@ -35,7 +35,8 @@
"packageManager": "pnpm@10.23.0",
"dependencies": {
"@whiskeysockets/baileys": "7.0.0-rc.9",
"@mariozechner/pi-coding-agent": "^0.12.4",
"@mariozechner/pi-ai": "^0.12.11",
"@mariozechner/pi-coding-agent": "^0.12.11",
"body-parser": "^2.2.1",
"chalk": "^5.6.2",
"commander": "^14.0.2",

View File

@@ -131,7 +131,11 @@ describe("runCommandReply (pi)", () => {
command: ["pi", "{{Body}}"],
agent: { kind: "pi" },
},
templatingCtx: { ...noopTemplateCtx, Body: "hello", BodyStripped: "hello" },
templatingCtx: {
...noopTemplateCtx,
Body: "hello",
BodyStripped: "hello",
},
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,

View File

@@ -51,7 +51,8 @@ function stripRpcNoise(raw: string): string {
// Keep only assistant/tool messages; drop agent_start/turn_start/user/etc.
const isAssistant = role === "assistant";
const isToolRole = typeof role === "string" && role.toLowerCase().includes("tool");
const isToolRole =
typeof role === "string" && role.toLowerCase().includes("tool");
if (!isAssistant && !isToolRole) continue;
// Ignore assistant messages that have no text content (pure toolcall scaffolding).
@@ -77,8 +78,15 @@ function extractRpcAssistantText(raw: string): string | undefined {
try {
const evt = JSON.parse(line) as {
type?: string;
message?: { role?: string; content?: Array<{ type?: string; text?: string }> };
assistantMessageEvent?: { type?: string; delta?: string; content?: string };
message?: {
role?: string;
content?: Array<{ type?: string; text?: string }>;
};
assistantMessageEvent?: {
type?: string;
delta?: string;
content?: string;
};
};
if (
evt.type === "message_end" &&
@@ -329,7 +337,7 @@ export async function runCommandReply(
systemSent,
timeoutMs,
timeoutSeconds,
commandRunner,
commandRunner: _commandRunner,
enqueue = enqueueCommand,
thinkLevel,
verboseLevel,
@@ -466,9 +474,7 @@ export async function runCommandReply(
});
// Drive pi via RPC stdin so auto-compaction and streaming run server-side.
let rpcInput: string | undefined;
let rpcArgv = finalArgv;
rpcInput = `${JSON.stringify({ type: "prompt", message: promptArg })}\n`;
const bodyIdx =
promptIndex >= 0 ? promptIndex : Math.max(finalArgv.length - 1, 0);
rpcArgv = finalArgv.filter((_, idx) => idx !== bodyIdx);
@@ -522,7 +528,10 @@ export async function runCommandReply(
}
};
let lastStreamedAssistant: string | undefined;
const streamAssistantFinal = (msg?: { role?: string; content?: unknown[] }) => {
const streamAssistantFinal = (msg?: {
role?: string;
content?: unknown[];
}) => {
if (!onPartialReply || msg?.role !== "assistant") return;
const textBlocks = Array.isArray(msg.content)
? (msg.content as Array<{ type?: string; text?: string }>)
@@ -677,18 +686,18 @@ export async function runCommandReply(
}
},
});
const rawStdout = stdout.trim();
const rpcAssistantText = extractRpcAssistantText(stdout);
let mediaFromCommand: string[] | undefined;
const trimmed = stripRpcNoise(rawStdout);
if (stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${stderr.trim()}`);
}
const rawStdout = stdout.trim();
const rpcAssistantText = extractRpcAssistantText(stdout);
let mediaFromCommand: string[] | undefined;
const trimmed = stripRpcNoise(rawStdout);
if (stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${stderr.trim()}`);
}
const logFailure = () => {
const truncate = (s?: string) =>
s ? (s.length > 4000 ? `${s.slice(0, 4000)}` : s) : undefined;
logger.warn(
const logFailure = () => {
const truncate = (s?: string) =>
s ? (s.length > 4000 ? `${s.slice(0, 4000)}` : s) : undefined;
logger.warn(
{
code,
signal,
@@ -779,23 +788,25 @@ export async function runCommandReply(
}
// If parser gave nothing, fall back to best-effort assistant text (prefers RPC deltas).
const fallbackText =
rpcAssistantText ??
extractRpcAssistantText(trimmed) ??
extractAssistantTextLoosely(trimmed) ??
trimmed;
const normalize = (s?: string) =>
stripStructuralPrefixes((s ?? "").trim()).toLowerCase();
const bodyNorm = normalize(templatingCtx.Body ?? templatingCtx.BodyStripped);
const fallbackNorm = normalize(fallbackText);
const promptEcho =
fallbackText &&
(fallbackText === (templatingCtx.Body ?? "") ||
fallbackText === (templatingCtx.BodyStripped ?? "") ||
(bodyNorm.length > 0 && bodyNorm === fallbackNorm));
const safeFallbackText = promptEcho ? undefined : fallbackText;
const fallbackText =
rpcAssistantText ??
extractRpcAssistantText(trimmed) ??
extractAssistantTextLoosely(trimmed) ??
trimmed;
const normalize = (s?: string) =>
stripStructuralPrefixes((s ?? "").trim()).toLowerCase();
const bodyNorm = normalize(
templatingCtx.Body ?? templatingCtx.BodyStripped,
);
const fallbackNorm = normalize(fallbackText);
const promptEcho =
fallbackText &&
(fallbackText === (templatingCtx.Body ?? "") ||
fallbackText === (templatingCtx.BodyStripped ?? "") ||
(bodyNorm.length > 0 && bodyNorm === fallbackNorm));
const safeFallbackText = promptEcho ? undefined : fallbackText;
if (replyItems.length === 0 && safeFallbackText && !hasParsedContent) {
if (replyItems.length === 0 && safeFallbackText && !hasParsedContent) {
const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(safeFallbackText);
if (cleanedText || mediaFound?.length) {

View File

@@ -1,6 +1,10 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import * as tauRpc from "../process/tau-rpc.js";
import { getReplyFromConfig, extractVerboseDirective, extractThinkDirective } from "./reply.js";
import {
extractThinkDirective,
extractVerboseDirective,
getReplyFromConfig,
} from "./reply.js";
describe("directive parsing", () => {
afterEach(() => {

View File

@@ -37,6 +37,8 @@ const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]);
const ABORT_MEMORY = new Map<string, boolean>();
const SYSTEM_MARK = "⚙️";
type ReplyConfig = NonNullable<WarelayConfig["inbound"]>["reply"];
export function extractThinkDirective(body?: string): {
cleaned: string;
thinkLevel?: ThinkLevel;
@@ -44,8 +46,7 @@ export function extractThinkDirective(body?: string): {
hasDirective: boolean;
} {
if (!body) return { cleaned: "", hasDirective: false };
// Match the longest keyword first to avoid partial captures (e.g. "/think:high").
// Require start of string or whitespace before "/" to avoid catching URLs.
// Match the longest keyword first to avoid partial captures (e.g. "/think:high")
const match = body.match(
/(?:^|\s)\/(?:thinking|think|t)\s*:?\s*([a-zA-Z-]+)\b/i,
);
@@ -68,8 +69,9 @@ export function extractVerboseDirective(body?: string): {
hasDirective: boolean;
} {
if (!body) return { cleaned: "", hasDirective: false };
// Require start or whitespace before "/verbose" and reject "/ver*" typos.
const match = body.match(/(?:^|\s)\/v(?:erbose)?\b\s*:?\s*([a-zA-Z-]+)\b/i);
const match = body.match(
/(?:^|\s)\/(?:verbose|v)(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i,
);
const verboseLevel = normalizeVerboseLevel(match?.[1]);
const cleaned = match
? body.replace(match[0], "").replace(/\s+/g, " ").trim()
@@ -129,7 +131,7 @@ function stripMentions(
return result.replace(/\s+/g, " ").trim();
}
function makeDefaultPiReply() {
function makeDefaultPiReply(): ReplyConfig {
const piBin = resolveBundledPiBinary() ?? "pi";
const defaultContext =
lookupContextTokens(DEFAULT_MODEL) ?? DEFAULT_CONTEXT_TOKENS;
@@ -159,7 +161,7 @@ export async function getReplyFromConfig(
): Promise<ReplyPayload | ReplyPayload[] | undefined> {
// Choose reply from config: static text or external command stdout.
const cfg = configOverride ?? loadConfig();
const reply = cfg.inbound?.reply ?? makeDefaultPiReply();
const reply: ReplyConfig = cfg.inbound?.reply ?? makeDefaultPiReply();
const timeoutSeconds = Math.max(reply?.timeoutSeconds ?? 600, 1);
const timeoutMs = timeoutSeconds * 1000;
let started = false;
@@ -216,7 +218,7 @@ export async function getReplyFromConfig(
1,
);
const sessionScope = sessionCfg?.scope ?? "per-sender";
const storePath = sessionCfg ? resolveStorePath(sessionCfg.store) : undefined;
const storePath = resolveStorePath(sessionCfg?.store);
let sessionStore: ReturnType<typeof loadSessionStore> | undefined;
let sessionKey: string | undefined;
let sessionEntry: SessionEntry | undefined;
@@ -230,9 +232,7 @@ export async function getReplyFromConfig(
let persistedThinking: string | undefined;
let persistedVerbose: string | undefined;
const triggerBodyNormalized = stripStructuralPrefixes(
ctx.Body ?? "",
)
const triggerBodyNormalized = stripStructuralPrefixes(ctx.Body ?? "")
.trim()
.toLowerCase();
@@ -299,38 +299,24 @@ export async function getReplyFromConfig(
IsNewSession: isNewSession ? "true" : "false",
};
const directiveSource = stripStructuralPrefixes(
sessionCtx.BodyStripped ?? sessionCtx.Body ?? "",
);
const {
cleaned: thinkCleanedDirective,
cleaned: thinkCleaned,
thinkLevel: inlineThink,
rawLevel: rawThinkLevel,
hasDirective: hasThinkDirective,
} = extractThinkDirective(directiveSource);
} = extractThinkDirective(sessionCtx.BodyStripped ?? sessionCtx.Body ?? "");
const {
cleaned: verboseCleanedDirective,
cleaned: verboseCleaned,
verboseLevel: inlineVerbose,
rawLevel: rawVerboseLevel,
hasDirective: hasVerboseDirective,
} = extractVerboseDirective(thinkCleanedDirective);
// Keep the full body (including context wrapper) for the agent, but strip
// directives from it separately so history remains intact.
const { cleaned: thinkCleanedFull } = extractThinkDirective(
sessionCtx.Body ?? "",
);
const { cleaned: verboseCleanedFull } = extractVerboseDirective(
thinkCleanedFull,
);
sessionCtx.Body = verboseCleanedFull;
sessionCtx.BodyStripped = verboseCleanedFull;
} = extractVerboseDirective(thinkCleaned);
sessionCtx.Body = verboseCleaned;
sessionCtx.BodyStripped = verboseCleaned;
const isGroup =
typeof ctx.From === "string" &&
(ctx.From.includes("@g.us") || ctx.From.startsWith("group:"));
const isHeartbeat = opts?.isHeartbeat === true;
let resolvedThinkLevel =
inlineThink ??
@@ -346,26 +332,26 @@ export async function getReplyFromConfig(
hasThinkDirective &&
hasVerboseDirective &&
(() => {
const stripped = stripStructuralPrefixes(verboseCleanedDirective ?? "");
const stripped = stripStructuralPrefixes(verboseCleaned ?? "");
const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped;
return noMentions.length === 0;
})();
const directiveOnly = (() => {
if (!hasThinkDirective) return false;
if (!thinkCleanedDirective) return true;
if (!thinkCleaned) return true;
// Check after stripping both think and verbose so combined directives count.
const stripped = stripStructuralPrefixes(verboseCleanedDirective);
const stripped = stripStructuralPrefixes(verboseCleaned);
const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped;
return noMentions.length === 0;
})();
// Directive-only message => persist session thinking level and return ack
if (!isHeartbeat && (directiveOnly || combinedDirectiveOnly)) {
if (directiveOnly || combinedDirectiveOnly) {
if (!inlineThink) {
cleanupTyping();
return {
text: `${SYSTEM_MARK} Unrecognized thinking level "${rawThinkLevel ?? ""}". Valid levels: off, minimal, low, medium, high.`,
text: `Unrecognized thinking level "${rawThinkLevel ?? ""}". Valid levels: off, minimal, low, medium, high.`,
};
}
if (sessionEntry && sessionStore && sessionKey) {
@@ -414,24 +400,24 @@ export async function getReplyFromConfig(
);
}
}
const ack = `${SYSTEM_MARK} ${parts.join(" ")}`;
const ack = parts.join(" ");
cleanupTyping();
return { text: ack };
}
const verboseDirectiveOnly = (() => {
if (!hasVerboseDirective) return false;
if (!verboseCleanedDirective) return true;
const stripped = stripStructuralPrefixes(verboseCleanedDirective);
if (!verboseCleaned) return true;
const stripped = stripStructuralPrefixes(verboseCleaned);
const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped;
return noMentions.length === 0;
})();
if (!isHeartbeat && verboseDirectiveOnly) {
if (verboseDirectiveOnly) {
if (!inlineVerbose) {
cleanupTyping();
return {
text: `${SYSTEM_MARK} Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`,
text: `Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`,
};
}
if (sessionEntry && sessionStore && sessionKey) {
@@ -452,29 +438,29 @@ export async function getReplyFromConfig(
return { text: ack };
}
// If directives are inline with other text: persist levels, then continue to agent (no early ack).
if (hasThinkDirective || hasVerboseDirective) {
if (sessionEntry && sessionStore && sessionKey) {
if (hasThinkDirective && inlineThink) {
if (inlineThink === "off") {
delete sessionEntry.thinkingLevel;
} else {
sessionEntry.thinkingLevel = inlineThink;
}
sessionEntry.updatedAt = Date.now();
// Persist inline think/verbose settings even when additional content follows.
if (sessionEntry && sessionStore && sessionKey) {
let updated = false;
if (hasThinkDirective && inlineThink) {
if (inlineThink === "off") {
delete sessionEntry.thinkingLevel;
} else {
sessionEntry.thinkingLevel = inlineThink;
}
if (hasVerboseDirective && inlineVerbose) {
if (inlineVerbose === "off") {
delete sessionEntry.verboseLevel;
} else {
sessionEntry.verboseLevel = inlineVerbose;
}
sessionEntry.updatedAt = Date.now();
}
if (sessionEntry.updatedAt) {
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
updated = true;
}
if (hasVerboseDirective && inlineVerbose) {
if (inlineVerbose === "off") {
delete sessionEntry.verboseLevel;
} else {
sessionEntry.verboseLevel = inlineVerbose;
}
updated = true;
}
if (updated) {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
}
}
@@ -539,7 +525,7 @@ export async function getReplyFromConfig(
const isFirstTurnInSession = isNewSession || !systemSent;
const sessionIntro =
isFirstTurnInSession && sessionCfg?.sessionIntro
? applyTemplate(sessionCfg.sessionIntro, sessionCtx)
? applyTemplate(sessionCfg.sessionIntro ?? "", sessionCtx)
: "";
const groupIntro =
isFirstTurnInSession && sessionCtx.ChatType === "group"
@@ -561,7 +547,7 @@ export async function getReplyFromConfig(
})()
: "";
const bodyPrefix = reply?.bodyPrefix
? applyTemplate(reply.bodyPrefix, sessionCtx)
? applyTemplate(reply.bodyPrefix ?? "", sessionCtx)
: "";
const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? "";
const abortedHint =
@@ -659,13 +645,15 @@ export async function getReplyFromConfig(
await onReplyStart();
logVerbose("Using text auto-reply from config");
const result = {
text: applyTemplate(reply.text, templatingCtx),
text: applyTemplate(reply.text ?? "", templatingCtx),
mediaUrl: reply.mediaUrl,
};
cleanupTyping();
return result;
}
const isHeartbeat = opts?.isHeartbeat === true;
if (reply && reply.mode === "command") {
const heartbeatCommand = isHeartbeat
? (reply as { heartbeatCommand?: string[] }).heartbeatCommand

View File

@@ -21,7 +21,8 @@ export type TemplateContext = MsgContext & {
};
// Simple {{Placeholder}} interpolation using inbound message context.
export function applyTemplate(str: string, ctx: TemplateContext) {
export function applyTemplate(str: string | undefined, ctx: TemplateContext) {
if (!str) return "";
return str.replace(/{{\s*(\w+)\s*}}/g, (_, key) => {
const value = (ctx as Record<string, unknown>)[key];
return value == null ? "" : String(value);

View File

@@ -83,20 +83,19 @@ export async function sessionsCommand(
);
const store = loadSessionStore(storePath);
const activeMinutes = opts.active
? Number.parseInt(String(opts.active), 10)
: undefined;
if (
opts.active !== undefined &&
(Number.isNaN(activeMinutes) || activeMinutes <= 0)
) {
runtime.error("--active must be a positive integer (minutes)");
runtime.exit(1);
return;
let activeMinutes: number | undefined;
if (opts.active !== undefined) {
const parsed = Number.parseInt(String(opts.active), 10);
if (Number.isNaN(parsed) || parsed <= 0) {
runtime.error("--active must be a positive integer (minutes)");
runtime.exit(1);
return;
}
activeMinutes = parsed;
}
const rows = toRows(store).filter((row) => {
if (!activeMinutes) return true;
if (activeMinutes === undefined) return true;
if (!row.updatedAt) return false;
return Date.now() - row.updatedAt <= activeMinutes * 60_000;
});

View File

@@ -102,7 +102,9 @@ class TauRpcClient {
this.pending.timer = setTimeout(() => {
const pending = this.pending;
this.pending = undefined;
pending?.reject(new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`));
pending?.reject(
new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`),
);
this.child?.kill("SIGKILL");
}, capMs);
}
@@ -133,7 +135,9 @@ class TauRpcClient {
const capMs = Math.min(timeoutMs, 5 * 60 * 1000);
const timer = setTimeout(() => {
this.pending = undefined;
reject(new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`));
reject(
new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`),
);
child.kill("SIGKILL");
}, capMs);
this.pending = { resolve, reject, timer, onEvent, capMs };