fix: reconnect signal sse monitor
This commit is contained in:
@@ -18,6 +18,7 @@
|
|||||||
- Auto-reply: removed `autoReply` from Discord/Slack/Telegram channel configs; use `requireMention` instead (Telegram topics now support `requireMention` overrides).
|
- Auto-reply: removed `autoReply` from Discord/Slack/Telegram channel configs; use `requireMention` instead (Telegram topics now support `requireMention` overrides).
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
- Signal: reconnect SSE monitor with abortable backoff; log stream errors. Thanks @nexty5870 for PR #430.
|
||||||
- Discord/Telegram: add per-request retry policy with configurable delays and docs.
|
- Discord/Telegram: add per-request retry policy with configurable delays and docs.
|
||||||
- Telegram: run long polling via grammY runner with per-chat sequentialization and concurrency tied to `agent.maxConcurrent`. Thanks @mukhtharcm for PR #366.
|
- Telegram: run long polling via grammY runner with per-chat sequentialization and concurrency tied to `agent.maxConcurrent`. Thanks @mukhtharcm for PR #366.
|
||||||
- macOS: prevent gateway launchd startup race where the app could kill a just-started gateway; avoid unnecessary `bootout` and ensure the job is enabled at login. Fixes #306. Thanks @gupsammy for PR #387.
|
- macOS: prevent gateway launchd startup race where the app could kill a just-started gateway; avoid unnecessary `bootout` and ensure the job is enabled at login. Fixes #306. Thanks @gupsammy for PR #387.
|
||||||
|
|||||||
@@ -73,6 +73,7 @@ beforeEach(() => {
|
|||||||
|
|
||||||
describe("monitorSignalProvider tool results", () => {
|
describe("monitorSignalProvider tool results", () => {
|
||||||
it("sends tool summaries with responsePrefix", async () => {
|
it("sends tool summaries with responsePrefix", async () => {
|
||||||
|
const abortController = new AbortController();
|
||||||
replyMock.mockImplementation(async (_ctx, opts) => {
|
replyMock.mockImplementation(async (_ctx, opts) => {
|
||||||
await opts?.onToolResult?.({ text: "tool update" });
|
await opts?.onToolResult?.({ text: "tool update" });
|
||||||
return { text: "final reply" };
|
return { text: "final reply" };
|
||||||
@@ -93,11 +94,13 @@ describe("monitorSignalProvider tool results", () => {
|
|||||||
event: "receive",
|
event: "receive",
|
||||||
data: JSON.stringify(payload),
|
data: JSON.stringify(payload),
|
||||||
});
|
});
|
||||||
|
abortController.abort();
|
||||||
});
|
});
|
||||||
|
|
||||||
await monitorSignalProvider({
|
await monitorSignalProvider({
|
||||||
autoStart: false,
|
autoStart: false,
|
||||||
baseUrl: "http://127.0.0.1:8080",
|
baseUrl: "http://127.0.0.1:8080",
|
||||||
|
abortSignal: abortController.signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
await flush();
|
await flush();
|
||||||
@@ -112,6 +115,7 @@ describe("monitorSignalProvider tool results", () => {
|
|||||||
...config,
|
...config,
|
||||||
signal: { autoStart: false, dmPolicy: "pairing", allowFrom: [] },
|
signal: { autoStart: false, dmPolicy: "pairing", allowFrom: [] },
|
||||||
};
|
};
|
||||||
|
const abortController = new AbortController();
|
||||||
|
|
||||||
streamMock.mockImplementation(async ({ onEvent }) => {
|
streamMock.mockImplementation(async ({ onEvent }) => {
|
||||||
const payload = {
|
const payload = {
|
||||||
@@ -128,11 +132,13 @@ describe("monitorSignalProvider tool results", () => {
|
|||||||
event: "receive",
|
event: "receive",
|
||||||
data: JSON.stringify(payload),
|
data: JSON.stringify(payload),
|
||||||
});
|
});
|
||||||
|
abortController.abort();
|
||||||
});
|
});
|
||||||
|
|
||||||
await monitorSignalProvider({
|
await monitorSignalProvider({
|
||||||
autoStart: false,
|
autoStart: false,
|
||||||
baseUrl: "http://127.0.0.1:8080",
|
baseUrl: "http://127.0.0.1:8080",
|
||||||
|
abortSignal: abortController.signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
await flush();
|
await flush();
|
||||||
@@ -150,6 +156,7 @@ describe("monitorSignalProvider tool results", () => {
|
|||||||
...config,
|
...config,
|
||||||
signal: { autoStart: false, dmPolicy: "pairing", allowFrom: [] },
|
signal: { autoStart: false, dmPolicy: "pairing", allowFrom: [] },
|
||||||
};
|
};
|
||||||
|
const abortController = new AbortController();
|
||||||
upsertPairingRequestMock
|
upsertPairingRequestMock
|
||||||
.mockResolvedValueOnce({ code: "PAIRCODE", created: true })
|
.mockResolvedValueOnce({ code: "PAIRCODE", created: true })
|
||||||
.mockResolvedValueOnce({ code: "PAIRCODE", created: false });
|
.mockResolvedValueOnce({ code: "PAIRCODE", created: false });
|
||||||
@@ -176,15 +183,48 @@ describe("monitorSignalProvider tool results", () => {
|
|||||||
envelope: { ...payload.envelope, timestamp: 2 },
|
envelope: { ...payload.envelope, timestamp: 2 },
|
||||||
}),
|
}),
|
||||||
});
|
});
|
||||||
|
abortController.abort();
|
||||||
});
|
});
|
||||||
|
|
||||||
await monitorSignalProvider({
|
await monitorSignalProvider({
|
||||||
autoStart: false,
|
autoStart: false,
|
||||||
baseUrl: "http://127.0.0.1:8080",
|
baseUrl: "http://127.0.0.1:8080",
|
||||||
|
abortSignal: abortController.signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
await flush();
|
await flush();
|
||||||
|
|
||||||
expect(sendMock).toHaveBeenCalledTimes(1);
|
expect(sendMock).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("reconnects after stream errors until aborted", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const abortController = new AbortController();
|
||||||
|
const randomSpy = vi.spyOn(Math, "random").mockReturnValue(0);
|
||||||
|
let calls = 0;
|
||||||
|
|
||||||
|
streamMock.mockImplementation(async () => {
|
||||||
|
calls += 1;
|
||||||
|
if (calls === 1) {
|
||||||
|
throw new Error("stream dropped");
|
||||||
|
}
|
||||||
|
abortController.abort();
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
const monitorPromise = monitorSignalProvider({
|
||||||
|
autoStart: false,
|
||||||
|
baseUrl: "http://127.0.0.1:8080",
|
||||||
|
abortSignal: abortController.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(1_000);
|
||||||
|
await monitorPromise;
|
||||||
|
|
||||||
|
expect(streamMock).toHaveBeenCalledTimes(2);
|
||||||
|
} finally {
|
||||||
|
randomSpy.mockRestore();
|
||||||
|
vi.useRealTimers();
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import { setTimeout as delay } from "node:timers/promises";
|
||||||
import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
|
import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
|
||||||
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
|
import { formatAgentEnvelope } from "../auto-reply/envelope.js";
|
||||||
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
|
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
|
||||||
@@ -525,8 +526,9 @@ export async function monitorSignalProvider(
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Reconnection loop for SSE stream
|
// Reconnection loop for SSE stream
|
||||||
const MAX_RETRY_DELAY = 30_000; // 30 seconds
|
const MAX_RETRY_DELAY = 10_000; // 10 seconds
|
||||||
const INITIAL_RETRY_DELAY = 1_000; // 1 second
|
const INITIAL_RETRY_DELAY = 1_000; // 1 second
|
||||||
|
const RETRY_JITTER = 0.2;
|
||||||
let retryDelay = INITIAL_RETRY_DELAY;
|
let retryDelay = INITIAL_RETRY_DELAY;
|
||||||
|
|
||||||
while (!opts.abortSignal?.aborted) {
|
while (!opts.abortSignal?.aborted) {
|
||||||
@@ -541,14 +543,41 @@ export async function monitorSignalProvider(
|
|||||||
});
|
});
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
// If streamSignalEvents returns normally, break (shouldn't happen normally)
|
if (opts.abortSignal?.aborted) return;
|
||||||
break;
|
runtime.log?.(
|
||||||
|
`Signal SSE stream ended, reconnecting in ${retryDelay / 1000}s...`,
|
||||||
|
);
|
||||||
|
const jitteredDelay = Math.max(
|
||||||
|
0,
|
||||||
|
Math.round(
|
||||||
|
retryDelay * (1 - RETRY_JITTER + Math.random() * 2 * RETRY_JITTER),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
await delay(jitteredDelay, undefined, { signal: opts.abortSignal });
|
||||||
|
} catch (err) {
|
||||||
|
if (opts.abortSignal?.aborted) return;
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
retryDelay = INITIAL_RETRY_DELAY;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (opts.abortSignal?.aborted) return;
|
if (opts.abortSignal?.aborted) return;
|
||||||
|
runtime.error?.(`Signal SSE stream error: ${String(err)}`);
|
||||||
runtime.log?.(
|
runtime.log?.(
|
||||||
`Signal SSE connection lost, reconnecting in ${retryDelay / 1000}s...`,
|
`Signal SSE connection lost, reconnecting in ${retryDelay / 1000}s...`,
|
||||||
);
|
);
|
||||||
await new Promise((resolve) => setTimeout(resolve, retryDelay));
|
const jitteredDelay = Math.max(
|
||||||
|
0,
|
||||||
|
Math.round(
|
||||||
|
retryDelay * (1 - RETRY_JITTER + Math.random() * 2 * RETRY_JITTER),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
await delay(jitteredDelay, undefined, { signal: opts.abortSignal });
|
||||||
|
} catch (err) {
|
||||||
|
if (opts.abortSignal?.aborted) return;
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
retryDelay = Math.min(retryDelay * 2, MAX_RETRY_DELAY);
|
retryDelay = Math.min(retryDelay * 2, MAX_RETRY_DELAY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user