diff --git a/src/auto-reply/reply/agent-runner.messaging-tools.test.ts b/src/auto-reply/reply/agent-runner.messaging-tools.test.ts index ecbcd8e18..394aeb991 100644 --- a/src/auto-reply/reply/agent-runner.messaging-tools.test.ts +++ b/src/auto-reply/reply/agent-runner.messaging-tools.test.ts @@ -1,6 +1,10 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import type { TemplateContext } from "../templating.js"; +import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; import { createMockTypingController } from "./test-helpers.js"; @@ -38,8 +42,12 @@ vi.mock("./queue.js", async () => { import { runReplyAgent } from "./agent-runner.js"; -function createRun(messageProvider = "slack") { +function createRun( + messageProvider = "slack", + opts: { storePath?: string; sessionKey?: string } = {}, +) { const typing = createMockTypingController(); + const sessionKey = opts.sessionKey ?? "main"; const sessionCtx = { Provider: messageProvider, OriginatingTo: "channel:C1", @@ -53,7 +61,7 @@ function createRun(messageProvider = "slack") { enqueuedAt: Date.now(), run: { sessionId: "session", - sessionKey: "main", + sessionKey, messageProvider, sessionFile: "/tmp/session.jsonl", workspaceDir: "/tmp", @@ -85,6 +93,8 @@ function createRun(messageProvider = "slack") { isStreaming: false, typing, sessionCtx, + sessionKey, + storePath: opts.storePath, defaultModel: "anthropic/claude-opus-4-5", resolvedVerboseLevel: "off", isNewSession: false, @@ -141,4 +151,34 @@ describe("runReplyAgent messaging tool suppression", () => { expect(result).toMatchObject({ text: "hello world!" }); }); + + it("persists usage even when replies are suppressed", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-session-store-")), + "sessions.json", + ); + const sessionKey = "main"; + const entry: SessionEntry = { sessionId: "session", updatedAt: Date.now() }; + await saveSessionStore(storePath, { [sessionKey]: entry }); + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: { + agentMeta: { + usage: { input: 10, output: 5 }, + model: "claude-opus-4-5", + provider: "anthropic", + }, + }, + }); + + const result = await createRun("slack", { storePath, sessionKey }); + + expect(result).toBeUndefined(); + const store = loadSessionStore(storePath, { skipCache: true }); + expect(store[sessionKey]?.totalTokens ?? 0).toBeGreaterThan(0); + expect(store[sessionKey]?.model).toBe("claude-opus-4-5"); + }); }); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 1aeb8e78f..dec2d789a 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -1,6 +1,5 @@ import crypto from "node:crypto"; import fs from "node:fs"; -import { setCliSessionId } from "../../agents/cli-session.js"; import { lookupContextTokens } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { resolveModelAuthMode } from "../../agents/model-auth.js"; @@ -38,6 +37,7 @@ import { resolveBlockStreamingCoalescing } from "./block-streaming.js"; import { createFollowupRunner } from "./followup-runner.js"; import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; +import { persistSessionUsageUpdate } from "./session-usage.js"; import { incrementCompactionCount } from "./session-updates.js"; import type { TypingController } from "./typing.js"; import { createTypingSignaler } from "./typing-mode.js"; @@ -365,6 +365,30 @@ export async function runReplyAgent(params: { await Promise.allSettled(pendingToolTasks); } + const usage = runResult.meta.agentMeta?.usage; + const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; + const providerUsed = + runResult.meta.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider; + const cliSessionId = isCliProvider(providerUsed, cfg) + ? runResult.meta.agentMeta?.sessionId?.trim() + : undefined; + const contextTokensUsed = + agentCfgContextTokens ?? + lookupContextTokens(modelUsed) ?? + activeSessionEntry?.contextTokens ?? + DEFAULT_CONTEXT_TOKENS; + + await persistSessionUsageUpdate({ + storePath, + sessionKey, + usage, + modelUsed, + providerUsed, + contextTokensUsed, + systemPromptReport: runResult.meta.systemPromptReport, + cliSessionId, + }); + // Drain any late tool/block deliveries before deciding there's "nothing to send". // Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and // keep the typing indicator stuck. @@ -395,19 +419,6 @@ export async function runReplyAgent(params: { await signalTypingIfNeeded(replyPayloads, typingSignals); - const usage = runResult.meta.agentMeta?.usage; - const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; - const providerUsed = - runResult.meta.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider; - const cliSessionId = isCliProvider(providerUsed, cfg) - ? runResult.meta.agentMeta?.sessionId?.trim() - : undefined; - const contextTokensUsed = - agentCfgContextTokens ?? - lookupContextTokens(modelUsed) ?? - activeSessionEntry?.contextTokens ?? - DEFAULT_CONTEXT_TOKENS; - if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) { const input = usage.input ?? 0; const output = usage.output ?? 0; @@ -445,72 +456,6 @@ export async function runReplyAgent(params: { }); } - if (storePath && sessionKey) { - if (hasNonzeroUsage(usage)) { - try { - await updateSessionStoreEntry({ - storePath, - sessionKey, - update: async (entry) => { - const input = usage.input ?? 0; - const output = usage.output ?? 0; - const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); - const patch: Partial = { - inputTokens: input, - outputTokens: output, - totalTokens: promptTokens > 0 ? promptTokens : (usage.total ?? input), - modelProvider: providerUsed, - model: modelUsed, - contextTokens: contextTokensUsed ?? entry.contextTokens, - systemPromptReport: runResult.meta.systemPromptReport ?? entry.systemPromptReport, - updatedAt: Date.now(), - }; - if (cliSessionId) { - const nextEntry = { ...entry, ...patch }; - setCliSessionId(nextEntry, providerUsed, cliSessionId); - return { - ...patch, - cliSessionIds: nextEntry.cliSessionIds, - claudeCliSessionId: nextEntry.claudeCliSessionId, - }; - } - return patch; - }, - }); - } catch (err) { - logVerbose(`failed to persist usage update: ${String(err)}`); - } - } else if (modelUsed || contextTokensUsed) { - try { - await updateSessionStoreEntry({ - storePath, - sessionKey, - update: async (entry) => { - const patch: Partial = { - modelProvider: providerUsed ?? entry.modelProvider, - model: modelUsed ?? entry.model, - contextTokens: contextTokensUsed ?? entry.contextTokens, - systemPromptReport: runResult.meta.systemPromptReport ?? entry.systemPromptReport, - updatedAt: Date.now(), - }; - if (cliSessionId) { - const nextEntry = { ...entry, ...patch }; - setCliSessionId(nextEntry, providerUsed, cliSessionId); - return { - ...patch, - cliSessionIds: nextEntry.cliSessionIds, - claudeCliSessionId: nextEntry.claudeCliSessionId, - }; - } - return patch; - }, - }); - } catch (err) { - logVerbose(`failed to persist model/context update: ${String(err)}`); - } - } - } - const responseUsageRaw = activeSessionEntry?.responseUsage ?? (sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined); diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 19213081d..ceb5c2ab2 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -3,7 +3,7 @@ import { tmpdir } from "node:os"; import path from "node:path"; import { describe, expect, it, vi } from "vitest"; -import type { SessionEntry } from "../../config/sessions.js"; +import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js"; import type { FollowupRun } from "./queue.js"; import { createMockTypingController } from "./test-helpers.js"; @@ -195,4 +195,47 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).not.toHaveBeenCalled(); }); + + it("persists usage even when replies are suppressed", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "clawdbot-followup-usage-")), + "sessions.json", + ); + const sessionKey = "main"; + const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() }; + const sessionStore: Record = { [sessionKey]: sessionEntry }; + await saveSessionStore(storePath, sessionStore); + + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }], + meta: { + agentMeta: { + usage: { input: 10, output: 5 }, + model: "claude-opus-4-5", + provider: "anthropic", + }, + }, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + sessionEntry, + sessionStore, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + }); + + await runner(baseQueuedRun("slack")); + + expect(onBlockReply).not.toHaveBeenCalled(); + const store = loadSessionStore(storePath, { skipCache: true }); + expect(store[sessionKey]?.totalTokens ?? 0).toBeGreaterThan(0); + expect(store[sessionKey]?.model).toBe("claude-opus-4-5"); + }); }); diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index dfda65897..febbc6e6a 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -4,12 +4,7 @@ import { lookupContextTokens } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; -import { hasNonzeroUsage } from "../../agents/usage.js"; -import { - resolveAgentIdFromSessionKey, - type SessionEntry, - updateSessionStoreEntry, -} from "../../config/sessions.js"; +import { resolveAgentIdFromSessionKey, type SessionEntry } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; @@ -26,6 +21,7 @@ import { } from "./reply-payloads.js"; import { resolveReplyToMode } from "./reply-threading.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; +import { persistSessionUsageUpdate } from "./session-usage.js"; import { incrementCompactionCount } from "./session-updates.js"; import type { TypingController } from "./typing.js"; import { createTypingSignaler } from "./typing-mode.js"; @@ -190,6 +186,26 @@ export function createFollowupRunner(params: { return; } + if (storePath && sessionKey) { + const usage = runResult.meta.agentMeta?.usage; + const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; + const contextTokensUsed = + agentCfgContextTokens ?? + lookupContextTokens(modelUsed) ?? + sessionEntry?.contextTokens ?? + DEFAULT_CONTEXT_TOKENS; + + await persistSessionUsageUpdate({ + storePath, + sessionKey, + usage, + modelUsed, + providerUsed: fallbackProvider, + contextTokensUsed, + logLabel: "followup", + }); + } + const payloadArray = runResult.payloads ?? []; if (payloadArray.length === 0) return; const sanitizedPayloads = payloadArray.flatMap((payload) => { @@ -245,56 +261,6 @@ export function createFollowupRunner(params: { } } - if (storePath && sessionKey) { - const usage = runResult.meta.agentMeta?.usage; - const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; - const contextTokensUsed = - agentCfgContextTokens ?? - lookupContextTokens(modelUsed) ?? - sessionEntry?.contextTokens ?? - DEFAULT_CONTEXT_TOKENS; - - if (hasNonzeroUsage(usage)) { - try { - await updateSessionStoreEntry({ - storePath, - sessionKey, - update: async (entry) => { - const input = usage.input ?? 0; - const output = usage.output ?? 0; - const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); - return { - inputTokens: input, - outputTokens: output, - totalTokens: promptTokens > 0 ? promptTokens : (usage.total ?? input), - modelProvider: fallbackProvider ?? entry.modelProvider, - model: modelUsed, - contextTokens: contextTokensUsed ?? entry.contextTokens, - updatedAt: Date.now(), - }; - }, - }); - } catch (err) { - logVerbose(`failed to persist followup usage update: ${String(err)}`); - } - } else if (modelUsed || contextTokensUsed) { - try { - await updateSessionStoreEntry({ - storePath, - sessionKey, - update: async (entry) => ({ - modelProvider: fallbackProvider ?? entry.modelProvider, - model: modelUsed ?? entry.model, - contextTokens: contextTokensUsed ?? entry.contextTokens, - updatedAt: Date.now(), - }), - }); - } catch (err) { - logVerbose(`failed to persist followup model/context update: ${String(err)}`); - } - } - } - await sendFollowupPayloads(finalPayloads, queued); } finally { typing.markRunComplete(); diff --git a/src/auto-reply/reply/session-usage.ts b/src/auto-reply/reply/session-usage.ts new file mode 100644 index 000000000..1a048b55e --- /dev/null +++ b/src/auto-reply/reply/session-usage.ts @@ -0,0 +1,92 @@ +import { setCliSessionId } from "../../agents/cli-session.js"; +import { hasNonzeroUsage, type NormalizedUsage } from "../../agents/usage.js"; +import { + type SessionSystemPromptReport, + type SessionEntry, + updateSessionStoreEntry, +} from "../../config/sessions.js"; +import { logVerbose } from "../../globals.js"; + +export async function persistSessionUsageUpdate(params: { + storePath?: string; + sessionKey?: string; + usage?: NormalizedUsage; + modelUsed?: string; + providerUsed?: string; + contextTokensUsed?: number; + systemPromptReport?: SessionSystemPromptReport; + cliSessionId?: string; + logLabel?: string; +}): Promise { + const { storePath, sessionKey } = params; + if (!storePath || !sessionKey) return; + + const label = params.logLabel ? `${params.logLabel} ` : ""; + if (hasNonzeroUsage(params.usage)) { + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async (entry) => { + const input = params.usage?.input ?? 0; + const output = params.usage?.output ?? 0; + const promptTokens = + input + (params.usage?.cacheRead ?? 0) + (params.usage?.cacheWrite ?? 0); + const patch: Partial = { + inputTokens: input, + outputTokens: output, + totalTokens: promptTokens > 0 ? promptTokens : (params.usage?.total ?? input), + modelProvider: params.providerUsed ?? entry.modelProvider, + model: params.modelUsed ?? entry.model, + contextTokens: params.contextTokensUsed ?? entry.contextTokens, + systemPromptReport: params.systemPromptReport ?? entry.systemPromptReport, + updatedAt: Date.now(), + }; + if (params.cliSessionId) { + const nextEntry = { ...entry, ...patch }; + setCliSessionId(nextEntry, params.providerUsed, params.cliSessionId); + return { + ...patch, + cliSessionIds: nextEntry.cliSessionIds, + claudeCliSessionId: nextEntry.claudeCliSessionId, + }; + } + return patch; + }, + }); + } catch (err) { + logVerbose(`failed to persist ${label}usage update: ${String(err)}`); + } + return; + } + + if (params.modelUsed || params.contextTokensUsed) { + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async (entry) => { + const patch: Partial = { + modelProvider: params.providerUsed ?? entry.modelProvider, + model: params.modelUsed ?? entry.model, + contextTokens: params.contextTokensUsed ?? entry.contextTokens, + systemPromptReport: params.systemPromptReport ?? entry.systemPromptReport, + updatedAt: Date.now(), + }; + if (params.cliSessionId) { + const nextEntry = { ...entry, ...patch }; + setCliSessionId(nextEntry, params.providerUsed, params.cliSessionId); + return { + ...patch, + cliSessionIds: nextEntry.cliSessionIds, + claudeCliSessionId: nextEntry.claudeCliSessionId, + }; + } + return patch; + }, + }); + } catch (err) { + logVerbose(`failed to persist ${label}model/context update: ${String(err)}`); + } + } +}