fix: persist session usage metadata on suppressed replies

This commit is contained in:
Peter Steinberger
2026-01-24 11:04:53 +00:00
parent c02204fd1e
commit 1bbbb10abf
5 changed files with 225 additions and 139 deletions

View File

@@ -1,6 +1,10 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import type { TemplateContext } from "../templating.js";
import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js";
import type { FollowupRun, QueueSettings } from "./queue.js";
import { createMockTypingController } from "./test-helpers.js";
@@ -38,8 +42,12 @@ vi.mock("./queue.js", async () => {
import { runReplyAgent } from "./agent-runner.js";
function createRun(messageProvider = "slack") {
function createRun(
messageProvider = "slack",
opts: { storePath?: string; sessionKey?: string } = {},
) {
const typing = createMockTypingController();
const sessionKey = opts.sessionKey ?? "main";
const sessionCtx = {
Provider: messageProvider,
OriginatingTo: "channel:C1",
@@ -53,7 +61,7 @@ function createRun(messageProvider = "slack") {
enqueuedAt: Date.now(),
run: {
sessionId: "session",
sessionKey: "main",
sessionKey,
messageProvider,
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
@@ -85,6 +93,8 @@ function createRun(messageProvider = "slack") {
isStreaming: false,
typing,
sessionCtx,
sessionKey,
storePath: opts.storePath,
defaultModel: "anthropic/claude-opus-4-5",
resolvedVerboseLevel: "off",
isNewSession: false,
@@ -141,4 +151,34 @@ describe("runReplyAgent messaging tool suppression", () => {
expect(result).toMatchObject({ text: "hello world!" });
});
it("persists usage even when replies are suppressed", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-session-store-")),
"sessions.json",
);
const sessionKey = "main";
const entry: SessionEntry = { sessionId: "session", updatedAt: Date.now() };
await saveSessionStore(storePath, { [sessionKey]: entry });
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
meta: {
agentMeta: {
usage: { input: 10, output: 5 },
model: "claude-opus-4-5",
provider: "anthropic",
},
},
});
const result = await createRun("slack", { storePath, sessionKey });
expect(result).toBeUndefined();
const store = loadSessionStore(storePath, { skipCache: true });
expect(store[sessionKey]?.totalTokens ?? 0).toBeGreaterThan(0);
expect(store[sessionKey]?.model).toBe("claude-opus-4-5");
});
});

View File

@@ -1,6 +1,5 @@
import crypto from "node:crypto";
import fs from "node:fs";
import { setCliSessionId } from "../../agents/cli-session.js";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { resolveModelAuthMode } from "../../agents/model-auth.js";
@@ -38,6 +37,7 @@ import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
import { createFollowupRunner } from "./followup-runner.js";
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
import { persistSessionUsageUpdate } from "./session-usage.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js";
import { createTypingSignaler } from "./typing-mode.js";
@@ -365,6 +365,30 @@ export async function runReplyAgent(params: {
await Promise.allSettled(pendingToolTasks);
}
const usage = runResult.meta.agentMeta?.usage;
const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const providerUsed =
runResult.meta.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider;
const cliSessionId = isCliProvider(providerUsed, cfg)
? runResult.meta.agentMeta?.sessionId?.trim()
: undefined;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
activeSessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
await persistSessionUsageUpdate({
storePath,
sessionKey,
usage,
modelUsed,
providerUsed,
contextTokensUsed,
systemPromptReport: runResult.meta.systemPromptReport,
cliSessionId,
});
// Drain any late tool/block deliveries before deciding there's "nothing to send".
// Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
// keep the typing indicator stuck.
@@ -395,19 +419,6 @@ export async function runReplyAgent(params: {
await signalTypingIfNeeded(replyPayloads, typingSignals);
const usage = runResult.meta.agentMeta?.usage;
const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const providerUsed =
runResult.meta.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider;
const cliSessionId = isCliProvider(providerUsed, cfg)
? runResult.meta.agentMeta?.sessionId?.trim()
: undefined;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
activeSessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
@@ -445,72 +456,6 @@ export async function runReplyAgent(params: {
});
}
if (storePath && sessionKey) {
if (hasNonzeroUsage(usage)) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
const patch: Partial<SessionEntry> = {
inputTokens: input,
outputTokens: output,
totalTokens: promptTokens > 0 ? promptTokens : (usage.total ?? input),
modelProvider: providerUsed,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
systemPromptReport: runResult.meta.systemPromptReport ?? entry.systemPromptReport,
updatedAt: Date.now(),
};
if (cliSessionId) {
const nextEntry = { ...entry, ...patch };
setCliSessionId(nextEntry, providerUsed, cliSessionId);
return {
...patch,
cliSessionIds: nextEntry.cliSessionIds,
claudeCliSessionId: nextEntry.claudeCliSessionId,
};
}
return patch;
},
});
} catch (err) {
logVerbose(`failed to persist usage update: ${String(err)}`);
}
} else if (modelUsed || contextTokensUsed) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => {
const patch: Partial<SessionEntry> = {
modelProvider: providerUsed ?? entry.modelProvider,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
systemPromptReport: runResult.meta.systemPromptReport ?? entry.systemPromptReport,
updatedAt: Date.now(),
};
if (cliSessionId) {
const nextEntry = { ...entry, ...patch };
setCliSessionId(nextEntry, providerUsed, cliSessionId);
return {
...patch,
cliSessionIds: nextEntry.cliSessionIds,
claudeCliSessionId: nextEntry.claudeCliSessionId,
};
}
return patch;
},
});
} catch (err) {
logVerbose(`failed to persist model/context update: ${String(err)}`);
}
}
}
const responseUsageRaw =
activeSessionEntry?.responseUsage ??
(sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined);

View File

@@ -3,7 +3,7 @@ import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions.js";
import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js";
import type { FollowupRun } from "./queue.js";
import { createMockTypingController } from "./test-helpers.js";
@@ -195,4 +195,47 @@ describe("createFollowupRunner messaging tool dedupe", () => {
expect(onBlockReply).not.toHaveBeenCalled();
});
it("persists usage even when replies are suppressed", async () => {
const storePath = path.join(
await fs.mkdtemp(path.join(tmpdir(), "clawdbot-followup-usage-")),
"sessions.json",
);
const sessionKey = "main";
const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() };
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await saveSessionStore(storePath, sessionStore);
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
meta: {
agentMeta: {
usage: { input: 10, output: 5 },
model: "claude-opus-4-5",
provider: "anthropic",
},
},
});
const runner = createFollowupRunner({
opts: { onBlockReply },
typing: createMockTypingController(),
typingMode: "instant",
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultModel: "anthropic/claude-opus-4-5",
});
await runner(baseQueuedRun("slack"));
expect(onBlockReply).not.toHaveBeenCalled();
const store = loadSessionStore(storePath, { skipCache: true });
expect(store[sessionKey]?.totalTokens ?? 0).toBeGreaterThan(0);
expect(store[sessionKey]?.model).toBe("claude-opus-4-5");
});
});

View File

@@ -4,12 +4,7 @@ import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { runWithModelFallback } from "../../agents/model-fallback.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import {
resolveAgentIdFromSessionKey,
type SessionEntry,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import { resolveAgentIdFromSessionKey, type SessionEntry } from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
@@ -26,6 +21,7 @@ import {
} from "./reply-payloads.js";
import { resolveReplyToMode } from "./reply-threading.js";
import { isRoutableChannel, routeReply } from "./route-reply.js";
import { persistSessionUsageUpdate } from "./session-usage.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js";
import { createTypingSignaler } from "./typing-mode.js";
@@ -190,6 +186,26 @@ export function createFollowupRunner(params: {
return;
}
if (storePath && sessionKey) {
const usage = runResult.meta.agentMeta?.usage;
const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
await persistSessionUsageUpdate({
storePath,
sessionKey,
usage,
modelUsed,
providerUsed: fallbackProvider,
contextTokensUsed,
logLabel: "followup",
});
}
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return;
const sanitizedPayloads = payloadArray.flatMap((payload) => {
@@ -245,56 +261,6 @@ export function createFollowupRunner(params: {
}
}
if (storePath && sessionKey) {
const usage = runResult.meta.agentMeta?.usage;
const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
if (hasNonzeroUsage(usage)) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
return {
inputTokens: input,
outputTokens: output,
totalTokens: promptTokens > 0 ? promptTokens : (usage.total ?? input),
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
};
},
});
} catch (err) {
logVerbose(`failed to persist followup usage update: ${String(err)}`);
}
} else if (modelUsed || contextTokensUsed) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => ({
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
}),
});
} catch (err) {
logVerbose(`failed to persist followup model/context update: ${String(err)}`);
}
}
}
await sendFollowupPayloads(finalPayloads, queued);
} finally {
typing.markRunComplete();

View File

@@ -0,0 +1,92 @@
import { setCliSessionId } from "../../agents/cli-session.js";
import { hasNonzeroUsage, type NormalizedUsage } from "../../agents/usage.js";
import {
type SessionSystemPromptReport,
type SessionEntry,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
export async function persistSessionUsageUpdate(params: {
storePath?: string;
sessionKey?: string;
usage?: NormalizedUsage;
modelUsed?: string;
providerUsed?: string;
contextTokensUsed?: number;
systemPromptReport?: SessionSystemPromptReport;
cliSessionId?: string;
logLabel?: string;
}): Promise<void> {
const { storePath, sessionKey } = params;
if (!storePath || !sessionKey) return;
const label = params.logLabel ? `${params.logLabel} ` : "";
if (hasNonzeroUsage(params.usage)) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => {
const input = params.usage?.input ?? 0;
const output = params.usage?.output ?? 0;
const promptTokens =
input + (params.usage?.cacheRead ?? 0) + (params.usage?.cacheWrite ?? 0);
const patch: Partial<SessionEntry> = {
inputTokens: input,
outputTokens: output,
totalTokens: promptTokens > 0 ? promptTokens : (params.usage?.total ?? input),
modelProvider: params.providerUsed ?? entry.modelProvider,
model: params.modelUsed ?? entry.model,
contextTokens: params.contextTokensUsed ?? entry.contextTokens,
systemPromptReport: params.systemPromptReport ?? entry.systemPromptReport,
updatedAt: Date.now(),
};
if (params.cliSessionId) {
const nextEntry = { ...entry, ...patch };
setCliSessionId(nextEntry, params.providerUsed, params.cliSessionId);
return {
...patch,
cliSessionIds: nextEntry.cliSessionIds,
claudeCliSessionId: nextEntry.claudeCliSessionId,
};
}
return patch;
},
});
} catch (err) {
logVerbose(`failed to persist ${label}usage update: ${String(err)}`);
}
return;
}
if (params.modelUsed || params.contextTokensUsed) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => {
const patch: Partial<SessionEntry> = {
modelProvider: params.providerUsed ?? entry.modelProvider,
model: params.modelUsed ?? entry.model,
contextTokens: params.contextTokensUsed ?? entry.contextTokens,
systemPromptReport: params.systemPromptReport ?? entry.systemPromptReport,
updatedAt: Date.now(),
};
if (params.cliSessionId) {
const nextEntry = { ...entry, ...patch };
setCliSessionId(nextEntry, params.providerUsed, params.cliSessionId);
return {
...patch,
cliSessionIds: nextEntry.cliSessionIds,
claudeCliSessionId: nextEntry.claudeCliSessionId,
};
}
return patch;
},
});
} catch (err) {
logVerbose(`failed to persist ${label}model/context update: ${String(err)}`);
}
}
}