Files
clawdbot/src/auto-reply/reply/agent-runner.ts
2026-01-18 05:35:35 +00:00

522 lines
18 KiB
TypeScript

import crypto from "node:crypto";
import fs from "node:fs";
import { setCliSessionId } from "../../agents/cli-session.js";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { resolveModelAuthMode } from "../../agents/model-auth.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import {
resolveAgentIdFromSessionKey,
resolveSessionFilePath,
resolveSessionTranscriptPath,
type SessionEntry,
updateSessionStore,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
import { defaultRuntime } from "../../runtime.js";
import { resolveModelCostConfig } from "../../utils/usage-format.js";
import type { OriginatingChannelType, TemplateContext } from "../templating.js";
import type { VerboseLevel } from "../thinking.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { runAgentTurnWithFallback } from "./agent-runner-execution.js";
import {
createShouldEmitToolOutput,
createShouldEmitToolResult,
finalizeWithFollowup,
isAudioPayload,
signalTypingIfNeeded,
} from "./agent-runner-helpers.js";
import { runMemoryFlushIfNeeded } from "./agent-runner-memory.js";
import { buildReplyPayloads } from "./agent-runner-payloads.js";
import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.js";
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
import { createFollowupRunner } from "./followup-runner.js";
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js";
import { createTypingSignaler } from "./typing-mode.js";
// Fallback timeout (in milliseconds, per the name) handed to the block reply
// pipeline as `timeoutMs` when the caller does not set `opts.blockReplyTimeoutMs`.
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000;
/**
 * Runs a single auto-reply agent turn for an inbound message and returns the
 * payload(s) to deliver.
 *
 * Flow, in order:
 * 1. If steering is requested while a run is streaming, the prompt is queued
 *    into the running session; when that succeeds and no follow-up is wanted,
 *    the session entry is touched and nothing is returned.
 * 2. If a run is already active and the message should be followed up (or the
 *    queue mode is `"steer"`), the run is enqueued and nothing is returned.
 * 3. Otherwise a memory flush is attempted, the agent turn is executed (with
 *    session-reset callbacks for compaction failures and role-ordering
 *    conflicts), pending block replies and tool tasks are drained, reply
 *    payloads are built, usage/model metadata is persisted, and optional
 *    usage / verbose notices are attached.
 *
 * @param params - Run inputs: the command body, the follow-up run descriptor,
 *   queue settings, steering/follow-up flags, typing controller, session
 *   entry/store/key/path, model defaults and block-streaming settings.
 * @returns A single payload, an array of payloads, or `undefined` when there is
 *   nothing to send (as produced by `finalizeWithFollowup`, or directly on the
 *   steer/enqueue early exits).
 */
export async function runReplyAgent(params: {
  commandBody: string;
  followupRun: FollowupRun;
  queueKey: string;
  resolvedQueue: QueueSettings;
  shouldSteer: boolean;
  shouldFollowup: boolean;
  isActive: boolean;
  isStreaming: boolean;
  opts?: GetReplyOptions;
  typing: TypingController;
  sessionEntry?: SessionEntry;
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  defaultModel: string;
  agentCfgContextTokens?: number;
  resolvedVerboseLevel: VerboseLevel;
  isNewSession: boolean;
  blockStreamingEnabled: boolean;
  blockReplyChunking?: {
    minChars: number;
    maxChars: number;
    breakPreference: "paragraph" | "newline" | "sentence";
  };
  resolvedBlockStreamingBreak: "text_end" | "message_end";
  sessionCtx: TemplateContext;
  shouldInjectGroupIntro: boolean;
  typingMode: TypingMode;
}): Promise<ReplyPayload | ReplyPayload[] | undefined> {
  const {
    commandBody,
    followupRun,
    queueKey,
    resolvedQueue,
    shouldSteer,
    shouldFollowup,
    isActive,
    isStreaming,
    opts,
    typing,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
    resolvedVerboseLevel,
    isNewSession,
    blockStreamingEnabled,
    blockReplyChunking,
    resolvedBlockStreamingBreak,
    sessionCtx,
    shouldInjectGroupIntro,
    typingMode,
  } = params;

  // Mutable views of the session: a session reset (see `resetSession`) or the
  // memory flush may replace the entry while this function is running.
  let activeSessionEntry = sessionEntry;
  const activeSessionStore = sessionStore;
  let activeIsNewSession = isNewSession;

  const isHeartbeat = opts?.isHeartbeat === true;
  const typingSignals = createTypingSignaler({
    typing,
    mode: typingMode,
    isHeartbeat,
  });
  const shouldEmitToolResult = createShouldEmitToolResult({
    sessionKey,
    storePath,
    resolvedVerboseLevel,
  });
  const shouldEmitToolOutput = createShouldEmitToolOutput({
    sessionKey,
    storePath,
    resolvedVerboseLevel,
  });
  const pendingToolTasks = new Set<Promise<void>>();
  const blockReplyTimeoutMs = opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;

  // Prefer the explicit originating channel; otherwise derive it from the
  // lower-cased surface/provider name.
  const replyToChannel =
    sessionCtx.OriginatingChannel ??
    ((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
      | OriginatingChannelType
      | undefined);
  const replyToMode = resolveReplyToMode(
    followupRun.run.config,
    replyToChannel,
    sessionCtx.AccountId,
  );
  const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel);
  const cfg = followupRun.run.config;

  // The block reply pipeline only exists when block streaming is enabled AND
  // the caller supplied an `onBlockReply` sink.
  const blockReplyCoalescing =
    blockStreamingEnabled && opts?.onBlockReply
      ? resolveBlockStreamingCoalescing(
          cfg,
          sessionCtx.Provider,
          sessionCtx.AccountId,
          blockReplyChunking,
        )
      : undefined;
  const blockReplyPipeline =
    blockStreamingEnabled && opts?.onBlockReply
      ? createBlockReplyPipeline({
          onBlockReply: opts.onBlockReply,
          timeoutMs: blockReplyTimeoutMs,
          coalescing: blockReplyCoalescing,
          buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
        })
      : null;

  /**
   * Bumps `updatedAt` on the active session entry, mirrors it into the
   * in-memory store and, when a store path is configured, persists it.
   * No-op when the entry, the store or the session key is missing.
   */
  const touchActiveSessionEntry = async (): Promise<void> => {
    const entry = activeSessionEntry;
    if (!entry || !activeSessionStore || !sessionKey) return;
    entry.updatedAt = Date.now();
    activeSessionStore[sessionKey] = entry;
    if (storePath) {
      await updateSessionStore(storePath, (store) => {
        store[sessionKey] = entry;
      });
    }
  };

  // Steering: hand the prompt to the run that is currently streaming.
  if (shouldSteer && isStreaming) {
    const steered = queueEmbeddedPiMessage(followupRun.run.sessionId, followupRun.prompt);
    if (steered && !shouldFollowup) {
      try {
        await touchActiveSessionEntry();
      } finally {
        // Always release typing state, even if persisting the entry throws.
        typing.cleanup();
      }
      return undefined;
    }
  }

  // A run is already active: queue this one instead of starting a second turn.
  if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
    enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
    try {
      await touchActiveSessionEntry();
    } finally {
      // Always release typing state, even if persisting the entry throws.
      typing.cleanup();
    }
    return undefined;
  }

  activeSessionEntry = await runMemoryFlushIfNeeded({
    cfg,
    followupRun,
    sessionCtx,
    opts,
    defaultModel,
    agentCfgContextTokens,
    resolvedVerboseLevel,
    sessionEntry: activeSessionEntry,
    sessionStore: activeSessionStore,
    sessionKey,
    storePath,
    isHeartbeat,
  });

  const runFollowupTurn = createFollowupRunner({
    opts,
    typing,
    typingMode,
    sessionEntry: activeSessionEntry,
    sessionStore: activeSessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
  });

  let responseUsageLine: string | undefined;

  type SessionResetOptions = {
    failureLabel: string;
    buildLogMessage: (nextSessionId: string) => string;
    cleanupTranscripts?: boolean;
  };

  /**
   * Replaces the current session with a fresh one (new id and transcript
   * path), keeping the rest of the previous entry. Returns `false` when there
   * is no session key, store, store path or previous entry to reset.
   * A failure to persist the new entry is logged but does not abort the reset.
   */
  const resetSession = async ({
    failureLabel,
    buildLogMessage,
    cleanupTranscripts,
  }: SessionResetOptions): Promise<boolean> => {
    if (!sessionKey || !activeSessionStore || !storePath) return false;
    const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry;
    if (!prevEntry) return false;
    // The previous id is only needed when old transcripts must be removed.
    const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined;
    const nextSessionId = crypto.randomUUID();
    const nextEntry: SessionEntry = {
      ...prevEntry,
      sessionId: nextSessionId,
      updatedAt: Date.now(),
      systemSent: false,
      abortedLastRun: false,
    };
    const agentId = resolveAgentIdFromSessionKey(sessionKey);
    const nextSessionFile = resolveSessionTranscriptPath(
      nextSessionId,
      agentId,
      sessionCtx.MessageThreadId,
    );
    nextEntry.sessionFile = nextSessionFile;
    activeSessionStore[sessionKey] = nextEntry;
    try {
      await updateSessionStore(storePath, (store) => {
        store[sessionKey] = nextEntry;
      });
    } catch (err) {
      defaultRuntime.error(
        `Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`,
      );
    }
    // Point the in-flight run and the local state at the new session.
    followupRun.run.sessionId = nextSessionId;
    followupRun.run.sessionFile = nextSessionFile;
    activeSessionEntry = nextEntry;
    activeIsNewSession = true;
    defaultRuntime.error(buildLogMessage(nextSessionId));
    if (cleanupTranscripts && prevSessionId) {
      // A Set de-duplicates the two candidate paths when they resolve equally.
      const transcriptCandidates = new Set<string>();
      const resolved = resolveSessionFilePath(prevSessionId, prevEntry, { agentId });
      if (resolved) transcriptCandidates.add(resolved);
      transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId));
      for (const candidate of transcriptCandidates) {
        try {
          fs.unlinkSync(candidate);
        } catch {
          // Best-effort cleanup.
        }
      }
    }
    return true;
  };

  const resetSessionAfterCompactionFailure = async (reason: string): Promise<boolean> =>
    resetSession({
      failureLabel: "compaction failure",
      buildLogMessage: (nextSessionId) =>
        `Auto-compaction failed (${reason}). Restarting session ${sessionKey} -> ${nextSessionId} and retrying.`,
    });

  const resetSessionAfterRoleOrderingConflict = async (reason: string): Promise<boolean> =>
    resetSession({
      failureLabel: "role ordering conflict",
      buildLogMessage: (nextSessionId) =>
        `Role ordering conflict (${reason}). Restarting session ${sessionKey} -> ${nextSessionId}.`,
      cleanupTranscripts: true,
    });

  try {
    const runOutcome = await runAgentTurnWithFallback({
      commandBody,
      followupRun,
      sessionCtx,
      opts,
      typingSignals,
      blockReplyPipeline,
      blockStreamingEnabled,
      blockReplyChunking,
      resolvedBlockStreamingBreak,
      applyReplyToMode,
      shouldEmitToolResult,
      shouldEmitToolOutput,
      pendingToolTasks,
      resetSessionAfterCompactionFailure,
      resetSessionAfterRoleOrderingConflict,
      isHeartbeat,
      sessionKey,
      getActiveSessionEntry: () => activeSessionEntry,
      activeSessionStore,
      storePath,
      resolvedVerboseLevel,
    });

    // The execution layer already produced the final answer for this turn.
    if (runOutcome.kind === "final") {
      return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn);
    }

    const {
      runResult,
      fallbackProvider,
      fallbackModel,
      directlySentBlockKeys,
      didLogHeartbeatStrip,
      autoCompactionCompleted,
    } = runOutcome;

    // Clear the pending group-intro flag once a turn has run with it.
    if (
      shouldInjectGroupIntro &&
      activeSessionEntry &&
      activeSessionStore &&
      sessionKey &&
      activeSessionEntry.groupActivationNeedsSystemIntro
    ) {
      activeSessionEntry.groupActivationNeedsSystemIntro = false;
      await touchActiveSessionEntry();
    }

    const payloadArray = runResult.payloads ?? [];

    // Drain any late tool/block deliveries before deciding there's "nothing to send".
    // Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
    // keep the typing indicator stuck.
    if (blockReplyPipeline) {
      await blockReplyPipeline.flush({ force: true });
      blockReplyPipeline.stop();
    }
    if (pendingToolTasks.size > 0) {
      await Promise.allSettled(pendingToolTasks);
    }
    if (payloadArray.length === 0) {
      return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
    }

    const { replyPayloads } = buildReplyPayloads({
      payloads: payloadArray,
      isHeartbeat,
      didLogHeartbeatStrip,
      blockStreamingEnabled,
      blockReplyPipeline,
      directlySentBlockKeys,
      replyToMode,
      replyToChannel,
      currentMessageId: sessionCtx.MessageSid,
      messageProvider: followupRun.run.messageProvider,
      messagingToolSentTexts: runResult.messagingToolSentTexts,
      messagingToolSentTargets: runResult.messagingToolSentTargets,
      originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To,
      accountId: sessionCtx.AccountId,
    });
    if (replyPayloads.length === 0) {
      return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
    }

    await signalTypingIfNeeded(replyPayloads, typingSignals);

    // Resolve what actually served the turn: run metadata first, then the
    // fallback selection, then the configured defaults.
    const usage = runResult.meta.agentMeta?.usage;
    const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
    const providerUsed =
      runResult.meta.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider;
    const cliSessionId = isCliProvider(providerUsed, cfg)
      ? runResult.meta.agentMeta?.sessionId?.trim()
      : undefined;
    const contextTokensUsed =
      agentCfgContextTokens ??
      lookupContextTokens(modelUsed) ??
      activeSessionEntry?.contextTokens ??
      DEFAULT_CONTEXT_TOKENS;

    /**
     * When the turn ran on a CLI provider that reported a session id, extends
     * `patch` with the CLI session id fields computed by `setCliSessionId` on a
     * merged copy of the entry; otherwise returns `patch` unchanged.
     */
    const withCliSessionId = (
      entry: SessionEntry,
      patch: Partial<SessionEntry>,
    ): Partial<SessionEntry> => {
      if (!cliSessionId) return patch;
      const nextEntry = { ...entry, ...patch };
      setCliSessionId(nextEntry, providerUsed, cliSessionId);
      return {
        ...patch,
        cliSessionIds: nextEntry.cliSessionIds,
        claudeCliSessionId: nextEntry.claudeCliSessionId,
      };
    };

    if (storePath && sessionKey) {
      if (hasNonzeroUsage(usage)) {
        // Persist token usage together with the model/provider that produced it.
        try {
          await updateSessionStoreEntry({
            storePath,
            sessionKey,
            update: async (entry) => {
              const input = usage.input ?? 0;
              const output = usage.output ?? 0;
              const promptTokens = input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
              return withCliSessionId(entry, {
                inputTokens: input,
                outputTokens: output,
                totalTokens: promptTokens > 0 ? promptTokens : (usage.total ?? input),
                modelProvider: providerUsed,
                model: modelUsed,
                contextTokens: contextTokensUsed ?? entry.contextTokens,
                systemPromptReport: runResult.meta.systemPromptReport ?? entry.systemPromptReport,
                updatedAt: Date.now(),
              });
            },
          });
        } catch (err) {
          logVerbose(`failed to persist usage update: ${String(err)}`);
        }
      } else if (modelUsed || contextTokensUsed) {
        // No usage reported: still record model/provider/context information.
        try {
          await updateSessionStoreEntry({
            storePath,
            sessionKey,
            update: async (entry) =>
              withCliSessionId(entry, {
                modelProvider: providerUsed ?? entry.modelProvider,
                model: modelUsed ?? entry.model,
                contextTokens: contextTokensUsed ?? entry.contextTokens,
                systemPromptReport: runResult.meta.systemPromptReport ?? entry.systemPromptReport,
                updatedAt: Date.now(),
              }),
          });
        } catch (err) {
          logVerbose(`failed to persist model/context update: ${String(err)}`);
        }
      }
    }

    // Normalise the session's response-usage setting: "full", "tokens"
    // (also accepted as "on"), anything else means "off".
    const responseUsageRaw =
      activeSessionEntry?.responseUsage ??
      (sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined);
    const responseUsageMode =
      responseUsageRaw === "full"
        ? "full"
        : responseUsageRaw === "tokens" || responseUsageRaw === "on"
          ? "tokens"
          : "off";
    if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) {
      const authMode = resolveModelAuthMode(providerUsed, cfg);
      // Cost is only shown for API-key authentication.
      const showCost = authMode === "api-key";
      const costConfig = showCost
        ? resolveModelCostConfig({
            provider: providerUsed,
            model: modelUsed,
            config: cfg,
          })
        : undefined;
      let formatted = formatResponseUsageLine({
        usage,
        showCost,
        costConfig,
      });
      if (formatted && responseUsageMode === "full" && sessionKey) {
        formatted = `${formatted} · session ${sessionKey}`;
      }
      if (formatted) responseUsageLine = formatted;
    }

    // Notices are prepended, so the new-session hint (added last) ends up
    // ahead of the auto-compaction notice.
    let finalPayloads = replyPayloads;
    const verboseEnabled = resolvedVerboseLevel !== "off";
    if (autoCompactionCompleted) {
      const count = await incrementCompactionCount({
        sessionEntry: activeSessionEntry,
        sessionStore: activeSessionStore,
        sessionKey,
        storePath,
      });
      if (verboseEnabled) {
        const suffix = typeof count === "number" ? ` (count ${count})` : "";
        finalPayloads = [{ text: `🧹 Auto-compaction complete${suffix}.` }, ...finalPayloads];
      }
    }
    // If verbose is enabled and this is a new session, prepend a session hint.
    if (verboseEnabled && activeIsNewSession) {
      finalPayloads = [{ text: `🧭 New session: ${followupRun.run.sessionId}` }, ...finalPayloads];
    }
    if (responseUsageLine) {
      finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
    }

    // A lone payload is returned unwrapped rather than as a one-element array.
    return finalizeWithFollowup(
      finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
      queueKey,
      runFollowupTurn,
    );
  } finally {
    // Runs on every exit from the turn (success, early return or throw).
    blockReplyPipeline?.stop();
    typing.markRunComplete();
  }
}