From c96f669f2f6f33ec7db7219f692981f543b38025 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 7 Jan 2026 23:15:42 +0000 Subject: [PATCH] fix: reconnect signal sse monitor --- CHANGELOG.md | 1 + src/signal/monitor.tool-result.test.ts | 40 ++++++++++++++++++++++++++ src/signal/monitor.ts | 37 +++++++++++++++++++++--- 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a9093af3..07126eb66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Auto-reply: removed `autoReply` from Discord/Slack/Telegram channel configs; use `requireMention` instead (Telegram topics now support `requireMention` overrides). ### Fixes +- Signal: reconnect SSE monitor with abortable backoff; log stream errors. Thanks @nexty5870 for PR #430. - Discord/Telegram: add per-request retry policy with configurable delays and docs. - Telegram: run long polling via grammY runner with per-chat sequentialization and concurrency tied to `agent.maxConcurrent`. Thanks @mukhtharcm for PR #366. - macOS: prevent gateway launchd startup race where the app could kill a just-started gateway; avoid unnecessary `bootout` and ensure the job is enabled at login. Fixes #306. Thanks @gupsammy for PR #387. diff --git a/src/signal/monitor.tool-result.test.ts b/src/signal/monitor.tool-result.test.ts index ccf01fc4c..8e0be93ea 100644 --- a/src/signal/monitor.tool-result.test.ts +++ b/src/signal/monitor.tool-result.test.ts @@ -73,6 +73,7 @@ beforeEach(() => { describe("monitorSignalProvider tool results", () => { it("sends tool summaries with responsePrefix", async () => { + const abortController = new AbortController(); replyMock.mockImplementation(async (_ctx, opts) => { await opts?.onToolResult?.({ text: "tool update" }); return { text: "final reply" }; @@ -93,11 +94,13 @@ describe("monitorSignalProvider tool results", () => { event: "receive", data: JSON.stringify(payload), }); + abortController.abort(); }); await monitorSignalProvider({ autoStart: false, baseUrl: "http://127.0.0.1:8080", + abortSignal: abortController.signal, }); await flush(); @@ -112,6 +115,7 @@ describe("monitorSignalProvider tool results", () => { ...config, signal: { autoStart: false, dmPolicy: "pairing", allowFrom: [] }, }; + const abortController = new AbortController(); streamMock.mockImplementation(async ({ onEvent }) => { const payload = { @@ -128,11 +132,13 @@ describe("monitorSignalProvider tool results", () => { event: "receive", data: JSON.stringify(payload), }); + abortController.abort(); }); await monitorSignalProvider({ autoStart: false, baseUrl: "http://127.0.0.1:8080", + abortSignal: abortController.signal, }); await flush(); @@ -150,6 +156,7 @@ describe("monitorSignalProvider tool results", () => { ...config, signal: { autoStart: false, dmPolicy: "pairing", allowFrom: [] }, }; + const abortController = new AbortController(); upsertPairingRequestMock .mockResolvedValueOnce({ code: "PAIRCODE", created: true }) .mockResolvedValueOnce({ code: "PAIRCODE", created: false }); @@ -176,15 +183,48 @@ describe("monitorSignalProvider tool results", () => { envelope: { ...payload.envelope, timestamp: 2 }, }), }); + abortController.abort(); }); await monitorSignalProvider({ autoStart: false, baseUrl: "http://127.0.0.1:8080", + abortSignal: abortController.signal, }); await flush(); expect(sendMock).toHaveBeenCalledTimes(1); }); + + it("reconnects after stream errors until aborted", async () => { + vi.useFakeTimers(); + const abortController = new AbortController(); + const randomSpy = vi.spyOn(Math, "random").mockReturnValue(0); + let calls = 0; + + streamMock.mockImplementation(async () => { + calls += 1; + if (calls === 1) { + throw new Error("stream dropped"); + } + abortController.abort(); + }); + + try { + const monitorPromise = monitorSignalProvider({ + autoStart: false, + baseUrl: "http://127.0.0.1:8080", + abortSignal: abortController.signal, + }); + + await vi.advanceTimersByTimeAsync(1_000); + await monitorPromise; + + expect(streamMock).toHaveBeenCalledTimes(2); + } finally { + randomSpy.mockRestore(); + vi.useRealTimers(); + } + }); }); diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index b80cc88a9..4a89f82ad 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -1,3 +1,4 @@ +import { setTimeout as delay } from "node:timers/promises"; import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; @@ -525,8 +526,9 @@ export async function monitorSignalProvider( }; // Reconnection loop for SSE stream - const MAX_RETRY_DELAY = 30_000; // 30 seconds + const MAX_RETRY_DELAY = 10_000; // 10 seconds const INITIAL_RETRY_DELAY = 1_000; // 1 second + const RETRY_JITTER = 0.2; let retryDelay = INITIAL_RETRY_DELAY; while (!opts.abortSignal?.aborted) { @@ -541,14 +543,41 @@ export async function monitorSignalProvider( }); }, }); - // If streamSignalEvents returns normally, break (shouldn't happen normally) - break; + if (opts.abortSignal?.aborted) return; + runtime.log?.( + `Signal SSE stream ended, reconnecting in ${retryDelay / 1000}s...`, + ); + const jitteredDelay = Math.max( + 0, + Math.round( + retryDelay * (1 - RETRY_JITTER + Math.random() * 2 * RETRY_JITTER), + ), + ); + try { + await delay(jitteredDelay, undefined, { signal: opts.abortSignal }); + } catch (err) { + if (opts.abortSignal?.aborted) return; + throw err; + } + retryDelay = INITIAL_RETRY_DELAY; } catch (err) { if (opts.abortSignal?.aborted) return; + runtime.error?.(`Signal SSE stream error: ${String(err)}`); runtime.log?.( `Signal SSE connection lost, reconnecting in ${retryDelay / 1000}s...`, ); - await new Promise((resolve) => setTimeout(resolve, retryDelay)); + const jitteredDelay = Math.max( + 0, + Math.round( + retryDelay * (1 - RETRY_JITTER + Math.random() * 2 * RETRY_JITTER), + ), + ); + try { + await delay(jitteredDelay, undefined, { signal: opts.abortSignal }); + } catch (err) { + if (opts.abortSignal?.aborted) return; + throw err; + } retryDelay = Math.min(retryDelay * 2, MAX_RETRY_DELAY); } }