diff --git a/src/agents/lanes.ts b/src/agents/lanes.ts index 1dba87a60..1688a4b8b 100644 --- a/src/agents/lanes.ts +++ b/src/agents/lanes.ts @@ -1,2 +1,4 @@ -export const AGENT_LANE_NESTED = "nested" as const; -export const AGENT_LANE_SUBAGENT = "subagent" as const; +import { CommandLane } from "../process/lanes.js"; + +export const AGENT_LANE_NESTED = CommandLane.Nested; +export const AGENT_LANE_SUBAGENT = CommandLane.Subagent; diff --git a/src/agents/pi-embedded-runner/lanes.ts b/src/agents/pi-embedded-runner/lanes.ts index 8655a0f0f..81b742ded 100644 --- a/src/agents/pi-embedded-runner/lanes.ts +++ b/src/agents/pi-embedded-runner/lanes.ts @@ -1,11 +1,13 @@ +import { CommandLane } from "../../process/lanes.js"; + export function resolveSessionLane(key: string) { - const cleaned = key.trim() || "main"; + const cleaned = key.trim() || CommandLane.Main; return cleaned.startsWith("session:") ? cleaned : `session:${cleaned}`; } export function resolveGlobalLane(lane?: string) { const cleaned = lane?.trim(); - return cleaned ? cleaned : "main"; + return cleaned ? cleaned : CommandLane.Main; } export function resolveEmbeddedSessionLane(key: string) { diff --git a/src/gateway/server-lanes.ts b/src/gateway/server-lanes.ts index 9e05edd22..6c42b5559 100644 --- a/src/gateway/server-lanes.ts +++ b/src/gateway/server-lanes.ts @@ -1,9 +1,10 @@ import type { loadConfig } from "../config/config.js"; import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js"; import { setCommandLaneConcurrency } from "../process/command-queue.js"; +import { CommandLane } from "../process/lanes.js"; export function applyGatewayLaneConcurrency(cfg: ReturnType) { - setCommandLaneConcurrency("cron", cfg.cron?.maxConcurrentRuns ?? 1); - setCommandLaneConcurrency("main", resolveAgentMaxConcurrent(cfg)); - setCommandLaneConcurrency("subagent", resolveSubagentMaxConcurrent(cfg)); + setCommandLaneConcurrency(CommandLane.Cron, cfg.cron?.maxConcurrentRuns ?? 1); + setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(cfg)); + setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(cfg)); } diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index 6728228db..9d18af6cd 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -9,6 +9,7 @@ import { } from "../infra/restart.js"; import { setCommandLaneConcurrency } from "../process/command-queue.js"; import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js"; +import { CommandLane } from "../process/lanes.js"; import { isTruthyEnvValue } from "../infra/env.js"; import type { ChannelKind, GatewayReloadPlan } from "./config-reload.js"; import { resolveHooksConfig } from "./hooks.js"; @@ -127,9 +128,9 @@ export function createGatewayReloadHandlers(params: { } } - setCommandLaneConcurrency("cron", nextConfig.cron?.maxConcurrentRuns ?? 1); - setCommandLaneConcurrency("main", resolveAgentMaxConcurrent(nextConfig)); - setCommandLaneConcurrency("subagent", resolveSubagentMaxConcurrent(nextConfig)); + setCommandLaneConcurrency(CommandLane.Cron, nextConfig.cron?.maxConcurrentRuns ?? 1); + setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(nextConfig)); + setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(nextConfig)); if (plan.hotReasons.length > 0) { params.logReload.info(`config hot reload applied (${plan.hotReasons.join(", ")})`); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 3c01396cb..25ba6036b 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -24,6 +24,7 @@ import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { getQueueSize } from "../process/command-queue.js"; +import { CommandLane } from "../process/lanes.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeAgentId } from "../routing/session-key.js"; import { emitHeartbeatEvent } from "./heartbeat-events.js"; @@ -327,7 +328,7 @@ export async function runHeartbeatOnce(opts: { return { status: "skipped", reason: "disabled" }; } - const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)("main"); + const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)(CommandLane.Main); if (queueSize > 0) { return { status: "skipped", reason: "requests-in-flight" }; } diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 0370c20dd..51b21881c 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -1,3 +1,5 @@ +import { CommandLane } from "./lanes.js"; + // Minimal in-process queue to serialize command executions. // Default lane ("main") preserves the existing behavior. Additional lanes allow // low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for @@ -69,7 +71,7 @@ function drainLane(lane: string) { } export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { - const cleaned = lane.trim() || "main"; + const cleaned = lane.trim() || CommandLane.Main; const state = getLaneState(cleaned); state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent)); drainLane(cleaned); @@ -83,7 +85,7 @@ export function enqueueCommandInLane( onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { - const cleaned = lane.trim() || "main"; + const cleaned = lane.trim() || CommandLane.Main; const warnAfterMs = opts?.warnAfterMs ?? 2_000; const state = getLaneState(cleaned); return new Promise((resolve, reject) => { @@ -106,10 +108,10 @@ export function enqueueCommand( onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { - return enqueueCommandInLane("main", task, opts); + return enqueueCommandInLane(CommandLane.Main, task, opts); } -export function getQueueSize(lane = "main") { +export function getQueueSize(lane = CommandLane.Main) { const state = lanes.get(lane); if (!state) return 0; return state.queue.length + state.active; @@ -123,8 +125,8 @@ export function getTotalQueueSize() { return total; } -export function clearCommandLane(lane = "main") { - const cleaned = lane.trim() || "main"; +export function clearCommandLane(lane = CommandLane.Main) { + const cleaned = lane.trim() || CommandLane.Main; const state = lanes.get(cleaned); if (!state) return 0; const removed = state.queue.length; diff --git a/src/process/lanes.ts b/src/process/lanes.ts new file mode 100644 index 000000000..63ef1f534 --- /dev/null +++ b/src/process/lanes.ts @@ -0,0 +1,6 @@ +export const enum CommandLane { + Main = "main", + Cron = "cron", + Subagent = "subagent", + Nested = "nested", +}