import type { ClawdbotConfig } from "../config/config.js"; import type { InboundDebounceByProvider } from "../config/types.messages.js"; const resolveMs = (value: unknown): number | undefined => { if (typeof value !== "number" || !Number.isFinite(value)) return undefined; return Math.max(0, Math.trunc(value)); }; const resolveChannelOverride = (params: { byChannel?: InboundDebounceByProvider; channel: string; }): number | undefined => { if (!params.byChannel) return undefined; const channelKey = params.channel as keyof InboundDebounceByProvider; return resolveMs(params.byChannel[channelKey]); }; export function resolveInboundDebounceMs(params: { cfg: ClawdbotConfig; channel: string; overrideMs?: number; }): number { const inbound = params.cfg.messages?.inbound; const override = resolveMs(params.overrideMs); const byChannel = resolveChannelOverride({ byChannel: inbound?.byChannel, channel: params.channel, }); const base = resolveMs(inbound?.debounceMs); return override ?? byChannel ?? base ?? 0; } type DebounceBuffer = { items: T[]; timeout: ReturnType | null; }; export function createInboundDebouncer(params: { debounceMs: number; buildKey: (item: T) => string | null | undefined; shouldDebounce?: (item: T) => boolean; onFlush: (items: T[]) => Promise; onError?: (err: unknown, items: T[]) => void; }) { const buffers = new Map>(); const debounceMs = Math.max(0, Math.trunc(params.debounceMs)); const flushBuffer = async (key: string, buffer: DebounceBuffer) => { buffers.delete(key); if (buffer.timeout) { clearTimeout(buffer.timeout); buffer.timeout = null; } if (buffer.items.length === 0) return; try { await params.onFlush(buffer.items); } catch (err) { params.onError?.(err, buffer.items); } }; const flushKey = async (key: string) => { const buffer = buffers.get(key); if (!buffer) return; await flushBuffer(key, buffer); }; const scheduleFlush = (key: string, buffer: DebounceBuffer) => { if (buffer.timeout) clearTimeout(buffer.timeout); buffer.timeout = setTimeout(() => { void flushBuffer(key, buffer); }, debounceMs); buffer.timeout.unref?.(); }; const enqueue = async (item: T) => { const key = params.buildKey(item); const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true); if (!canDebounce || !key) { if (key && buffers.has(key)) { await flushKey(key); } await params.onFlush([item]); return; } const existing = buffers.get(key); if (existing) { existing.items.push(item); scheduleFlush(key, existing); return; } const buffer: DebounceBuffer = { items: [item], timeout: null }; buffers.set(key, buffer); scheduleFlush(key, buffer); }; return { enqueue, flushKey }; }