// Minimal in-process queue to serialize command executions. // Default lane ("main") preserves the existing behavior. Additional lanes allow // low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for // the main auto-reply workflow. type QueueEntry = { task: () => Promise; resolve: (value: unknown) => void; reject: (reason?: unknown) => void; enqueuedAt: number; warnAfterMs: number; onWait?: (waitMs: number, queuedAhead: number) => void; }; type LaneState = { lane: string; queue: QueueEntry[]; active: number; maxConcurrent: number; draining: boolean; }; const lanes = new Map(); function getLaneState(lane: string): LaneState { const existing = lanes.get(lane); if (existing) return existing; const created: LaneState = { lane, queue: [], active: 0, maxConcurrent: 1, draining: false, }; lanes.set(lane, created); return created; } function drainLane(lane: string) { const state = getLaneState(lane); if (state.draining) return; state.draining = true; const pump = () => { while (state.active < state.maxConcurrent && state.queue.length > 0) { const entry = state.queue.shift() as QueueEntry; const waitedMs = Date.now() - entry.enqueuedAt; if (waitedMs >= entry.warnAfterMs) { entry.onWait?.(waitedMs, state.queue.length); } state.active += 1; void (async () => { try { const result = await entry.task(); state.active -= 1; pump(); entry.resolve(result); } catch (err) { state.active -= 1; pump(); entry.reject(err); } })(); } state.draining = false; }; pump(); } export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { const cleaned = lane.trim() || "main"; const state = getLaneState(cleaned); state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent)); drainLane(cleaned); } export function enqueueCommandInLane( lane: string, task: () => Promise, opts?: { warnAfterMs?: number; onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { const cleaned = lane.trim() || "main"; const warnAfterMs = opts?.warnAfterMs ?? 2_000; const state = getLaneState(cleaned); return new Promise((resolve, reject) => { state.queue.push({ task: () => task(), resolve: (value) => resolve(value as T), reject, enqueuedAt: Date.now(), warnAfterMs, onWait: opts?.onWait, }); drainLane(cleaned); }); } export function enqueueCommand( task: () => Promise, opts?: { warnAfterMs?: number; onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { return enqueueCommandInLane("main", task, opts); } export function getQueueSize(lane = "main") { const state = lanes.get(lane); if (!state) return 0; return state.queue.length + state.active; } export function getTotalQueueSize() { let total = 0; for (const s of lanes.values()) { total += s.queue.length + s.active; } return total; }