import crypto from "node:crypto";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { runWithModelFallback } from "../../agents/model-fallback.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { defaultRuntime } from "../../runtime.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { FollowupRun } from "./queue.js";
import { extractReplyToTag } from "./reply-tags.js";
import { isRoutableChannel, routeReply } from "./route-reply.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js";

/**
 * Builds the handler that executes one queued followup run.
 *
 * The returned function runs the embedded agent (through the model-fallback
 * wrapper), cleans up the resulting payloads, records model/usage details on
 * the session entry when a store and key were supplied, and finally delivers
 * the payloads. `typing.markRunComplete()` is always called when the handler
 * finishes, whether it delivered anything or bailed out early.
 *
 * @param params.opts - Reply options; only `onBlockReply` is used here, as the
 *   dispatcher for payloads that are not routed to an originating channel.
 * @param params.typing - Typing controller driven while payloads are sent.
 * @param params.typingMode - Selects which typing signal is emitted per payload.
 * @param params.sessionEntry - Session entry passed to the compaction counter
 *   and used as a fallback source for `contextTokens`.
 * @param params.sessionStore - Session entries keyed by session key; mutated
 *   in place and persisted when `storePath` is set.
 * @param params.sessionKey - Key of the entry to update in `sessionStore`.
 * @param params.storePath - Where the session store is saved after an update.
 * @param params.defaultModel - Model name recorded when neither the run
 *   metadata nor the fallback result names one.
 * @param params.agentCfgContextTokens - Context-token value that takes
 *   precedence over the per-model lookup.
 * @returns An async handler for a single {@link FollowupRun}. Agent failures
 *   are reported through `defaultRuntime.error` and do not reject.
 */
export function createFollowupRunner(params: {
  opts?: GetReplyOptions;
  typing: TypingController;
  typingMode: TypingMode;
  sessionEntry?: SessionEntry;
  // NOTE(review): type arguments were missing in the source; restored from the
  // way entries are read and written below (string key -> SessionEntry). Confirm.
  sessionStore?: Record<string, SessionEntry>;
  sessionKey?: string;
  storePath?: string;
  defaultModel: string;
  agentCfgContextTokens?: number;
  // NOTE(review): `Promise<void>` restored; the handler never returns a value.
}): (queued: FollowupRun) => Promise<void> {
  const {
    opts,
    typing,
    typingMode,
    sessionEntry,
    sessionStore,
    sessionKey,
    storePath,
    defaultModel,
    agentCfgContextTokens,
  } = params;

  /**
   * Sends followup payloads, routing to the originating channel if set.
   *
   * When originatingChannel/originatingTo are set on the queued run,
   * replies are routed directly to that provider instead of using the
   * session's current dispatcher. This ensures replies go back to
   * where the message originated.
   */
  const sendFollowupPayloads = async (
    payloads: ReplyPayload[],
    queued: FollowupRun,
  ) => {
    // Check if we should route to originating channel.
    const { originatingChannel, originatingTo } = queued;
    const shouldRouteToOriginating =
      isRoutableChannel(originatingChannel) && originatingTo;

    // Without a route and without a dispatcher there is nowhere to deliver to.
    if (!shouldRouteToOriginating && !opts?.onBlockReply) {
      logVerbose("followup queue: no onBlockReply handler; dropping payloads");
      return;
    }

    for (const payload of payloads) {
      // Skip payloads that carry neither text nor media.
      if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) {
        continue;
      }
      // Skip text-only payloads whose text is exactly the silent-reply token.
      if (
        payload.text?.trim() === SILENT_REPLY_TOKEN &&
        !payload.mediaUrl &&
        !payload.mediaUrls?.length
      ) {
        continue;
      }

      if (typingMode !== "never") {
        if (typingMode === "message" || typingMode === "instant") {
          await typing.startTypingOnText(payload.text);
        } else if (typingMode === "thinking") {
          typing.refreshTypingTtl();
        }
      }

      // Route to originating channel if set, otherwise fall back to dispatcher.
      if (shouldRouteToOriginating) {
        const result = await routeReply({
          payload,
          channel: originatingChannel,
          to: originatingTo,
          accountId: queued.originatingAccountId,
          threadId: queued.originatingThreadId,
          cfg: queued.run.config,
        });
        if (!result.ok) {
          // Log error and fall back to dispatcher if available.
          const errorMsg = result.error ?? "unknown error";
          logVerbose(`followup queue: route-reply failed: ${errorMsg}`);
          // Fallback: try the dispatcher if routing failed.
          if (opts?.onBlockReply) {
            await opts.onBlockReply(payload);
          }
        }
      } else if (opts?.onBlockReply) {
        await opts.onBlockReply(payload);
      }
    }
  };

  return async (queued: FollowupRun) => {
    try {
      const runId = crypto.randomUUID();
      if (queued.run.sessionKey) {
        registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
      }

      // Set by the agent event callback when a compaction ends without a retry.
      let autoCompactionCompleted = false;
      // NOTE(review): type arguments were missing in the source; restored as the
      // resolved result of `runEmbeddedPiAgent`, which is what gets assigned. Confirm.
      let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
      // Start from the requested provider/model; replaced by whatever the
      // fallback wrapper reports it actually used.
      let fallbackProvider = queued.run.provider;
      let fallbackModel = queued.run.model;

      try {
        const fallbackResult = await runWithModelFallback({
          cfg: queued.run.config,
          provider: queued.run.provider,
          model: queued.run.model,
          run: (provider, model) =>
            runEmbeddedPiAgent({
              sessionId: queued.run.sessionId,
              sessionKey: queued.run.sessionKey,
              messageProvider: queued.run.messageProvider,
              sessionFile: queued.run.sessionFile,
              workspaceDir: queued.run.workspaceDir,
              config: queued.run.config,
              skillsSnapshot: queued.run.skillsSnapshot,
              prompt: queued.prompt,
              extraSystemPrompt: queued.run.extraSystemPrompt,
              ownerNumbers: queued.run.ownerNumbers,
              enforceFinalTag: queued.run.enforceFinalTag,
              provider,
              model,
              authProfileId: queued.run.authProfileId,
              thinkLevel: queued.run.thinkLevel,
              verboseLevel: queued.run.verboseLevel,
              reasoningLevel: queued.run.reasoningLevel,
              bashElevated: queued.run.bashElevated,
              timeoutMs: queued.run.timeoutMs,
              runId,
              blockReplyBreak: queued.run.blockReplyBreak,
              onAgentEvent: (evt) => {
                if (evt.stream !== "compaction") return;
                const phase =
                  typeof evt.data.phase === "string" ? evt.data.phase : "";
                const willRetry = Boolean(evt.data.willRetry);
                if (phase === "end" && !willRetry) {
                  autoCompactionCompleted = true;
                }
              },
            }),
        });
        runResult = fallbackResult.result;
        fallbackProvider = fallbackResult.provider;
        fallbackModel = fallbackResult.model;
      } catch (err) {
        // Report the failure and stop; nothing is sent for this run.
        const message = err instanceof Error ? err.message : String(err);
        defaultRuntime.error?.(
          `Followup agent failed before reply: ${message}`,
        );
        return;
      }

      const payloadArray = runResult.payloads ?? [];
      if (payloadArray.length === 0) return;

      // Strip the heartbeat token from any text that contains it; drop the
      // payload when the stripper says to skip it and it carries no media.
      const sanitizedPayloads = payloadArray.flatMap((payload) => {
        const text = payload.text;
        if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
        const stripped = stripHeartbeatToken(text, { mode: "message" });
        const hasMedia =
          Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
        if (stripped.shouldSkip && !hasMedia) return [];
        return [{ ...payload, text: stripped.text }];
      });

      // Pull the reply-to tag out of the text (an id found there wins over
      // the payload's own replyToId), then drop payloads left with no content.
      const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
        .map((payload) => {
          const { cleaned, replyToId } = extractReplyToTag(payload.text);
          return {
            ...payload,
            text: cleaned ? cleaned : undefined,
            replyToId: replyToId ?? payload.replyToId,
          };
        })
        .filter(
          (payload) =>
            payload.text ||
            payload.mediaUrl ||
            (payload.mediaUrls && payload.mediaUrls.length > 0),
        );
      if (replyTaggedPayloads.length === 0) return;

      if (autoCompactionCompleted) {
        const count = await incrementCompactionCount({
          sessionEntry,
          sessionStore,
          sessionKey,
          storePath,
        });
        // In verbose mode, put a compaction notice ahead of the reply payloads.
        if (queued.run.verboseLevel === "on") {
          const suffix = typeof count === "number" ? ` (count ${count})` : "";
          replyTaggedPayloads.unshift({
            text: `🧹 Auto-compaction complete${suffix}.`,
          });
        }
      }

      if (sessionStore && sessionKey) {
        const usage = runResult.meta.agentMeta?.usage;
        const modelUsed =
          runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
        const contextTokensUsed =
          agentCfgContextTokens ??
          lookupContextTokens(modelUsed) ??
          sessionEntry?.contextTokens ??
          DEFAULT_CONTEXT_TOKENS;

        if (hasNonzeroUsage(usage)) {
          const entry = sessionStore[sessionKey];
          if (entry) {
            const input = usage.input ?? 0;
            const output = usage.output ?? 0;
            // Prompt size counts cache reads and writes alongside fresh input.
            const promptTokens =
              input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
            sessionStore[sessionKey] = {
              ...entry,
              inputTokens: input,
              outputTokens: output,
              totalTokens:
                promptTokens > 0 ? promptTokens : (usage.total ?? input),
              modelProvider: fallbackProvider ?? entry.modelProvider,
              model: modelUsed,
              contextTokens: contextTokensUsed ?? entry.contextTokens,
              updatedAt: Date.now(),
            };
            if (storePath) {
              await saveSessionStore(storePath, sessionStore);
            }
          }
        } else if (modelUsed || contextTokensUsed) {
          // No usage reported: still record model and context size, but leave
          // the token counters and `updatedAt` untouched.
          const entry = sessionStore[sessionKey];
          if (entry) {
            sessionStore[sessionKey] = {
              ...entry,
              modelProvider: fallbackProvider ?? entry.modelProvider,
              model: modelUsed ?? entry.model,
              contextTokens: contextTokensUsed ?? entry.contextTokens,
            };
            if (storePath) {
              await saveSessionStore(storePath, sessionStore);
            }
          }
        }
      }

      await sendFollowupPayloads(replyTaggedPayloads, queued);
    } finally {
      // Runs on every exit path, including the early returns above.
      typing.markRunComplete();
    }
  };
}