Files
clawdbot/src/process/command-queue.ts
2025-12-26 13:35:44 +01:00

134 lines
3.3 KiB
TypeScript

// Minimal in-process queue to serialize command executions.
// Default lane ("main") preserves the existing behavior. Additional lanes allow
// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for
// the main auto-reply workflow.
type QueueEntry = {
task: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
enqueuedAt: number;
warnAfterMs: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
};
type LaneState = {
lane: string;
queue: QueueEntry[];
active: number;
maxConcurrent: number;
draining: boolean;
};
const lanes = new Map<string, LaneState>();
function getLaneState(lane: string): LaneState {
const existing = lanes.get(lane);
if (existing) return existing;
const created: LaneState = {
lane,
queue: [],
active: 0,
maxConcurrent: 1,
draining: false,
};
lanes.set(lane, created);
return created;
}
function drainLane(lane: string) {
const state = getLaneState(lane);
if (state.draining) return;
state.draining = true;
const pump = () => {
while (state.active < state.maxConcurrent && state.queue.length > 0) {
const entry = state.queue.shift() as QueueEntry;
const waitedMs = Date.now() - entry.enqueuedAt;
if (waitedMs >= entry.warnAfterMs) {
entry.onWait?.(waitedMs, state.queue.length);
}
state.active += 1;
void (async () => {
try {
const result = await entry.task();
state.active -= 1;
pump();
entry.resolve(result);
} catch (err) {
state.active -= 1;
pump();
entry.reject(err);
}
})();
}
state.draining = false;
};
pump();
}
export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
const cleaned = lane.trim() || "main";
const state = getLaneState(cleaned);
state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent));
drainLane(cleaned);
}
export function enqueueCommandInLane<T>(
lane: string,
task: () => Promise<T>,
opts?: {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
},
): Promise<T> {
const cleaned = lane.trim() || "main";
const warnAfterMs = opts?.warnAfterMs ?? 2_000;
const state = getLaneState(cleaned);
return new Promise<T>((resolve, reject) => {
state.queue.push({
task: () => task(),
resolve: (value) => resolve(value as T),
reject,
enqueuedAt: Date.now(),
warnAfterMs,
onWait: opts?.onWait,
});
drainLane(cleaned);
});
}
export function enqueueCommand<T>(
task: () => Promise<T>,
opts?: {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
},
): Promise<T> {
return enqueueCommandInLane("main", task, opts);
}
export function getQueueSize(lane = "main") {
const state = lanes.get(lane);
if (!state) return 0;
return state.queue.length + state.active;
}
export function getTotalQueueSize() {
let total = 0;
for (const s of lanes.values()) {
total += s.queue.length + s.active;
}
return total;
}
export function clearCommandLane(lane = "main") {
const cleaned = lane.trim() || "main";
const state = lanes.get(cleaned);
if (!state) return 0;
const removed = state.queue.length;
state.queue.length = 0;
return removed;
}