refactor: share backoff helpers

This commit is contained in:
Peter Steinberger
2026-01-07 23:22:12 +00:00
parent c96f669f2f
commit 11006d1245
4 changed files with 118 additions and 91 deletions

26
src/infra/backoff.ts Normal file
View File

@@ -0,0 +1,26 @@
import { setTimeout as delay } from "node:timers/promises";
export type BackoffPolicy = {
initialMs: number;
maxMs: number;
factor: number;
jitter: number;
};
export function computeBackoff(policy: BackoffPolicy, attempt: number) {
const base = policy.initialMs * policy.factor ** Math.max(attempt - 1, 0);
const jitter = base * policy.jitter * Math.random();
return Math.min(policy.maxMs, Math.round(base + jitter));
}
export async function sleepWithAbort(ms: number, abortSignal?: AbortSignal) {
if (ms <= 0) return;
try {
await delay(ms, undefined, { signal: abortSignal });
} catch (err) {
if (abortSignal?.aborted) {
throw new Error("aborted");
}
throw err;
}
}

View File

@@ -1,4 +1,3 @@
import { setTimeout as delay } from "node:timers/promises";
import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
@@ -16,9 +15,10 @@ import {
import { resolveAgentRoute } from "../routing/resolve-route.js";
import type { RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { signalCheck, signalRpcRequest, streamSignalEvents } from "./client.js";
import { signalCheck, signalRpcRequest } from "./client.js";
import { spawnSignalDaemon } from "./daemon.js";
import { sendMessageSignal } from "./send.js";
import { runSignalSseLoop } from "./sse-reconnect.js";
type SignalEnvelope = {
sourceNumber?: string | null;
@@ -525,62 +525,17 @@ export async function monitorSignalProvider(
if (!queuedFinal) return;
};
// Reconnection loop for SSE stream
const MAX_RETRY_DELAY = 10_000; // 10 seconds
const INITIAL_RETRY_DELAY = 1_000; // 1 second
const RETRY_JITTER = 0.2;
let retryDelay = INITIAL_RETRY_DELAY;
while (!opts.abortSignal?.aborted) {
try {
await streamSignalEvents({
baseUrl,
account,
abortSignal: opts.abortSignal,
onEvent: (event) => {
void handleEvent(event).catch((err) => {
runtime.error?.(`event handler failed: ${String(err)}`);
});
},
await runSignalSseLoop({
baseUrl,
account,
abortSignal: opts.abortSignal,
runtime,
onEvent: (event) => {
void handleEvent(event).catch((err) => {
runtime.error?.(`event handler failed: ${String(err)}`);
});
if (opts.abortSignal?.aborted) return;
runtime.log?.(
`Signal SSE stream ended, reconnecting in ${retryDelay / 1000}s...`,
);
const jitteredDelay = Math.max(
0,
Math.round(
retryDelay * (1 - RETRY_JITTER + Math.random() * 2 * RETRY_JITTER),
),
);
try {
await delay(jitteredDelay, undefined, { signal: opts.abortSignal });
} catch (err) {
if (opts.abortSignal?.aborted) return;
throw err;
}
retryDelay = INITIAL_RETRY_DELAY;
} catch (err) {
if (opts.abortSignal?.aborted) return;
runtime.error?.(`Signal SSE stream error: ${String(err)}`);
runtime.log?.(
`Signal SSE connection lost, reconnecting in ${retryDelay / 1000}s...`,
);
const jitteredDelay = Math.max(
0,
Math.round(
retryDelay * (1 - RETRY_JITTER + Math.random() * 2 * RETRY_JITTER),
),
);
try {
await delay(jitteredDelay, undefined, { signal: opts.abortSignal });
} catch (err) {
if (opts.abortSignal?.aborted) return;
throw err;
}
retryDelay = Math.min(retryDelay * 2, MAX_RETRY_DELAY);
}
}
},
});
} catch (err) {
if (opts.abortSignal?.aborted) return;
throw err;

View File

@@ -0,0 +1,76 @@
import { logVerbose, shouldLogVerbose } from "../globals.js";
import type { BackoffPolicy } from "../infra/backoff.js";
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
import type { RuntimeEnv } from "../runtime.js";
import { type SignalSseEvent, streamSignalEvents } from "./client.js";
const DEFAULT_RECONNECT_POLICY: BackoffPolicy = {
initialMs: 1_000,
maxMs: 10_000,
factor: 2,
jitter: 0.2,
};
type RunSignalSseLoopParams = {
baseUrl: string;
account?: string;
abortSignal?: AbortSignal;
runtime: RuntimeEnv;
onEvent: (event: SignalSseEvent) => void;
policy?: Partial<BackoffPolicy>;
};
export async function runSignalSseLoop({
baseUrl,
account,
abortSignal,
runtime,
onEvent,
policy,
}: RunSignalSseLoopParams) {
const reconnectPolicy = {
...DEFAULT_RECONNECT_POLICY,
...policy,
};
let reconnectAttempts = 0;
const logReconnectVerbose = (message: string) => {
if (!shouldLogVerbose()) return;
logVerbose(message);
};
while (!abortSignal?.aborted) {
try {
await streamSignalEvents({
baseUrl,
account,
abortSignal,
onEvent: (event) => {
reconnectAttempts = 0;
onEvent(event);
},
});
if (abortSignal?.aborted) return;
reconnectAttempts += 1;
const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts);
logReconnectVerbose(
`Signal SSE stream ended, reconnecting in ${delayMs / 1000}s...`,
);
await sleepWithAbort(delayMs, abortSignal);
} catch (err) {
if (abortSignal?.aborted) return;
runtime.error?.(`Signal SSE stream error: ${String(err)}`);
reconnectAttempts += 1;
const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts);
runtime.log?.(
`Signal SSE connection lost, reconnecting in ${delayMs / 1000}s...`,
);
try {
await sleepWithAbort(delayMs, abortSignal);
} catch (sleepErr) {
if (abortSignal?.aborted) return;
throw sleepErr;
}
}
}
}

View File

@@ -1,12 +1,10 @@
import { randomUUID } from "node:crypto";
import type { ClawdbotConfig } from "../config/config.js";
import type { BackoffPolicy } from "../infra/backoff.js";
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
export type ReconnectPolicy = {
initialMs: number;
maxMs: number;
factor: number;
jitter: number;
export type ReconnectPolicy = BackoffPolicy & {
maxAttempts: number;
};
@@ -51,35 +49,7 @@ export function resolveReconnectPolicy(
return merged;
}
export function computeBackoff(policy: ReconnectPolicy, attempt: number) {
const base = policy.initialMs * policy.factor ** Math.max(attempt - 1, 0);
const jitter = base * policy.jitter * Math.random();
return Math.min(policy.maxMs, Math.round(base + jitter));
}
export function sleepWithAbort(ms: number, abortSignal?: AbortSignal) {
if (ms <= 0) return Promise.resolve();
return new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
cleanup();
resolve();
}, ms);
const onAbort = () => {
cleanup();
reject(new Error("aborted"));
};
const cleanup = () => {
clearTimeout(timer);
abortSignal?.removeEventListener("abort", onAbort);
};
if (abortSignal) {
abortSignal.addEventListener("abort", onAbort, { once: true });
}
});
}
export { computeBackoff, sleepWithAbort };
export function newConnectionId() {
return randomUUID();