From 198f8ea700c4c94fbb8a3e8746d18b1a7dc94c5d Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 25 Dec 2025 23:50:52 +0100 Subject: [PATCH] fix(agent): serialize runs per session --- docs/configuration.md | 5 + docs/queue.md | 10 +- src/agents/pi-embedded-runner.ts | 523 ++++++++++++++++--------------- src/auto-reply/reply.ts | 1 + src/commands/agent.ts | 1 + src/cron/isolated-agent.ts | 3 +- 6 files changed, 288 insertions(+), 255 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index ae2075f4e..3325e0d9c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -131,6 +131,7 @@ Controls the embedded agent runtime (provider/model/thinking/verbose/timeouts). timeoutSeconds: 600, mediaMaxMb: 5, heartbeatMinutes: 30, + maxConcurrent: 3, bash: { backgroundMs: 20000, timeoutSec: 1800, @@ -146,6 +147,10 @@ Controls the embedded agent runtime (provider/model/thinking/verbose/timeouts). - `timeoutSec`: auto-kill after this runtime (seconds, default 1800) - `cleanupMs`: how long to keep finished sessions in memory (ms, default 1800000) +`agent.maxConcurrent` sets the maximum number of embedded agent runs that can +execute in parallel across sessions. Each session is still serialized (one run +per session key at a time). Default: 1. + ### `models` (custom providers + base URLs) Clawdis uses the **pi-coding-agent** model catalog. You can add custom providers diff --git a/docs/queue.md b/docs/queue.md index 56874cf2b..ef78d7884 100644 --- a/docs/queue.md +++ b/docs/queue.md @@ -5,22 +5,24 @@ read_when: --- # Command Queue (2025-11-25) -We now serialize all command-based auto-replies (WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once. +We now serialize command-based auto-replies (WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once, while allowing safe parallelism across sessions. ## Why - Some auto-reply commands are expensive (LLM calls) and can collide when multiple inbound messages arrive close together. - Serializing avoids competing for terminal/stdin, keeps logs readable, and reduces the chance of rate limits from upstream tools. ## How it works -- `src/process/command-queue.ts` holds a single FIFO queue and drains it synchronously; only one task runs at a time. -- `getReplyFromConfig` wraps command execution with `enqueueCommand(...)`, so every config-driven command reply flows through the queue automatically. +- `src/process/command-queue.ts` holds a lane-aware FIFO queue and drains each lane synchronously. +- `runEmbeddedPiAgent` enqueues by **session key** (lane `session:`) to guarantee only one active run per session. +- Each session run is then queued into a **global lane** (`main` by default) so overall parallelism is capped by `agent.maxConcurrent`. - When verbose logging is enabled, queued commands emit a short notice if they waited more than ~2s before starting. - Typing indicators (`onReplyStart`) still fire immediately on enqueue so user experience is unchanged while we wait our turn. ## Scope and guarantees - Applies only to config-driven command replies; plain text replies are unaffected. -- Default lane (`main`) is process-wide for inbound + main heartbeats to keep the primary workflow serialized. +- Default lane (`main`) is process-wide for inbound + main heartbeats; set `agent.maxConcurrent` to allow multiple sessions in parallel. - Additional lanes may exist (e.g. `cron`) so background jobs can run in parallel without blocking inbound replies. +- Per-session lanes guarantee that only one agent run touches a given session at a time. - No external dependencies or background worker threads; pure TypeScript + promises. ## Troubleshooting diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 3caa05214..6314b2dc2 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -25,7 +25,10 @@ import { formatToolAggregate } from "../auto-reply/tool-meta.js"; import type { ClawdisConfig } from "../config/config.js"; import { getMachineDisplayName } from "../infra/machine-name.js"; import { splitMediaFromOutput } from "../media/parse.js"; -import { enqueueCommand } from "../process/command-queue.js"; +import { + enqueueCommand, + enqueueCommandInLane, +} from "../process/command-queue.js"; import { CONFIG_DIR, resolveUserPath } from "../utils.js"; import { resolveClawdisAgentDir } from "./agent-paths.js"; import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js"; @@ -90,6 +93,16 @@ const DEFAULT_OAUTH_DIR = path.join(CONFIG_DIR, "credentials"); let oauthStorageConfigured = false; let cachedDefaultApiKey: ReturnType | null = null; +function resolveSessionLane(key: string) { + const cleaned = key.trim() || "main"; + return cleaned.startsWith("session:") ? cleaned : `session:${cleaned}`; +} + +function resolveGlobalLane(lane?: string) { + const cleaned = lane?.trim(); + return cleaned ? cleaned : "main"; +} + function resolveClawdisOAuthPath(): string { const overrideDir = process.env.CLAWDIS_OAUTH_DIR?.trim() || DEFAULT_OAUTH_DIR; @@ -242,6 +255,7 @@ function resolvePromptSkills( export async function runEmbeddedPiAgent(params: { sessionId: string; + sessionKey?: string; sessionFile: string; workspaceDir: string; config?: ClawdisConfig; @@ -267,268 +281,277 @@ export async function runEmbeddedPiAgent(params: { stream: string; data: Record; }) => void; + lane?: string; enqueue?: typeof enqueueCommand; extraSystemPrompt?: string; ownerNumbers?: string[]; enforceFinalTag?: boolean; }): Promise { - const enqueue = params.enqueue ?? enqueueCommand; - return enqueue(async () => { - const started = Date.now(); - const resolvedWorkspace = resolveUserPath(params.workspaceDir); - const prevCwd = process.cwd(); + const sessionLane = resolveSessionLane( + params.sessionKey?.trim() || params.sessionId, + ); + const globalLane = resolveGlobalLane(params.lane); + const enqueueGlobal = + params.enqueue ?? + ((task, opts) => enqueueCommandInLane(globalLane, task, opts)); + return enqueueCommandInLane(sessionLane, () => + enqueueGlobal(async () => { + const started = Date.now(); + const resolvedWorkspace = resolveUserPath(params.workspaceDir); + const prevCwd = process.cwd(); - const provider = - (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; - const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; - await ensureClawdisModelsJson(params.config); - const agentDir = resolveClawdisAgentDir(); - const { model, error } = resolveModel(provider, modelId, agentDir); - if (!model) { - throw new Error(error ?? `Unknown model: ${provider}/${modelId}`); - } - - const thinkingLevel = mapThinkingLevel(params.thinkLevel); - - await fs.mkdir(resolvedWorkspace, { recursive: true }); - await ensureSessionHeader({ - sessionFile: params.sessionFile, - sessionId: params.sessionId, - cwd: resolvedWorkspace, - }); - - let restoreSkillEnv: (() => void) | undefined; - process.chdir(resolvedWorkspace); - try { - const shouldLoadSkillEntries = - !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; - const skillEntries = shouldLoadSkillEntries - ? loadWorkspaceSkillEntries(resolvedWorkspace) - : []; - const skillsSnapshot = - params.skillsSnapshot ?? - buildWorkspaceSkillSnapshot(resolvedWorkspace, { - config: params.config, - entries: skillEntries, - }); - restoreSkillEnv = params.skillsSnapshot - ? applySkillEnvOverridesFromSnapshot({ - snapshot: params.skillsSnapshot, - config: params.config, - }) - : applySkillEnvOverrides({ - skills: skillEntries ?? [], - config: params.config, - }); - - const bootstrapFiles = - await loadWorkspaceBootstrapFiles(resolvedWorkspace); - const contextFiles = buildBootstrapContextFiles(bootstrapFiles); - const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries); - const tools = createClawdisCodingTools({ - bash: params.config?.agent?.bash, - }); - const machineName = await getMachineDisplayName(); - const runtimeInfo = { - host: machineName, - os: `${os.type()} ${os.release()}`, - arch: os.arch(), - node: process.version, - model: `${provider}/${modelId}`, - }; - const reasoningTagHint = provider === "lmstudio" || provider === "ollama"; - const systemPrompt = buildSystemPrompt({ - appendPrompt: buildAgentSystemPromptAppend({ - workspaceDir: resolvedWorkspace, - defaultThinkLevel: params.thinkLevel, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, - reasoningTagHint, - runtimeInfo, - }), - contextFiles, - skills: promptSkills, - cwd: resolvedWorkspace, - tools, - }); - - const sessionManager = SessionManager.open(params.sessionFile, agentDir); - const settingsManager = SettingsManager.create( - resolvedWorkspace, - agentDir, - ); - - const { session } = await createAgentSession({ - cwd: resolvedWorkspace, - agentDir, - model, - thinkingLevel, - systemPrompt, - // TODO(steipete): Once pi-mono publishes file-magic MIME detection in `read` image payloads, - // remove `createClawdisCodingTools()` and use upstream `codingTools` again. - tools, - sessionManager, - settingsManager, - getApiKey: async (m) => { - return await getApiKeyForModel(m as Model); - }, - skills: promptSkills, - contextFiles, - }); - - const prior = await sanitizeSessionMessagesImages( - session.messages, - "session:history", - ); - if (prior.length > 0) { - session.agent.replaceMessages(prior); + const provider = + (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; + const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; + await ensureClawdisModelsJson(params.config); + const agentDir = resolveClawdisAgentDir(); + const { model, error } = resolveModel(provider, modelId, agentDir); + if (!model) { + throw new Error(error ?? `Unknown model: ${provider}/${modelId}`); } - const queueHandle: EmbeddedPiQueueHandle = { - queueMessage: async (text: string) => { - await session.queueMessage(text); - }, - isStreaming: () => session.isStreaming, - }; - ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); - let aborted = Boolean(params.abortSignal?.aborted); - const { - assistantTexts, - toolMetas, - unsubscribe, - flush: flushToolDebouncer, - } = subscribeEmbeddedPiSession({ - session, - runId: params.runId, - verboseLevel: params.verboseLevel, - shouldEmitToolResult: params.shouldEmitToolResult, - onToolResult: params.onToolResult, - onPartialReply: params.onPartialReply, - onAgentEvent: params.onAgentEvent, - enforceFinalTag: params.enforceFinalTag, + const thinkingLevel = mapThinkingLevel(params.thinkLevel); + + await fs.mkdir(resolvedWorkspace, { recursive: true }); + await ensureSessionHeader({ + sessionFile: params.sessionFile, + sessionId: params.sessionId, + cwd: resolvedWorkspace, }); - const abortTimer = setTimeout( - () => { - aborted = true; - void session.abort(); - }, - Math.max(1, params.timeoutMs), - ); - - let messagesSnapshot: AppMessage[] = []; - let sessionIdUsed = session.sessionId; - const onAbort = () => { - aborted = true; - void session.abort(); - }; - if (params.abortSignal) { - if (params.abortSignal.aborted) { - onAbort(); - } else { - params.abortSignal.addEventListener("abort", onAbort, { once: true }); - } - } - let promptError: unknown = null; + let restoreSkillEnv: (() => void) | undefined; + process.chdir(resolvedWorkspace); try { - try { - await session.prompt(params.prompt); - } catch (err) { - promptError = err; - } finally { - messagesSnapshot = session.messages.slice(); - sessionIdUsed = session.sessionId; - } - } finally { - clearTimeout(abortTimer); - unsubscribe(); - flushToolDebouncer(); - if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { - ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); - } - session.dispose(); - params.abortSignal?.removeEventListener?.("abort", onAbort); - } - if (promptError && !aborted) { - throw promptError; - } + const shouldLoadSkillEntries = + !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; + const skillEntries = shouldLoadSkillEntries + ? loadWorkspaceSkillEntries(resolvedWorkspace) + : []; + const skillsSnapshot = + params.skillsSnapshot ?? + buildWorkspaceSkillSnapshot(resolvedWorkspace, { + config: params.config, + entries: skillEntries, + }); + restoreSkillEnv = params.skillsSnapshot + ? applySkillEnvOverridesFromSnapshot({ + snapshot: params.skillsSnapshot, + config: params.config, + }) + : applySkillEnvOverrides({ + skills: skillEntries ?? [], + config: params.config, + }); - const lastAssistant = messagesSnapshot - .slice() - .reverse() - .find((m) => (m as AppMessage)?.role === "assistant") as - | AssistantMessage - | undefined; + const bootstrapFiles = + await loadWorkspaceBootstrapFiles(resolvedWorkspace); + const contextFiles = buildBootstrapContextFiles(bootstrapFiles); + const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries); + const tools = createClawdisCodingTools({ + bash: params.config?.agent?.bash, + }); + const machineName = await getMachineDisplayName(); + const runtimeInfo = { + host: machineName, + os: `${os.type()} ${os.release()}`, + arch: os.arch(), + node: process.version, + model: `${provider}/${modelId}`, + }; + const reasoningTagHint = provider === "lmstudio" || provider === "ollama"; + const systemPrompt = buildSystemPrompt({ + appendPrompt: buildAgentSystemPromptAppend({ + workspaceDir: resolvedWorkspace, + defaultThinkLevel: params.thinkLevel, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + reasoningTagHint, + runtimeInfo, + }), + contextFiles, + skills: promptSkills, + cwd: resolvedWorkspace, + tools, + }); - const usage = lastAssistant?.usage; - const agentMeta: EmbeddedPiAgentMeta = { - sessionId: sessionIdUsed, - provider: lastAssistant?.provider ?? provider, - model: lastAssistant?.model ?? model.id, - usage: usage - ? { - input: usage.input, - output: usage.output, - cacheRead: usage.cacheRead, - cacheWrite: usage.cacheWrite, - total: usage.totalTokens, - } - : undefined, - }; - - const replyItems: Array<{ text: string; media?: string[] }> = []; - - const errorText = lastAssistant - ? formatAssistantErrorText(lastAssistant) - : undefined; - if (errorText) replyItems.push({ text: errorText }); - - const inlineToolResults = - params.verboseLevel === "on" && - !params.onPartialReply && - !params.onToolResult && - toolMetas.length > 0; - if (inlineToolResults) { - for (const { toolName, meta } of toolMetas) { - const agg = formatToolAggregate(toolName, meta ? [meta] : []); - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg); - if (cleanedText) - replyItems.push({ text: cleanedText, media: mediaUrls }); - } - } - - for (const text of assistantTexts.length - ? assistantTexts - : lastAssistant - ? [extractAssistantText(lastAssistant)] - : []) { - const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); - if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue; - replyItems.push({ text: cleanedText, media: mediaUrls }); - } - - const payloads = replyItems - .map((item) => ({ - text: item.text?.trim() ? item.text.trim() : undefined, - mediaUrls: item.media?.length ? item.media : undefined, - mediaUrl: item.media?.[0], - })) - .filter( - (p) => - p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0), + const sessionManager = SessionManager.open(params.sessionFile, agentDir); + const settingsManager = SettingsManager.create( + resolvedWorkspace, + agentDir, ); - return { - payloads: payloads.length ? payloads : undefined, - meta: { - durationMs: Date.now() - started, - agentMeta, - aborted, - }, - }; - } finally { - restoreSkillEnv?.(); - process.chdir(prevCwd); - } - }); + const { session } = await createAgentSession({ + cwd: resolvedWorkspace, + agentDir, + model, + thinkingLevel, + systemPrompt, + // TODO(steipete): Once pi-mono publishes file-magic MIME detection in `read` image payloads, + // remove `createClawdisCodingTools()` and use upstream `codingTools` again. + tools, + sessionManager, + settingsManager, + getApiKey: async (m) => { + return await getApiKeyForModel(m as Model); + }, + skills: promptSkills, + contextFiles, + }); + + const prior = await sanitizeSessionMessagesImages( + session.messages, + "session:history", + ); + if (prior.length > 0) { + session.agent.replaceMessages(prior); + } + const queueHandle: EmbeddedPiQueueHandle = { + queueMessage: async (text: string) => { + await session.queueMessage(text); + }, + isStreaming: () => session.isStreaming, + }; + ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); + let aborted = Boolean(params.abortSignal?.aborted); + + const { + assistantTexts, + toolMetas, + unsubscribe, + flush: flushToolDebouncer, + } = subscribeEmbeddedPiSession({ + session, + runId: params.runId, + verboseLevel: params.verboseLevel, + shouldEmitToolResult: params.shouldEmitToolResult, + onToolResult: params.onToolResult, + onPartialReply: params.onPartialReply, + onAgentEvent: params.onAgentEvent, + enforceFinalTag: params.enforceFinalTag, + }); + + const abortTimer = setTimeout( + () => { + aborted = true; + void session.abort(); + }, + Math.max(1, params.timeoutMs), + ); + + let messagesSnapshot: AppMessage[] = []; + let sessionIdUsed = session.sessionId; + const onAbort = () => { + aborted = true; + void session.abort(); + }; + if (params.abortSignal) { + if (params.abortSignal.aborted) { + onAbort(); + } else { + params.abortSignal.addEventListener("abort", onAbort, { once: true }); + } + } + let promptError: unknown = null; + try { + try { + await session.prompt(params.prompt); + } catch (err) { + promptError = err; + } finally { + messagesSnapshot = session.messages.slice(); + sessionIdUsed = session.sessionId; + } + } finally { + clearTimeout(abortTimer); + unsubscribe(); + flushToolDebouncer(); + if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { + ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); + } + session.dispose(); + params.abortSignal?.removeEventListener?.("abort", onAbort); + } + if (promptError && !aborted) { + throw promptError; + } + + const lastAssistant = messagesSnapshot + .slice() + .reverse() + .find((m) => (m as AppMessage)?.role === "assistant") as + | AssistantMessage + | undefined; + + const usage = lastAssistant?.usage; + const agentMeta: EmbeddedPiAgentMeta = { + sessionId: sessionIdUsed, + provider: lastAssistant?.provider ?? provider, + model: lastAssistant?.model ?? model.id, + usage: usage + ? { + input: usage.input, + output: usage.output, + cacheRead: usage.cacheRead, + cacheWrite: usage.cacheWrite, + total: usage.totalTokens, + } + : undefined, + }; + + const replyItems: Array<{ text: string; media?: string[] }> = []; + + const errorText = lastAssistant + ? formatAssistantErrorText(lastAssistant) + : undefined; + if (errorText) replyItems.push({ text: errorText }); + + const inlineToolResults = + params.verboseLevel === "on" && + !params.onPartialReply && + !params.onToolResult && + toolMetas.length > 0; + if (inlineToolResults) { + for (const { toolName, meta } of toolMetas) { + const agg = formatToolAggregate(toolName, meta ? [meta] : []); + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg); + if (cleanedText) + replyItems.push({ text: cleanedText, media: mediaUrls }); + } + } + + for (const text of assistantTexts.length + ? assistantTexts + : lastAssistant + ? [extractAssistantText(lastAssistant)] + : []) { + const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); + if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue; + replyItems.push({ text: cleanedText, media: mediaUrls }); + } + + const payloads = replyItems + .map((item) => ({ + text: item.text?.trim() ? item.text.trim() : undefined, + mediaUrls: item.media?.length ? item.media : undefined, + mediaUrl: item.media?.[0], + })) + .filter( + (p) => + p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0), + ); + + return { + payloads: payloads.length ? payloads : undefined, + meta: { + durationMs: Date.now() - started, + agentMeta, + aborted, + }, + }; + } finally { + restoreSkillEnv?.(); + process.chdir(prevCwd); + } + }), + ); } diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index d170f011b..00ac4a460 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -983,6 +983,7 @@ export async function getReplyFromConfig( const runId = crypto.randomUUID(); const runResult = await runEmbeddedPiAgent({ sessionId: sessionIdFinal, + sessionKey, sessionFile, workspaceDir, config: cfg, diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 739d72a38..78edd5e5b 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -313,6 +313,7 @@ export async function agentCommand( try { result = await runEmbeddedPiAgent({ sessionId, + sessionKey, sessionFile, workspaceDir, config: cfg, diff --git a/src/cron/isolated-agent.ts b/src/cron/isolated-agent.ts index 10cb3c237..767e3113d 100644 --- a/src/cron/isolated-agent.ts +++ b/src/cron/isolated-agent.ts @@ -146,7 +146,6 @@ export async function runCronIsolatedAgentTurn(params: { lane?: string; }): Promise { const agentCfg = params.cfg.agent; - void params.lane; const workspaceDirRaw = params.cfg.agent?.workspace ?? DEFAULT_AGENT_WORKSPACE_DIR; const workspace = await ensureAgentWorkspace({ @@ -236,11 +235,13 @@ export async function runCronIsolatedAgentTurn(params: { ); runResult = await runEmbeddedPiAgent({ sessionId: cronSession.sessionEntry.sessionId, + sessionKey: params.sessionKey, sessionFile, workspaceDir, config: params.cfg, skillsSnapshot, prompt: commandBody, + lane: params.lane ?? "cron", provider, model, thinkLevel,