fix(agent): serialize runs per session

This commit is contained in:
Peter Steinberger
2025-12-25 23:50:52 +01:00
parent 9fa9199747
commit 198f8ea700
6 changed files with 288 additions and 255 deletions

View File

@@ -131,6 +131,7 @@ Controls the embedded agent runtime (provider/model/thinking/verbose/timeouts).
timeoutSeconds: 600,
mediaMaxMb: 5,
heartbeatMinutes: 30,
maxConcurrent: 3,
bash: {
backgroundMs: 20000,
timeoutSec: 1800,
@@ -146,6 +147,10 @@ Controls the embedded agent runtime (provider/model/thinking/verbose/timeouts).
- `timeoutSec`: auto-kill after this runtime (seconds, default 1800)
- `cleanupMs`: how long to keep finished sessions in memory (ms, default 1800000)
`agent.maxConcurrent` sets the maximum number of embedded agent runs that can
execute in parallel across sessions. Each session is still serialized (one run
per session key at a time). Default: 1.
### `models` (custom providers + base URLs)
Clawdis uses the **pi-coding-agent** model catalog. You can add custom providers

View File

@@ -5,22 +5,24 @@ read_when:
---
# Command Queue (2025-11-25)
We now serialize all command-based auto-replies (WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once.
We now serialize command-based auto-replies (WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once, while allowing safe parallelism across sessions.
## Why
- Some auto-reply commands are expensive (LLM calls) and can collide when multiple inbound messages arrive close together.
- Serializing avoids competing for terminal/stdin, keeps logs readable, and reduces the chance of rate limits from upstream tools.
## How it works
- `src/process/command-queue.ts` holds a single FIFO queue and drains it synchronously; only one task runs at a time.
- `getReplyFromConfig` wraps command execution with `enqueueCommand(...)`, so every config-driven command reply flows through the queue automatically.
- `src/process/command-queue.ts` holds a lane-aware FIFO queue and drains each lane synchronously.
- `runEmbeddedPiAgent` enqueues by **session key** (lane `session:<key>`) to guarantee only one active run per session.
- Each session run is then queued into a **global lane** (`main` by default) so overall parallelism is capped by `agent.maxConcurrent`.
- When verbose logging is enabled, queued commands emit a short notice if they waited more than ~2s before starting.
- Typing indicators (`onReplyStart`) still fire immediately on enqueue so user experience is unchanged while we wait our turn.
## Scope and guarantees
- Applies only to config-driven command replies; plain text replies are unaffected.
- Default lane (`main`) is process-wide for inbound + main heartbeats to keep the primary workflow serialized.
- Default lane (`main`) is process-wide for inbound + main heartbeats; set `agent.maxConcurrent` to allow multiple sessions in parallel.
- Additional lanes may exist (e.g. `cron`) so background jobs can run in parallel without blocking inbound replies.
- Per-session lanes guarantee that only one agent run touches a given session at a time.
- No external dependencies or background worker threads; pure TypeScript + promises.
## Troubleshooting

View File

@@ -25,7 +25,10 @@ import { formatToolAggregate } from "../auto-reply/tool-meta.js";
import type { ClawdisConfig } from "../config/config.js";
import { getMachineDisplayName } from "../infra/machine-name.js";
import { splitMediaFromOutput } from "../media/parse.js";
import { enqueueCommand } from "../process/command-queue.js";
import {
enqueueCommand,
enqueueCommandInLane,
} from "../process/command-queue.js";
import { CONFIG_DIR, resolveUserPath } from "../utils.js";
import { resolveClawdisAgentDir } from "./agent-paths.js";
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js";
@@ -90,6 +93,16 @@ const DEFAULT_OAUTH_DIR = path.join(CONFIG_DIR, "credentials");
let oauthStorageConfigured = false;
let cachedDefaultApiKey: ReturnType<typeof defaultGetApiKey> | null = null;
function resolveSessionLane(key: string) {
const cleaned = key.trim() || "main";
return cleaned.startsWith("session:") ? cleaned : `session:${cleaned}`;
}
function resolveGlobalLane(lane?: string) {
const cleaned = lane?.trim();
return cleaned ? cleaned : "main";
}
function resolveClawdisOAuthPath(): string {
const overrideDir =
process.env.CLAWDIS_OAUTH_DIR?.trim() || DEFAULT_OAUTH_DIR;
@@ -242,6 +255,7 @@ function resolvePromptSkills(
export async function runEmbeddedPiAgent(params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
workspaceDir: string;
config?: ClawdisConfig;
@@ -267,268 +281,277 @@ export async function runEmbeddedPiAgent(params: {
stream: string;
data: Record<string, unknown>;
}) => void;
lane?: string;
enqueue?: typeof enqueueCommand;
extraSystemPrompt?: string;
ownerNumbers?: string[];
enforceFinalTag?: boolean;
}): Promise<EmbeddedPiRunResult> {
const enqueue = params.enqueue ?? enqueueCommand;
return enqueue(async () => {
const started = Date.now();
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd();
const sessionLane = resolveSessionLane(
params.sessionKey?.trim() || params.sessionId,
);
const globalLane = resolveGlobalLane(params.lane);
const enqueueGlobal =
params.enqueue ??
((task, opts) => enqueueCommandInLane(globalLane, task, opts));
return enqueueCommandInLane(sessionLane, () =>
enqueueGlobal(async () => {
const started = Date.now();
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd();
const provider =
(params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER;
const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL;
await ensureClawdisModelsJson(params.config);
const agentDir = resolveClawdisAgentDir();
const { model, error } = resolveModel(provider, modelId, agentDir);
if (!model) {
throw new Error(error ?? `Unknown model: ${provider}/${modelId}`);
}
const thinkingLevel = mapThinkingLevel(params.thinkLevel);
await fs.mkdir(resolvedWorkspace, { recursive: true });
await ensureSessionHeader({
sessionFile: params.sessionFile,
sessionId: params.sessionId,
cwd: resolvedWorkspace,
});
let restoreSkillEnv: (() => void) | undefined;
process.chdir(resolvedWorkspace);
try {
const shouldLoadSkillEntries =
!params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
const skillEntries = shouldLoadSkillEntries
? loadWorkspaceSkillEntries(resolvedWorkspace)
: [];
const skillsSnapshot =
params.skillsSnapshot ??
buildWorkspaceSkillSnapshot(resolvedWorkspace, {
config: params.config,
entries: skillEntries,
});
restoreSkillEnv = params.skillsSnapshot
? applySkillEnvOverridesFromSnapshot({
snapshot: params.skillsSnapshot,
config: params.config,
})
: applySkillEnvOverrides({
skills: skillEntries ?? [],
config: params.config,
});
const bootstrapFiles =
await loadWorkspaceBootstrapFiles(resolvedWorkspace);
const contextFiles = buildBootstrapContextFiles(bootstrapFiles);
const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries);
const tools = createClawdisCodingTools({
bash: params.config?.agent?.bash,
});
const machineName = await getMachineDisplayName();
const runtimeInfo = {
host: machineName,
os: `${os.type()} ${os.release()}`,
arch: os.arch(),
node: process.version,
model: `${provider}/${modelId}`,
};
const reasoningTagHint = provider === "lmstudio" || provider === "ollama";
const systemPrompt = buildSystemPrompt({
appendPrompt: buildAgentSystemPromptAppend({
workspaceDir: resolvedWorkspace,
defaultThinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
reasoningTagHint,
runtimeInfo,
}),
contextFiles,
skills: promptSkills,
cwd: resolvedWorkspace,
tools,
});
const sessionManager = SessionManager.open(params.sessionFile, agentDir);
const settingsManager = SettingsManager.create(
resolvedWorkspace,
agentDir,
);
const { session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
model,
thinkingLevel,
systemPrompt,
// TODO(steipete): Once pi-mono publishes file-magic MIME detection in `read` image payloads,
// remove `createClawdisCodingTools()` and use upstream `codingTools` again.
tools,
sessionManager,
settingsManager,
getApiKey: async (m) => {
return await getApiKeyForModel(m as Model<Api>);
},
skills: promptSkills,
contextFiles,
});
const prior = await sanitizeSessionMessagesImages(
session.messages,
"session:history",
);
if (prior.length > 0) {
session.agent.replaceMessages(prior);
const provider =
(params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER;
const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL;
await ensureClawdisModelsJson(params.config);
const agentDir = resolveClawdisAgentDir();
const { model, error } = resolveModel(provider, modelId, agentDir);
if (!model) {
throw new Error(error ?? `Unknown model: ${provider}/${modelId}`);
}
const queueHandle: EmbeddedPiQueueHandle = {
queueMessage: async (text: string) => {
await session.queueMessage(text);
},
isStreaming: () => session.isStreaming,
};
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
let aborted = Boolean(params.abortSignal?.aborted);
const {
assistantTexts,
toolMetas,
unsubscribe,
flush: flushToolDebouncer,
} = subscribeEmbeddedPiSession({
session,
runId: params.runId,
verboseLevel: params.verboseLevel,
shouldEmitToolResult: params.shouldEmitToolResult,
onToolResult: params.onToolResult,
onPartialReply: params.onPartialReply,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
const thinkingLevel = mapThinkingLevel(params.thinkLevel);
await fs.mkdir(resolvedWorkspace, { recursive: true });
await ensureSessionHeader({
sessionFile: params.sessionFile,
sessionId: params.sessionId,
cwd: resolvedWorkspace,
});
const abortTimer = setTimeout(
() => {
aborted = true;
void session.abort();
},
Math.max(1, params.timeoutMs),
);
let messagesSnapshot: AppMessage[] = [];
let sessionIdUsed = session.sessionId;
const onAbort = () => {
aborted = true;
void session.abort();
};
if (params.abortSignal) {
if (params.abortSignal.aborted) {
onAbort();
} else {
params.abortSignal.addEventListener("abort", onAbort, { once: true });
}
}
let promptError: unknown = null;
let restoreSkillEnv: (() => void) | undefined;
process.chdir(resolvedWorkspace);
try {
try {
await session.prompt(params.prompt);
} catch (err) {
promptError = err;
} finally {
messagesSnapshot = session.messages.slice();
sessionIdUsed = session.sessionId;
}
} finally {
clearTimeout(abortTimer);
unsubscribe();
flushToolDebouncer();
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
}
session.dispose();
params.abortSignal?.removeEventListener?.("abort", onAbort);
}
if (promptError && !aborted) {
throw promptError;
}
const shouldLoadSkillEntries =
!params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
const skillEntries = shouldLoadSkillEntries
? loadWorkspaceSkillEntries(resolvedWorkspace)
: [];
const skillsSnapshot =
params.skillsSnapshot ??
buildWorkspaceSkillSnapshot(resolvedWorkspace, {
config: params.config,
entries: skillEntries,
});
restoreSkillEnv = params.skillsSnapshot
? applySkillEnvOverridesFromSnapshot({
snapshot: params.skillsSnapshot,
config: params.config,
})
: applySkillEnvOverrides({
skills: skillEntries ?? [],
config: params.config,
});
const lastAssistant = messagesSnapshot
.slice()
.reverse()
.find((m) => (m as AppMessage)?.role === "assistant") as
| AssistantMessage
| undefined;
const bootstrapFiles =
await loadWorkspaceBootstrapFiles(resolvedWorkspace);
const contextFiles = buildBootstrapContextFiles(bootstrapFiles);
const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries);
const tools = createClawdisCodingTools({
bash: params.config?.agent?.bash,
});
const machineName = await getMachineDisplayName();
const runtimeInfo = {
host: machineName,
os: `${os.type()} ${os.release()}`,
arch: os.arch(),
node: process.version,
model: `${provider}/${modelId}`,
};
const reasoningTagHint = provider === "lmstudio" || provider === "ollama";
const systemPrompt = buildSystemPrompt({
appendPrompt: buildAgentSystemPromptAppend({
workspaceDir: resolvedWorkspace,
defaultThinkLevel: params.thinkLevel,
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
reasoningTagHint,
runtimeInfo,
}),
contextFiles,
skills: promptSkills,
cwd: resolvedWorkspace,
tools,
});
const usage = lastAssistant?.usage;
const agentMeta: EmbeddedPiAgentMeta = {
sessionId: sessionIdUsed,
provider: lastAssistant?.provider ?? provider,
model: lastAssistant?.model ?? model.id,
usage: usage
? {
input: usage.input,
output: usage.output,
cacheRead: usage.cacheRead,
cacheWrite: usage.cacheWrite,
total: usage.totalTokens,
}
: undefined,
};
const replyItems: Array<{ text: string; media?: string[] }> = [];
const errorText = lastAssistant
? formatAssistantErrorText(lastAssistant)
: undefined;
if (errorText) replyItems.push({ text: errorText });
const inlineToolResults =
params.verboseLevel === "on" &&
!params.onPartialReply &&
!params.onToolResult &&
toolMetas.length > 0;
if (inlineToolResults) {
for (const { toolName, meta } of toolMetas) {
const agg = formatToolAggregate(toolName, meta ? [meta] : []);
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg);
if (cleanedText)
replyItems.push({ text: cleanedText, media: mediaUrls });
}
}
for (const text of assistantTexts.length
? assistantTexts
: lastAssistant
? [extractAssistantText(lastAssistant)]
: []) {
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue;
replyItems.push({ text: cleanedText, media: mediaUrls });
}
const payloads = replyItems
.map((item) => ({
text: item.text?.trim() ? item.text.trim() : undefined,
mediaUrls: item.media?.length ? item.media : undefined,
mediaUrl: item.media?.[0],
}))
.filter(
(p) =>
p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0),
const sessionManager = SessionManager.open(params.sessionFile, agentDir);
const settingsManager = SettingsManager.create(
resolvedWorkspace,
agentDir,
);
return {
payloads: payloads.length ? payloads : undefined,
meta: {
durationMs: Date.now() - started,
agentMeta,
aborted,
},
};
} finally {
restoreSkillEnv?.();
process.chdir(prevCwd);
}
});
const { session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
model,
thinkingLevel,
systemPrompt,
// TODO(steipete): Once pi-mono publishes file-magic MIME detection in `read` image payloads,
// remove `createClawdisCodingTools()` and use upstream `codingTools` again.
tools,
sessionManager,
settingsManager,
getApiKey: async (m) => {
return await getApiKeyForModel(m as Model<Api>);
},
skills: promptSkills,
contextFiles,
});
const prior = await sanitizeSessionMessagesImages(
session.messages,
"session:history",
);
if (prior.length > 0) {
session.agent.replaceMessages(prior);
}
const queueHandle: EmbeddedPiQueueHandle = {
queueMessage: async (text: string) => {
await session.queueMessage(text);
},
isStreaming: () => session.isStreaming,
};
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
let aborted = Boolean(params.abortSignal?.aborted);
const {
assistantTexts,
toolMetas,
unsubscribe,
flush: flushToolDebouncer,
} = subscribeEmbeddedPiSession({
session,
runId: params.runId,
verboseLevel: params.verboseLevel,
shouldEmitToolResult: params.shouldEmitToolResult,
onToolResult: params.onToolResult,
onPartialReply: params.onPartialReply,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
});
const abortTimer = setTimeout(
() => {
aborted = true;
void session.abort();
},
Math.max(1, params.timeoutMs),
);
let messagesSnapshot: AppMessage[] = [];
let sessionIdUsed = session.sessionId;
const onAbort = () => {
aborted = true;
void session.abort();
};
if (params.abortSignal) {
if (params.abortSignal.aborted) {
onAbort();
} else {
params.abortSignal.addEventListener("abort", onAbort, { once: true });
}
}
let promptError: unknown = null;
try {
try {
await session.prompt(params.prompt);
} catch (err) {
promptError = err;
} finally {
messagesSnapshot = session.messages.slice();
sessionIdUsed = session.sessionId;
}
} finally {
clearTimeout(abortTimer);
unsubscribe();
flushToolDebouncer();
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
}
session.dispose();
params.abortSignal?.removeEventListener?.("abort", onAbort);
}
if (promptError && !aborted) {
throw promptError;
}
const lastAssistant = messagesSnapshot
.slice()
.reverse()
.find((m) => (m as AppMessage)?.role === "assistant") as
| AssistantMessage
| undefined;
const usage = lastAssistant?.usage;
const agentMeta: EmbeddedPiAgentMeta = {
sessionId: sessionIdUsed,
provider: lastAssistant?.provider ?? provider,
model: lastAssistant?.model ?? model.id,
usage: usage
? {
input: usage.input,
output: usage.output,
cacheRead: usage.cacheRead,
cacheWrite: usage.cacheWrite,
total: usage.totalTokens,
}
: undefined,
};
const replyItems: Array<{ text: string; media?: string[] }> = [];
const errorText = lastAssistant
? formatAssistantErrorText(lastAssistant)
: undefined;
if (errorText) replyItems.push({ text: errorText });
const inlineToolResults =
params.verboseLevel === "on" &&
!params.onPartialReply &&
!params.onToolResult &&
toolMetas.length > 0;
if (inlineToolResults) {
for (const { toolName, meta } of toolMetas) {
const agg = formatToolAggregate(toolName, meta ? [meta] : []);
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg);
if (cleanedText)
replyItems.push({ text: cleanedText, media: mediaUrls });
}
}
for (const text of assistantTexts.length
? assistantTexts
: lastAssistant
? [extractAssistantText(lastAssistant)]
: []) {
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue;
replyItems.push({ text: cleanedText, media: mediaUrls });
}
const payloads = replyItems
.map((item) => ({
text: item.text?.trim() ? item.text.trim() : undefined,
mediaUrls: item.media?.length ? item.media : undefined,
mediaUrl: item.media?.[0],
}))
.filter(
(p) =>
p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0),
);
return {
payloads: payloads.length ? payloads : undefined,
meta: {
durationMs: Date.now() - started,
agentMeta,
aborted,
},
};
} finally {
restoreSkillEnv?.();
process.chdir(prevCwd);
}
}),
);
}

View File

@@ -983,6 +983,7 @@ export async function getReplyFromConfig(
const runId = crypto.randomUUID();
const runResult = await runEmbeddedPiAgent({
sessionId: sessionIdFinal,
sessionKey,
sessionFile,
workspaceDir,
config: cfg,

View File

@@ -313,6 +313,7 @@ export async function agentCommand(
try {
result = await runEmbeddedPiAgent({
sessionId,
sessionKey,
sessionFile,
workspaceDir,
config: cfg,

View File

@@ -146,7 +146,6 @@ export async function runCronIsolatedAgentTurn(params: {
lane?: string;
}): Promise<RunCronAgentTurnResult> {
const agentCfg = params.cfg.agent;
void params.lane;
const workspaceDirRaw =
params.cfg.agent?.workspace ?? DEFAULT_AGENT_WORKSPACE_DIR;
const workspace = await ensureAgentWorkspace({
@@ -236,11 +235,13 @@ export async function runCronIsolatedAgentTurn(params: {
);
runResult = await runEmbeddedPiAgent({
sessionId: cronSession.sessionEntry.sessionId,
sessionKey: params.sessionKey,
sessionFile,
workspaceDir,
config: params.cfg,
skillsSnapshot,
prompt: commandBody,
lane: params.lane ?? "cron",
provider,
model,
thinkLevel,