refactor: use command lane enum

This commit is contained in:
Peter Steinberger
2026-01-20 10:50:18 +00:00
parent e5f7435d9f
commit 48f733e4b3
7 changed files with 32 additions and 17 deletions

View File

@@ -1,2 +1,4 @@
export const AGENT_LANE_NESTED = "nested" as const;
export const AGENT_LANE_SUBAGENT = "subagent" as const;
import { CommandLane } from "../process/lanes.js";
export const AGENT_LANE_NESTED = CommandLane.Nested;
export const AGENT_LANE_SUBAGENT = CommandLane.Subagent;

View File

@@ -1,11 +1,13 @@
import { CommandLane } from "../../process/lanes.js";
export function resolveSessionLane(key: string) {
const cleaned = key.trim() || "main";
const cleaned = key.trim() || CommandLane.Main;
return cleaned.startsWith("session:") ? cleaned : `session:${cleaned}`;
}
export function resolveGlobalLane(lane?: string) {
const cleaned = lane?.trim();
return cleaned ? cleaned : "main";
return cleaned ? cleaned : CommandLane.Main;
}
export function resolveEmbeddedSessionLane(key: string) {

View File

@@ -1,9 +1,10 @@
import type { loadConfig } from "../config/config.js";
import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js";
import { setCommandLaneConcurrency } from "../process/command-queue.js";
import { CommandLane } from "../process/lanes.js";
export function applyGatewayLaneConcurrency(cfg: ReturnType<typeof loadConfig>) {
setCommandLaneConcurrency("cron", cfg.cron?.maxConcurrentRuns ?? 1);
setCommandLaneConcurrency("main", resolveAgentMaxConcurrent(cfg));
setCommandLaneConcurrency("subagent", resolveSubagentMaxConcurrent(cfg));
setCommandLaneConcurrency(CommandLane.Cron, cfg.cron?.maxConcurrentRuns ?? 1);
setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(cfg));
setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(cfg));
}

View File

@@ -9,6 +9,7 @@ import {
} from "../infra/restart.js";
import { setCommandLaneConcurrency } from "../process/command-queue.js";
import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js";
import { CommandLane } from "../process/lanes.js";
import { isTruthyEnvValue } from "../infra/env.js";
import type { ChannelKind, GatewayReloadPlan } from "./config-reload.js";
import { resolveHooksConfig } from "./hooks.js";
@@ -127,9 +128,9 @@ export function createGatewayReloadHandlers(params: {
}
}
setCommandLaneConcurrency("cron", nextConfig.cron?.maxConcurrentRuns ?? 1);
setCommandLaneConcurrency("main", resolveAgentMaxConcurrent(nextConfig));
setCommandLaneConcurrency("subagent", resolveSubagentMaxConcurrent(nextConfig));
setCommandLaneConcurrency(CommandLane.Cron, nextConfig.cron?.maxConcurrentRuns ?? 1);
setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(nextConfig));
setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(nextConfig));
if (plan.hotReasons.length > 0) {
params.logReload.info(`config hot reload applied (${plan.hotReasons.join(", ")})`);

View File

@@ -24,6 +24,7 @@ import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
import { formatErrorMessage } from "../infra/errors.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { getQueueSize } from "../process/command-queue.js";
import { CommandLane } from "../process/lanes.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { emitHeartbeatEvent } from "./heartbeat-events.js";
@@ -327,7 +328,7 @@ export async function runHeartbeatOnce(opts: {
return { status: "skipped", reason: "disabled" };
}
const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)("main");
const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)(CommandLane.Main);
if (queueSize > 0) {
return { status: "skipped", reason: "requests-in-flight" };
}

View File

@@ -1,3 +1,5 @@
import { CommandLane } from "./lanes.js";
// Minimal in-process queue to serialize command executions.
// Default lane ("main") preserves the existing behavior. Additional lanes allow
// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for
@@ -69,7 +71,7 @@ function drainLane(lane: string) {
}
export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
const cleaned = lane.trim() || "main";
const cleaned = lane.trim() || CommandLane.Main;
const state = getLaneState(cleaned);
state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent));
drainLane(cleaned);
@@ -83,7 +85,7 @@ export function enqueueCommandInLane<T>(
onWait?: (waitMs: number, queuedAhead: number) => void;
},
): Promise<T> {
const cleaned = lane.trim() || "main";
const cleaned = lane.trim() || CommandLane.Main;
const warnAfterMs = opts?.warnAfterMs ?? 2_000;
const state = getLaneState(cleaned);
return new Promise<T>((resolve, reject) => {
@@ -106,10 +108,10 @@ export function enqueueCommand<T>(
onWait?: (waitMs: number, queuedAhead: number) => void;
},
): Promise<T> {
return enqueueCommandInLane("main", task, opts);
return enqueueCommandInLane(CommandLane.Main, task, opts);
}
export function getQueueSize(lane = "main") {
export function getQueueSize(lane = CommandLane.Main) {
const state = lanes.get(lane);
if (!state) return 0;
return state.queue.length + state.active;
@@ -123,8 +125,8 @@ export function getTotalQueueSize() {
return total;
}
export function clearCommandLane(lane = "main") {
const cleaned = lane.trim() || "main";
export function clearCommandLane(lane = CommandLane.Main) {
const cleaned = lane.trim() || CommandLane.Main;
const state = lanes.get(cleaned);
if (!state) return 0;
const removed = state.queue.length;

6
src/process/lanes.ts Normal file
View File

@@ -0,0 +1,6 @@
export const enum CommandLane {
Main = "main",
Cron = "cron",
Subagent = "subagent",
Nested = "nested",
}