From 11006d1245143754a71e548c691f123a5ee07952 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 7 Jan 2026 23:22:12 +0000 Subject: [PATCH] refactor: share backoff helpers --- src/infra/backoff.ts | 26 +++++++++++++ src/signal/monitor.ts | 69 ++++++--------------------------- src/signal/sse-reconnect.ts | 76 +++++++++++++++++++++++++++++++++++++ src/web/reconnect.ts | 38 ++----------------- 4 files changed, 118 insertions(+), 91 deletions(-) create mode 100644 src/infra/backoff.ts create mode 100644 src/signal/sse-reconnect.ts diff --git a/src/infra/backoff.ts b/src/infra/backoff.ts new file mode 100644 index 000000000..f1d07b579 --- /dev/null +++ b/src/infra/backoff.ts @@ -0,0 +1,26 @@ +import { setTimeout as delay } from "node:timers/promises"; + +export type BackoffPolicy = { + initialMs: number; + maxMs: number; + factor: number; + jitter: number; +}; + +export function computeBackoff(policy: BackoffPolicy, attempt: number) { + const base = policy.initialMs * policy.factor ** Math.max(attempt - 1, 0); + const jitter = base * policy.jitter * Math.random(); + return Math.min(policy.maxMs, Math.round(base + jitter)); +} + +export async function sleepWithAbort(ms: number, abortSignal?: AbortSignal) { + if (ms <= 0) return; + try { + await delay(ms, undefined, { signal: abortSignal }); + } catch (err) { + if (abortSignal?.aborted) { + throw new Error("aborted"); + } + throw err; + } +} diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index 4a89f82ad..be3e0a05a 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -1,4 +1,3 @@ -import { setTimeout as delay } from "node:timers/promises"; import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; @@ -16,9 +15,10 @@ import { import { resolveAgentRoute } from "../routing/resolve-route.js"; import type { RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; -import { signalCheck, signalRpcRequest, streamSignalEvents } from "./client.js"; +import { signalCheck, signalRpcRequest } from "./client.js"; import { spawnSignalDaemon } from "./daemon.js"; import { sendMessageSignal } from "./send.js"; +import { runSignalSseLoop } from "./sse-reconnect.js"; type SignalEnvelope = { sourceNumber?: string | null; @@ -525,62 +525,17 @@ export async function monitorSignalProvider( if (!queuedFinal) return; }; - // Reconnection loop for SSE stream - const MAX_RETRY_DELAY = 10_000; // 10 seconds - const INITIAL_RETRY_DELAY = 1_000; // 1 second - const RETRY_JITTER = 0.2; - let retryDelay = INITIAL_RETRY_DELAY; - - while (!opts.abortSignal?.aborted) { - try { - await streamSignalEvents({ - baseUrl, - account, - abortSignal: opts.abortSignal, - onEvent: (event) => { - void handleEvent(event).catch((err) => { - runtime.error?.(`event handler failed: ${String(err)}`); - }); - }, + await runSignalSseLoop({ + baseUrl, + account, + abortSignal: opts.abortSignal, + runtime, + onEvent: (event) => { + void handleEvent(event).catch((err) => { + runtime.error?.(`event handler failed: ${String(err)}`); }); - if (opts.abortSignal?.aborted) return; - runtime.log?.( - `Signal SSE stream ended, reconnecting in ${retryDelay / 1000}s...`, - ); - const jitteredDelay = Math.max( - 0, - Math.round( - retryDelay * (1 - RETRY_JITTER + Math.random() * 2 * RETRY_JITTER), - ), - ); - try { - await delay(jitteredDelay, undefined, { signal: opts.abortSignal }); - } catch (err) { - if (opts.abortSignal?.aborted) return; - throw err; - } - retryDelay = INITIAL_RETRY_DELAY; - } catch (err) { - if (opts.abortSignal?.aborted) return; - runtime.error?.(`Signal SSE stream error: ${String(err)}`); - runtime.log?.( - `Signal SSE connection lost, reconnecting in ${retryDelay / 1000}s...`, - ); - const jitteredDelay = Math.max( - 0, - Math.round( - retryDelay * (1 - RETRY_JITTER + Math.random() * 2 * RETRY_JITTER), - ), - ); - try { - await delay(jitteredDelay, undefined, { signal: opts.abortSignal }); - } catch (err) { - if (opts.abortSignal?.aborted) return; - throw err; - } - retryDelay = Math.min(retryDelay * 2, MAX_RETRY_DELAY); - } - } + }, + }); } catch (err) { if (opts.abortSignal?.aborted) return; throw err; diff --git a/src/signal/sse-reconnect.ts b/src/signal/sse-reconnect.ts new file mode 100644 index 000000000..011793e7d --- /dev/null +++ b/src/signal/sse-reconnect.ts @@ -0,0 +1,76 @@ +import { logVerbose, shouldLogVerbose } from "../globals.js"; +import type { BackoffPolicy } from "../infra/backoff.js"; +import { computeBackoff, sleepWithAbort } from "../infra/backoff.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { type SignalSseEvent, streamSignalEvents } from "./client.js"; + +const DEFAULT_RECONNECT_POLICY: BackoffPolicy = { + initialMs: 1_000, + maxMs: 10_000, + factor: 2, + jitter: 0.2, +}; + +type RunSignalSseLoopParams = { + baseUrl: string; + account?: string; + abortSignal?: AbortSignal; + runtime: RuntimeEnv; + onEvent: (event: SignalSseEvent) => void; + policy?: Partial; +}; + +export async function runSignalSseLoop({ + baseUrl, + account, + abortSignal, + runtime, + onEvent, + policy, +}: RunSignalSseLoopParams) { + const reconnectPolicy = { + ...DEFAULT_RECONNECT_POLICY, + ...policy, + }; + let reconnectAttempts = 0; + + const logReconnectVerbose = (message: string) => { + if (!shouldLogVerbose()) return; + logVerbose(message); + }; + + while (!abortSignal?.aborted) { + try { + await streamSignalEvents({ + baseUrl, + account, + abortSignal, + onEvent: (event) => { + reconnectAttempts = 0; + onEvent(event); + }, + }); + if (abortSignal?.aborted) return; + reconnectAttempts += 1; + const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts); + logReconnectVerbose( + `Signal SSE stream ended, reconnecting in ${delayMs / 1000}s...`, + ); + await sleepWithAbort(delayMs, abortSignal); + } catch (err) { + if (abortSignal?.aborted) return; + runtime.error?.(`Signal SSE stream error: ${String(err)}`); + reconnectAttempts += 1; + const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts); + runtime.log?.( + `Signal SSE connection lost, reconnecting in ${delayMs / 1000}s...`, + ); + try { + await sleepWithAbort(delayMs, abortSignal); + } catch (sleepErr) { + if (abortSignal?.aborted) return; + throw sleepErr; + } + } + } +} diff --git a/src/web/reconnect.ts b/src/web/reconnect.ts index 8f8322528..c4f2324cb 100644 --- a/src/web/reconnect.ts +++ b/src/web/reconnect.ts @@ -1,12 +1,10 @@ import { randomUUID } from "node:crypto"; import type { ClawdbotConfig } from "../config/config.js"; +import type { BackoffPolicy } from "../infra/backoff.js"; +import { computeBackoff, sleepWithAbort } from "../infra/backoff.js"; -export type ReconnectPolicy = { - initialMs: number; - maxMs: number; - factor: number; - jitter: number; +export type ReconnectPolicy = BackoffPolicy & { maxAttempts: number; }; @@ -51,35 +49,7 @@ export function resolveReconnectPolicy( return merged; } -export function computeBackoff(policy: ReconnectPolicy, attempt: number) { - const base = policy.initialMs * policy.factor ** Math.max(attempt - 1, 0); - const jitter = base * policy.jitter * Math.random(); - return Math.min(policy.maxMs, Math.round(base + jitter)); -} - -export function sleepWithAbort(ms: number, abortSignal?: AbortSignal) { - if (ms <= 0) return Promise.resolve(); - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - cleanup(); - resolve(); - }, ms); - - const onAbort = () => { - cleanup(); - reject(new Error("aborted")); - }; - - const cleanup = () => { - clearTimeout(timer); - abortSignal?.removeEventListener("abort", onAbort); - }; - - if (abortSignal) { - abortSignal.addEventListener("abort", onAbort, { once: true }); - } - }); -} +export { computeBackoff, sleepWithAbort }; export function newConnectionId() { return randomUUID();