diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b29b4a0c..63e954950 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ - Control UI: avoid Slack config ReferenceError by reading slack config snapshots. Thanks @sreekaransrinath for PR #249. - Telegram: honor routing.groupChat.mentionPatterns for group mention gating. Thanks @regenrek for PR #242. - Auto-reply: block unauthorized `/reset` and infer WhatsApp senders from E.164 inputs. +- Auto-reply: track compaction count in session status; verbose mode announces auto-compactions. ### Maintenance - Deps: bump pi-* stack, Slack SDK, discord-api-types, file-type, zod, and Biome. diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 69bbecf65..330422efd 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -555,6 +555,10 @@ export function subscribeEmbeddedPiSession(params: { compactionInFlight = true; ensureCompactionPromise(); log.debug(`embedded run compaction start: runId=${params.runId}`); + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "start" }, + }); } if (evt.type === "auto_compaction_end") { @@ -567,6 +571,10 @@ export function subscribeEmbeddedPiSession(params: { } else { maybeResolveCompactionWait(); } + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry }, + }); } if (evt.type === "agent_end") { diff --git a/src/auto-reply/reply.triggers.test.ts b/src/auto-reply/reply.triggers.test.ts index 24c53f662..006bfdaca 100644 --- a/src/auto-reply/reply.triggers.test.ts +++ b/src/auto-reply/reply.triggers.test.ts @@ -19,7 +19,7 @@ import { runEmbeddedPiAgent, } from "../agents/pi-embedded.js"; import { ensureSandboxWorkspaceForSession } from "../agents/sandbox.js"; -import { resolveSessionKey } from "../config/sessions.js"; +import { loadSessionStore, resolveSessionKey } from "../config/sessions.js"; import { getReplyFromConfig } from "./reply.js"; import { HEARTBEAT_TOKEN } from "./tokens.js"; @@ -731,6 +731,10 @@ describe("trigger handling", () => { it("runs /compact as a gated command", async () => { await withTempHome(async (home) => { + const storePath = join( + tmpdir(), + `clawdbot-session-test-${Date.now()}.json`, + ); vi.mocked(compactEmbeddedPiSession).mockResolvedValue({ ok: true, compacted: true, @@ -757,7 +761,7 @@ describe("trigger handling", () => { allowFrom: ["*"], }, session: { - store: join(tmpdir(), `clawdbot-session-test-${Date.now()}.json`), + store: storePath, }, }, ); @@ -765,6 +769,13 @@ describe("trigger handling", () => { expect(text?.startsWith("⚙️ Compacted")).toBe(true); expect(compactEmbeddedPiSession).toHaveBeenCalledOnce(); expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + const store = loadSessionStore(storePath); + const sessionKey = resolveSessionKey("per-sender", { + Body: "/compact focus on decisions", + From: "+1003", + To: "+2000", + }); + expect(store[sessionKey]?.compactionCount).toBe(1); }); }); diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts index e5cec45c8..2b437a57f 100644 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts +++ b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts @@ -1,5 +1,9 @@ +import fs from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; import { describe, expect, it, vi } from "vitest"; +import type { SessionEntry } from "../../config/sessions.js"; import type { TemplateContext } from "../templating.js"; import type { GetReplyOptions } from "../types.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; @@ -54,7 +58,14 @@ type EmbeddedPiAgentParams = { onPartialReply?: (payload: { text?: string }) => Promise | void; }; -function createMinimalRun(params?: { opts?: GetReplyOptions }) { +function createMinimalRun(params?: { + opts?: GetReplyOptions; + resolvedVerboseLevel?: "off" | "on"; + sessionStore?: Record; + sessionEntry?: SessionEntry; + sessionKey?: string; + storePath?: string; +}) { const typing = createTyping(); const opts = params?.opts; const sessionCtx = { @@ -62,13 +73,14 @@ function createMinimalRun(params?: { opts?: GetReplyOptions }) { MessageSid: "msg", } as unknown as TemplateContext; const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings; + const sessionKey = params?.sessionKey ?? "main"; const followupRun = { prompt: "hello", summaryLine: "hello", enqueuedAt: Date.now(), run: { sessionId: "session", - sessionKey: "main", + sessionKey, surface: "whatsapp", sessionFile: "/tmp/session.jsonl", workspaceDir: "/tmp", @@ -77,7 +89,7 @@ function createMinimalRun(params?: { opts?: GetReplyOptions }) { provider: "anthropic", model: "claude", thinkLevel: "low", - verboseLevel: "off", + verboseLevel: params?.resolvedVerboseLevel ?? "off", elevatedLevel: "off", bashElevated: { enabled: false, @@ -104,9 +116,13 @@ function createMinimalRun(params?: { opts?: GetReplyOptions }) { isStreaming: false, opts, typing, + sessionEntry: params?.sessionEntry, + sessionStore: params?.sessionStore, + sessionKey, + storePath: params?.storePath, sessionCtx, defaultModel: "anthropic/claude-opus-4-5", - resolvedVerboseLevel: "off", + resolvedVerboseLevel: params?.resolvedVerboseLevel ?? "off", isNewSession: false, blockStreamingEnabled: false, resolvedBlockStreamingBreak: "message_end", @@ -153,4 +169,42 @@ describe("runReplyAgent typing (heartbeat)", () => { expect(typing.startTypingOnText).not.toHaveBeenCalled(); expect(typing.startTypingLoop).not.toHaveBeenCalled(); }); + + it("announces auto-compaction in verbose mode and tracks count", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "clawdbot-compaction-")), + "sessions.json", + ); + const sessionEntry = { sessionId: "session", updatedAt: Date.now() }; + const sessionStore = { main: sessionEntry }; + + runEmbeddedPiAgentMock.mockImplementationOnce( + async (params: { + onAgentEvent?: (evt: { + stream: string; + data: Record; + }) => void; + }) => { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }, + ); + + const { run } = createMinimalRun({ + resolvedVerboseLevel: "on", + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + const res = await run(); + expect(Array.isArray(res)).toBe(true); + const payloads = res as { text?: string }[]; + expect(payloads[0]?.text).toContain("Auto-compaction complete"); + expect(payloads[0]?.text).toContain("count 1"); + expect(sessionStore.main.compactionCount).toBe(1); + }); }); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index fd2a8eef1..9f994bdd6 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -27,6 +27,7 @@ import { scheduleFollowupDrain, } from "./queue.js"; import { extractReplyToTag } from "./reply-tags.js"; +import { incrementCompactionCount } from "./session-updates.js"; import type { TypingController } from "./typing.js"; export async function runReplyAgent(params: { @@ -167,6 +168,7 @@ export async function runReplyAgent(params: { }; let didLogHeartbeatStrip = false; + let autoCompactionCompleted = false; try { const runId = crypto.randomUUID(); if (sessionKey) { @@ -233,6 +235,14 @@ export async function runReplyAgent(params: { }); } : undefined, + onAgentEvent: (evt) => { + if (evt.stream !== "compaction") return; + const phase = String(evt.data.phase ?? ""); + const willRetry = Boolean(evt.data.willRetry); + if (phase === "end" && !willRetry) { + autoCompactionCompleted = true; + } + }, onBlockReply: blockStreamingEnabled && opts?.onBlockReply ? async (payload) => { @@ -478,6 +488,21 @@ export async function runReplyAgent(params: { // If verbose is enabled and this is a new session, prepend a session hint. let finalPayloads = filteredPayloads; + if (autoCompactionCompleted) { + const count = await incrementCompactionCount({ + sessionEntry, + sessionStore, + sessionKey, + storePath, + }); + if (resolvedVerboseLevel === "on") { + const suffix = typeof count === "number" ? ` (count ${count})` : ""; + finalPayloads = [ + { text: `🧹 Auto-compaction complete${suffix}.` }, + ...finalPayloads, + ]; + } + } if (resolvedVerboseLevel === "on" && isNewSession) { finalPayloads = [ { text: `🧭 New session: ${followupRun.run.sessionId}` }, diff --git a/src/auto-reply/reply/commands.ts b/src/auto-reply/reply/commands.ts index 21b65a91c..22cc7f7c8 100644 --- a/src/auto-reply/reply/commands.ts +++ b/src/auto-reply/reply/commands.ts @@ -44,6 +44,7 @@ import type { ReplyPayload } from "../types.js"; import { isAbortTrigger, setAbortMemory } from "./abort.js"; import type { InlineDirectives } from "./directive-handling.js"; import { stripMentions, stripStructuralPrefixes } from "./mentions.js"; +import { incrementCompactionCount } from "./session-updates.js"; export type CommandContext = { surface: string; @@ -444,6 +445,14 @@ export async function handleCommands(params: { : "Compacted" : "Compaction skipped" : "Compaction failed"; + if (result.ok && result.compacted) { + await incrementCompactionCount({ + sessionEntry, + sessionStore, + sessionKey, + storePath, + }); + } const reason = result.reason?.trim(); const line = reason ? `${compactLabel}: ${reason} • ${contextSummary}` diff --git a/src/auto-reply/reply/followup-runner.compaction.test.ts b/src/auto-reply/reply/followup-runner.compaction.test.ts new file mode 100644 index 000000000..b4ac4c856 --- /dev/null +++ b/src/auto-reply/reply/followup-runner.compaction.test.ts @@ -0,0 +1,119 @@ +import fs from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; + +import type { SessionEntry } from "../../config/sessions.js"; +import type { FollowupRun } from "./queue.js"; +import type { TypingController } from "./typing.js"; + +const runEmbeddedPiAgentMock = vi.fn(); + +vi.mock("../../agents/model-fallback.js", () => ({ + runWithModelFallback: async ({ + provider, + model, + run, + }: { + provider: string; + model: string; + run: (provider: string, model: string) => Promise; + }) => ({ + result: await run(provider, model), + provider, + model, + }), +})); + +vi.mock("../../agents/pi-embedded.js", () => ({ + runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), +})); + +import { createFollowupRunner } from "./followup-runner.js"; + +function createTyping(): TypingController { + return { + onReplyStart: vi.fn(async () => {}), + startTypingLoop: vi.fn(async () => {}), + startTypingOnText: vi.fn(async () => {}), + refreshTypingTtl: vi.fn(), + cleanup: vi.fn(), + }; +} + +describe("createFollowupRunner compaction", () => { + it("adds verbose auto-compaction notice and tracks count", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "clawdbot-compaction-")), + "sessions.json", + ); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + main: sessionEntry, + }; + const onBlockReply = vi.fn(async () => {}); + + runEmbeddedPiAgentMock.mockImplementationOnce( + async (params: { + onAgentEvent?: (evt: { + stream: string; + data: Record; + }) => void; + }) => { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }, + ); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createTyping(), + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = { + prompt: "hello", + summaryLine: "hello", + enqueuedAt: Date.now(), + run: { + sessionId: "session", + sessionKey: "main", + surface: "whatsapp", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + config: {}, + skillsSnapshot: {}, + provider: "anthropic", + model: "claude", + thinkLevel: "low", + verboseLevel: "on", + elevatedLevel: "off", + bashElevated: { + enabled: false, + allowed: false, + defaultLevel: "off", + }, + timeoutMs: 1_000, + blockReplyBreak: "message_end", + }, + } as FollowupRun; + + await runner(queued); + + expect(onBlockReply).toHaveBeenCalled(); + expect(onBlockReply.mock.calls[0][0].text).toContain( + "Auto-compaction complete", + ); + expect(sessionStore.main.compactionCount).toBe(1); + }); +}); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index ebb6d5cfa..528bca679 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -12,6 +12,7 @@ import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { FollowupRun } from "./queue.js"; import { extractReplyToTag } from "./reply-tags.js"; +import { incrementCompactionCount } from "./session-updates.js"; import type { TypingController } from "./typing.js"; export function createFollowupRunner(params: { @@ -61,6 +62,7 @@ export function createFollowupRunner(params: { if (queued.run.sessionKey) { registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey }); } + let autoCompactionCompleted = false; let runResult: Awaited>; let fallbackProvider = queued.run.provider; let fallbackModel = queued.run.model; @@ -91,6 +93,14 @@ export function createFollowupRunner(params: { timeoutMs: queued.run.timeoutMs, runId, blockReplyBreak: queued.run.blockReplyBreak, + onAgentEvent: (evt) => { + if (evt.stream !== "compaction") return; + const phase = String(evt.data.phase ?? ""); + const willRetry = Boolean(evt.data.willRetry); + if (phase === "end" && !willRetry) { + autoCompactionCompleted = true; + } + }, }), }); runResult = fallbackResult.result; @@ -132,6 +142,21 @@ export function createFollowupRunner(params: { if (replyTaggedPayloads.length === 0) return; + if (autoCompactionCompleted) { + const count = await incrementCompactionCount({ + sessionEntry, + sessionStore, + sessionKey, + storePath, + }); + if (queued.run.verboseLevel === "on") { + const suffix = typeof count === "number" ? ` (count ${count})` : ""; + replyTaggedPayloads.unshift({ + text: `🧹 Auto-compaction complete${suffix}.`, + }); + } + } + if (sessionStore && sessionKey) { const usage = runResult.meta.agentMeta?.usage; const modelUsed = diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 23e1a8777..f780455e0 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -122,3 +122,32 @@ export async function ensureSkillSnapshot(params: { return { sessionEntry: nextEntry, skillsSnapshot, systemSent }; } + +export async function incrementCompactionCount(params: { + sessionEntry?: SessionEntry; + sessionStore?: Record; + sessionKey?: string; + storePath?: string; + now?: number; +}): Promise { + const { + sessionEntry, + sessionStore, + sessionKey, + storePath, + now = Date.now(), + } = params; + if (!sessionStore || !sessionKey) return undefined; + const entry = sessionStore[sessionKey] ?? sessionEntry; + if (!entry) return undefined; + const nextCount = (entry.compactionCount ?? 0) + 1; + sessionStore[sessionKey] = { + ...entry, + compactionCount: nextCount, + updatedAt: now, + }; + if (storePath) { + await saveSessionStore(storePath, sessionStore); + } + return nextCount; +} diff --git a/src/auto-reply/status.test.ts b/src/auto-reply/status.test.ts index 187f0ac97..3d4bc9587 100644 --- a/src/auto-reply/status.test.ts +++ b/src/auto-reply/status.test.ts @@ -22,6 +22,7 @@ describe("buildStatusMessage", () => { contextTokens: 32_000, thinkingLevel: "low", verboseLevel: "on", + compactionCount: 2, }, sessionKey: "main", sessionScope: "per-sender", @@ -39,6 +40,7 @@ describe("buildStatusMessage", () => { expect(text).toContain("Runtime: direct"); expect(text).toContain("Context: 16k/32k (50%)"); expect(text).toContain("Session: main"); + expect(text).toContain("compactions 2"); expect(text).toContain("Web: linked"); expect(text).toContain("heartbeat 45s"); expect(text).toContain("thinking=medium"); diff --git a/src/auto-reply/status.ts b/src/auto-reply/status.ts index 9c4e2ca01..bec2aa262 100644 --- a/src/auto-reply/status.ts +++ b/src/auto-reply/status.ts @@ -217,6 +217,9 @@ export function buildStatusMessage(args: StatusArgs): string { entry?.updatedAt ? `updated ${formatAge(now - entry.updatedAt)}` : "no activity", + typeof entry?.compactionCount === "number" + ? `compactions ${entry.compactionCount}` + : undefined, args.storePath ? `store ${shortenHomePath(args.storePath)}` : undefined, ] .filter(Boolean) diff --git a/src/config/sessions.ts b/src/config/sessions.ts index 049d4420a..ff440eab8 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -55,6 +55,7 @@ export type SessionEntry = { modelProvider?: string; model?: string; contextTokens?: number; + compactionCount?: number; displayName?: string; surface?: string; subject?: string;