From b7c900739e28dc85f87ae9d671c4121ce831766c Mon Sep 17 00:00:00 2001 From: Josh Palmer Date: Thu, 8 Jan 2026 19:59:06 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20codex:=20handle=20discord=20gate?= =?UTF-8?q?way=20error=20events=20(#504)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 1 + src/discord/monitor.gateway.test.ts | 49 +++++++++++++++++ src/discord/monitor.ts | 84 ++++++++++++++++++++++++----- 3 files changed, 122 insertions(+), 12 deletions(-) create mode 100644 src/discord/monitor.gateway.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index d7af201c1..324bd54e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased +- Discord: stop provider when gateway reconnects are exhausted and surface errors. (#504) - Auto-reply: preserve block reply ordering with timeout fallback for streaming. (#503) — thanks @joshp123 - WhatsApp: group `/model list` output by provider for scannability. (#456) - thanks @mcinteerj - Hooks: allow per-hook model overrides for webhook/Gmail runs (e.g. GPT 5 Mini). diff --git a/src/discord/monitor.gateway.test.ts b/src/discord/monitor.gateway.test.ts new file mode 100644 index 000000000..fc7d62784 --- /dev/null +++ b/src/discord/monitor.gateway.test.ts @@ -0,0 +1,49 @@ +import { EventEmitter } from "node:events"; + +import { describe, expect, it, vi } from "vitest"; + +import { waitForDiscordGatewayStop } from "./monitor.js"; + +describe("waitForDiscordGatewayStop", () => { + it("resolves on abort and disconnects gateway", async () => { + const emitter = new EventEmitter(); + const disconnect = vi.fn(); + const abort = new AbortController(); + + const promise = waitForDiscordGatewayStop({ + gateway: { emitter, disconnect }, + abortSignal: abort.signal, + }); + + expect(emitter.listenerCount("error")).toBe(1); + abort.abort(); + + await expect(promise).resolves.toBeUndefined(); + expect(disconnect).toHaveBeenCalledTimes(1); + expect(emitter.listenerCount("error")).toBe(0); + }); + + it("rejects on gateway error and disconnects", async () => { + const emitter = new EventEmitter(); + const disconnect = vi.fn(); + const onGatewayError = vi.fn(); + const abort = new AbortController(); + const err = new Error("boom"); + + const promise = waitForDiscordGatewayStop({ + gateway: { emitter, disconnect }, + abortSignal: abort.signal, + onGatewayError, + }); + + emitter.emit("error", err); + + await expect(promise).rejects.toThrow("boom"); + expect(onGatewayError).toHaveBeenCalledWith(err); + expect(disconnect).toHaveBeenCalledTimes(1); + expect(emitter.listenerCount("error")).toBe(0); + + abort.abort(); + expect(disconnect).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 27baa3c66..5d845fe2e 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -1,3 +1,5 @@ +import type { EventEmitter } from "node:events"; + import { ChannelType, Client, @@ -82,6 +84,62 @@ type DiscordMediaInfo = { placeholder: string; }; +type DiscordGatewayHandle = { + emitter?: Pick; + disconnect?: () => void; +}; + +export async function waitForDiscordGatewayStop(params: { + gateway?: DiscordGatewayHandle; + abortSignal?: AbortSignal; + onGatewayError?: (err: unknown) => void; +}): Promise { + const { gateway, abortSignal, onGatewayError } = params; + const emitter = gateway?.emitter; + return await new Promise((resolve, reject) => { + let settled = false; + const cleanup = () => { + abortSignal?.removeEventListener("abort", onAbort); + emitter?.removeListener("error", onGatewayErrorEvent); + }; + const finishResolve = () => { + if (settled) return; + settled = true; + cleanup(); + try { + gateway?.disconnect?.(); + } finally { + resolve(); + } + }; + const finishReject = (err: unknown) => { + if (settled) return; + settled = true; + cleanup(); + try { + gateway?.disconnect?.(); + } finally { + reject(err); + } + }; + const onAbort = () => { + finishResolve(); + }; + const onGatewayErrorEvent = (err: unknown) => { + onGatewayError?.(err); + finishReject(err); + }; + + if (abortSignal?.aborted) { + onAbort(); + return; + } + + abortSignal?.addEventListener("abort", onAbort, { once: true }); + emitter?.on("error", onGatewayErrorEvent); + }); +} + type DiscordHistoryEntry = { sender: string; body: string; @@ -402,18 +460,20 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`); - await new Promise((resolve) => { - const onAbort = async () => { - try { - const gateway = client.getPlugin("gateway"); - gateway?.disconnect(); - } finally { - resolve(); - } - }; - opts.abortSignal?.addEventListener("abort", () => { - void onAbort(); - }); + const gateway = client.getPlugin("gateway"); + const gatewayEmitter = (gateway as unknown as { emitter?: EventEmitter }) + ?.emitter; + await waitForDiscordGatewayStop({ + gateway: gateway + ? { + emitter: gatewayEmitter, + disconnect: () => gateway.disconnect(), + } + : undefined, + abortSignal: opts.abortSignal, + onGatewayError: (err) => { + runtime.error?.(danger(`discord gateway error: ${String(err)}`)); + }, }); }