feat: track compaction count + verbose notice

This commit is contained in:
Peter Steinberger
2026-01-06 02:41:48 +01:00
parent 3c6dea3ef3
commit b30bae89ed
12 changed files with 293 additions and 6 deletions

View File

@@ -46,6 +46,7 @@
- Control UI: avoid Slack config ReferenceError by reading slack config snapshots. Thanks @sreekaransrinath for PR #249. - Control UI: avoid Slack config ReferenceError by reading slack config snapshots. Thanks @sreekaransrinath for PR #249.
- Telegram: honor routing.groupChat.mentionPatterns for group mention gating. Thanks @regenrek for PR #242. - Telegram: honor routing.groupChat.mentionPatterns for group mention gating. Thanks @regenrek for PR #242.
- Auto-reply: block unauthorized `/reset` and infer WhatsApp senders from E.164 inputs. - Auto-reply: block unauthorized `/reset` and infer WhatsApp senders from E.164 inputs.
- Auto-reply: track compaction count in session status; verbose mode announces auto-compactions.
### Maintenance ### Maintenance
- Deps: bump pi-* stack, Slack SDK, discord-api-types, file-type, zod, and Biome. - Deps: bump pi-* stack, Slack SDK, discord-api-types, file-type, zod, and Biome.

View File

@@ -555,6 +555,10 @@ export function subscribeEmbeddedPiSession(params: {
compactionInFlight = true; compactionInFlight = true;
ensureCompactionPromise(); ensureCompactionPromise();
log.debug(`embedded run compaction start: runId=${params.runId}`); log.debug(`embedded run compaction start: runId=${params.runId}`);
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "start" },
});
} }
if (evt.type === "auto_compaction_end") { if (evt.type === "auto_compaction_end") {
@@ -567,6 +571,10 @@ export function subscribeEmbeddedPiSession(params: {
} else { } else {
maybeResolveCompactionWait(); maybeResolveCompactionWait();
} }
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry },
});
} }
if (evt.type === "agent_end") { if (evt.type === "agent_end") {

View File

@@ -19,7 +19,7 @@ import {
runEmbeddedPiAgent, runEmbeddedPiAgent,
} from "../agents/pi-embedded.js"; } from "../agents/pi-embedded.js";
import { ensureSandboxWorkspaceForSession } from "../agents/sandbox.js"; import { ensureSandboxWorkspaceForSession } from "../agents/sandbox.js";
import { resolveSessionKey } from "../config/sessions.js"; import { loadSessionStore, resolveSessionKey } from "../config/sessions.js";
import { getReplyFromConfig } from "./reply.js"; import { getReplyFromConfig } from "./reply.js";
import { HEARTBEAT_TOKEN } from "./tokens.js"; import { HEARTBEAT_TOKEN } from "./tokens.js";
@@ -731,6 +731,10 @@ describe("trigger handling", () => {
it("runs /compact as a gated command", async () => { it("runs /compact as a gated command", async () => {
await withTempHome(async (home) => { await withTempHome(async (home) => {
const storePath = join(
tmpdir(),
`clawdbot-session-test-${Date.now()}.json`,
);
vi.mocked(compactEmbeddedPiSession).mockResolvedValue({ vi.mocked(compactEmbeddedPiSession).mockResolvedValue({
ok: true, ok: true,
compacted: true, compacted: true,
@@ -757,7 +761,7 @@ describe("trigger handling", () => {
allowFrom: ["*"], allowFrom: ["*"],
}, },
session: { session: {
store: join(tmpdir(), `clawdbot-session-test-${Date.now()}.json`), store: storePath,
}, },
}, },
); );
@@ -765,6 +769,13 @@ describe("trigger handling", () => {
expect(text?.startsWith("⚙️ Compacted")).toBe(true); expect(text?.startsWith("⚙️ Compacted")).toBe(true);
expect(compactEmbeddedPiSession).toHaveBeenCalledOnce(); expect(compactEmbeddedPiSession).toHaveBeenCalledOnce();
expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
const store = loadSessionStore(storePath);
const sessionKey = resolveSessionKey("per-sender", {
Body: "/compact focus on decisions",
From: "+1003",
To: "+2000",
});
expect(store[sessionKey]?.compactionCount).toBe(1);
}); });
}); });

View File

@@ -1,5 +1,9 @@
import fs from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions.js";
import type { TemplateContext } from "../templating.js"; import type { TemplateContext } from "../templating.js";
import type { GetReplyOptions } from "../types.js"; import type { GetReplyOptions } from "../types.js";
import type { FollowupRun, QueueSettings } from "./queue.js"; import type { FollowupRun, QueueSettings } from "./queue.js";
@@ -54,7 +58,14 @@ type EmbeddedPiAgentParams = {
onPartialReply?: (payload: { text?: string }) => Promise<void> | void; onPartialReply?: (payload: { text?: string }) => Promise<void> | void;
}; };
function createMinimalRun(params?: { opts?: GetReplyOptions }) { function createMinimalRun(params?: {
opts?: GetReplyOptions;
resolvedVerboseLevel?: "off" | "on";
sessionStore?: Record<string, SessionEntry>;
sessionEntry?: SessionEntry;
sessionKey?: string;
storePath?: string;
}) {
const typing = createTyping(); const typing = createTyping();
const opts = params?.opts; const opts = params?.opts;
const sessionCtx = { const sessionCtx = {
@@ -62,13 +73,14 @@ function createMinimalRun(params?: { opts?: GetReplyOptions }) {
MessageSid: "msg", MessageSid: "msg",
} as unknown as TemplateContext; } as unknown as TemplateContext;
const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings; const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings;
const sessionKey = params?.sessionKey ?? "main";
const followupRun = { const followupRun = {
prompt: "hello", prompt: "hello",
summaryLine: "hello", summaryLine: "hello",
enqueuedAt: Date.now(), enqueuedAt: Date.now(),
run: { run: {
sessionId: "session", sessionId: "session",
sessionKey: "main", sessionKey,
surface: "whatsapp", surface: "whatsapp",
sessionFile: "/tmp/session.jsonl", sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp", workspaceDir: "/tmp",
@@ -77,7 +89,7 @@ function createMinimalRun(params?: { opts?: GetReplyOptions }) {
provider: "anthropic", provider: "anthropic",
model: "claude", model: "claude",
thinkLevel: "low", thinkLevel: "low",
verboseLevel: "off", verboseLevel: params?.resolvedVerboseLevel ?? "off",
elevatedLevel: "off", elevatedLevel: "off",
bashElevated: { bashElevated: {
enabled: false, enabled: false,
@@ -104,9 +116,13 @@ function createMinimalRun(params?: { opts?: GetReplyOptions }) {
isStreaming: false, isStreaming: false,
opts, opts,
typing, typing,
sessionEntry: params?.sessionEntry,
sessionStore: params?.sessionStore,
sessionKey,
storePath: params?.storePath,
sessionCtx, sessionCtx,
defaultModel: "anthropic/claude-opus-4-5", defaultModel: "anthropic/claude-opus-4-5",
resolvedVerboseLevel: "off", resolvedVerboseLevel: params?.resolvedVerboseLevel ?? "off",
isNewSession: false, isNewSession: false,
blockStreamingEnabled: false, blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end", resolvedBlockStreamingBreak: "message_end",
@@ -153,4 +169,42 @@ describe("runReplyAgent typing (heartbeat)", () => {
expect(typing.startTypingOnText).not.toHaveBeenCalled(); expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled(); expect(typing.startTypingLoop).not.toHaveBeenCalled();
}); });
it("announces auto-compaction in verbose mode and tracks count", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(tmpdir(), "clawdbot-compaction-")),
"sessions.json",
);
const sessionEntry = { sessionId: "session", updatedAt: Date.now() };
const sessionStore = { main: sessionEntry };
runEmbeddedPiAgentMock.mockImplementationOnce(
async (params: {
onAgentEvent?: (evt: {
stream: string;
data: Record<string, unknown>;
}) => void;
}) => {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
});
return { payloads: [{ text: "final" }], meta: {} };
},
);
const { run } = createMinimalRun({
resolvedVerboseLevel: "on",
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
});
const res = await run();
expect(Array.isArray(res)).toBe(true);
const payloads = res as { text?: string }[];
expect(payloads[0]?.text).toContain("Auto-compaction complete");
expect(payloads[0]?.text).toContain("count 1");
expect(sessionStore.main.compactionCount).toBe(1);
});
}); });

View File

@@ -27,6 +27,7 @@ import {
scheduleFollowupDrain, scheduleFollowupDrain,
} from "./queue.js"; } from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js"; import { extractReplyToTag } from "./reply-tags.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js"; import type { TypingController } from "./typing.js";
export async function runReplyAgent(params: { export async function runReplyAgent(params: {
@@ -167,6 +168,7 @@ export async function runReplyAgent(params: {
}; };
let didLogHeartbeatStrip = false; let didLogHeartbeatStrip = false;
let autoCompactionCompleted = false;
try { try {
const runId = crypto.randomUUID(); const runId = crypto.randomUUID();
if (sessionKey) { if (sessionKey) {
@@ -233,6 +235,14 @@ export async function runReplyAgent(params: {
}); });
} }
: undefined, : undefined,
onAgentEvent: (evt) => {
if (evt.stream !== "compaction") return;
const phase = String(evt.data.phase ?? "");
const willRetry = Boolean(evt.data.willRetry);
if (phase === "end" && !willRetry) {
autoCompactionCompleted = true;
}
},
onBlockReply: onBlockReply:
blockStreamingEnabled && opts?.onBlockReply blockStreamingEnabled && opts?.onBlockReply
? async (payload) => { ? async (payload) => {
@@ -478,6 +488,21 @@ export async function runReplyAgent(params: {
// If verbose is enabled and this is a new session, prepend a session hint. // If verbose is enabled and this is a new session, prepend a session hint.
let finalPayloads = filteredPayloads; let finalPayloads = filteredPayloads;
if (autoCompactionCompleted) {
const count = await incrementCompactionCount({
sessionEntry,
sessionStore,
sessionKey,
storePath,
});
if (resolvedVerboseLevel === "on") {
const suffix = typeof count === "number" ? ` (count ${count})` : "";
finalPayloads = [
{ text: `🧹 Auto-compaction complete${suffix}.` },
...finalPayloads,
];
}
}
if (resolvedVerboseLevel === "on" && isNewSession) { if (resolvedVerboseLevel === "on" && isNewSession) {
finalPayloads = [ finalPayloads = [
{ text: `🧭 New session: ${followupRun.run.sessionId}` }, { text: `🧭 New session: ${followupRun.run.sessionId}` },

View File

@@ -44,6 +44,7 @@ import type { ReplyPayload } from "../types.js";
import { isAbortTrigger, setAbortMemory } from "./abort.js"; import { isAbortTrigger, setAbortMemory } from "./abort.js";
import type { InlineDirectives } from "./directive-handling.js"; import type { InlineDirectives } from "./directive-handling.js";
import { stripMentions, stripStructuralPrefixes } from "./mentions.js"; import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
import { incrementCompactionCount } from "./session-updates.js";
export type CommandContext = { export type CommandContext = {
surface: string; surface: string;
@@ -444,6 +445,14 @@ export async function handleCommands(params: {
: "Compacted" : "Compacted"
: "Compaction skipped" : "Compaction skipped"
: "Compaction failed"; : "Compaction failed";
if (result.ok && result.compacted) {
await incrementCompactionCount({
sessionEntry,
sessionStore,
sessionKey,
storePath,
});
}
const reason = result.reason?.trim(); const reason = result.reason?.trim();
const line = reason const line = reason
? `${compactLabel}: ${reason}${contextSummary}` ? `${compactLabel}: ${reason}${contextSummary}`

View File

@@ -0,0 +1,119 @@
import fs from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions.js";
import type { FollowupRun } from "./queue.js";
import type { TypingController } from "./typing.js";
const runEmbeddedPiAgentMock = vi.fn();
vi.mock("../../agents/model-fallback.js", () => ({
runWithModelFallback: async ({
provider,
model,
run,
}: {
provider: string;
model: string;
run: (provider: string, model: string) => Promise<unknown>;
}) => ({
result: await run(provider, model),
provider,
model,
}),
}));
vi.mock("../../agents/pi-embedded.js", () => ({
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
}));
import { createFollowupRunner } from "./followup-runner.js";
function createTyping(): TypingController {
return {
onReplyStart: vi.fn(async () => {}),
startTypingLoop: vi.fn(async () => {}),
startTypingOnText: vi.fn(async () => {}),
refreshTypingTtl: vi.fn(),
cleanup: vi.fn(),
};
}
describe("createFollowupRunner compaction", () => {
it("adds verbose auto-compaction notice and tracks count", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(tmpdir(), "clawdbot-compaction-")),
"sessions.json",
);
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
};
const sessionStore: Record<string, SessionEntry> = {
main: sessionEntry,
};
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockImplementationOnce(
async (params: {
onAgentEvent?: (evt: {
stream: string;
data: Record<string, unknown>;
}) => void;
}) => {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
});
return { payloads: [{ text: "final" }], meta: {} };
},
);
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createTyping(),
sessionEntry,
sessionStore,
sessionKey: "main",
storePath,
defaultModel: "anthropic/claude-opus-4-5",
});
const queued = {
prompt: "hello",
summaryLine: "hello",
enqueuedAt: Date.now(),
run: {
sessionId: "session",
sessionKey: "main",
surface: "whatsapp",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
config: {},
skillsSnapshot: {},
provider: "anthropic",
model: "claude",
thinkLevel: "low",
verboseLevel: "on",
elevatedLevel: "off",
bashElevated: {
enabled: false,
allowed: false,
defaultLevel: "off",
},
timeoutMs: 1_000,
blockReplyBreak: "message_end",
},
} as FollowupRun;
await runner(queued);
expect(onBlockReply).toHaveBeenCalled();
expect(onBlockReply.mock.calls[0][0].text).toContain(
"Auto-compaction complete",
);
expect(sessionStore.main.compactionCount).toBe(1);
});
});

View File

@@ -12,6 +12,7 @@ import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { FollowupRun } from "./queue.js"; import type { FollowupRun } from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js"; import { extractReplyToTag } from "./reply-tags.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js"; import type { TypingController } from "./typing.js";
export function createFollowupRunner(params: { export function createFollowupRunner(params: {
@@ -61,6 +62,7 @@ export function createFollowupRunner(params: {
if (queued.run.sessionKey) { if (queued.run.sessionKey) {
registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey }); registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
} }
let autoCompactionCompleted = false;
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>; let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = queued.run.provider; let fallbackProvider = queued.run.provider;
let fallbackModel = queued.run.model; let fallbackModel = queued.run.model;
@@ -91,6 +93,14 @@ export function createFollowupRunner(params: {
timeoutMs: queued.run.timeoutMs, timeoutMs: queued.run.timeoutMs,
runId, runId,
blockReplyBreak: queued.run.blockReplyBreak, blockReplyBreak: queued.run.blockReplyBreak,
onAgentEvent: (evt) => {
if (evt.stream !== "compaction") return;
const phase = String(evt.data.phase ?? "");
const willRetry = Boolean(evt.data.willRetry);
if (phase === "end" && !willRetry) {
autoCompactionCompleted = true;
}
},
}), }),
}); });
runResult = fallbackResult.result; runResult = fallbackResult.result;
@@ -132,6 +142,21 @@ export function createFollowupRunner(params: {
if (replyTaggedPayloads.length === 0) return; if (replyTaggedPayloads.length === 0) return;
if (autoCompactionCompleted) {
const count = await incrementCompactionCount({
sessionEntry,
sessionStore,
sessionKey,
storePath,
});
if (queued.run.verboseLevel === "on") {
const suffix = typeof count === "number" ? ` (count ${count})` : "";
replyTaggedPayloads.unshift({
text: `🧹 Auto-compaction complete${suffix}.`,
});
}
}
if (sessionStore && sessionKey) { if (sessionStore && sessionKey) {
const usage = runResult.meta.agentMeta?.usage; const usage = runResult.meta.agentMeta?.usage;
const modelUsed = const modelUsed =

View File

@@ -122,3 +122,32 @@ export async function ensureSkillSnapshot(params: {
return { sessionEntry: nextEntry, skillsSnapshot, systemSent }; return { sessionEntry: nextEntry, skillsSnapshot, systemSent };
} }
export async function incrementCompactionCount(params: {
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
now?: number;
}): Promise<number | undefined> {
const {
sessionEntry,
sessionStore,
sessionKey,
storePath,
now = Date.now(),
} = params;
if (!sessionStore || !sessionKey) return undefined;
const entry = sessionStore[sessionKey] ?? sessionEntry;
if (!entry) return undefined;
const nextCount = (entry.compactionCount ?? 0) + 1;
sessionStore[sessionKey] = {
...entry,
compactionCount: nextCount,
updatedAt: now,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
return nextCount;
}

View File

@@ -22,6 +22,7 @@ describe("buildStatusMessage", () => {
contextTokens: 32_000, contextTokens: 32_000,
thinkingLevel: "low", thinkingLevel: "low",
verboseLevel: "on", verboseLevel: "on",
compactionCount: 2,
}, },
sessionKey: "main", sessionKey: "main",
sessionScope: "per-sender", sessionScope: "per-sender",
@@ -39,6 +40,7 @@ describe("buildStatusMessage", () => {
expect(text).toContain("Runtime: direct"); expect(text).toContain("Runtime: direct");
expect(text).toContain("Context: 16k/32k (50%)"); expect(text).toContain("Context: 16k/32k (50%)");
expect(text).toContain("Session: main"); expect(text).toContain("Session: main");
expect(text).toContain("compactions 2");
expect(text).toContain("Web: linked"); expect(text).toContain("Web: linked");
expect(text).toContain("heartbeat 45s"); expect(text).toContain("heartbeat 45s");
expect(text).toContain("thinking=medium"); expect(text).toContain("thinking=medium");

View File

@@ -217,6 +217,9 @@ export function buildStatusMessage(args: StatusArgs): string {
entry?.updatedAt entry?.updatedAt
? `updated ${formatAge(now - entry.updatedAt)}` ? `updated ${formatAge(now - entry.updatedAt)}`
: "no activity", : "no activity",
typeof entry?.compactionCount === "number"
? `compactions ${entry.compactionCount}`
: undefined,
args.storePath ? `store ${shortenHomePath(args.storePath)}` : undefined, args.storePath ? `store ${shortenHomePath(args.storePath)}` : undefined,
] ]
.filter(Boolean) .filter(Boolean)

View File

@@ -55,6 +55,7 @@ export type SessionEntry = {
modelProvider?: string; modelProvider?: string;
model?: string; model?: string;
contextTokens?: number; contextTokens?: number;
compactionCount?: number;
displayName?: string; displayName?: string;
surface?: string; surface?: string;
subject?: string; subject?: string;