From b012b1105e24173c93f100c32c55d481fd78ba62 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 20 Jan 2026 19:30:22 +0000 Subject: [PATCH] fix: unblock discord listener concurrency --- CHANGELOG.md | 1 + src/agents/model-compat.ts | 4 +- src/discord/monitor.test.ts | 82 ++++++++++++++++++++++++++++- src/discord/monitor/listeners.ts | 22 ++++---- src/tui/components/custom-editor.ts | 6 +-- src/tui/tui.ts | 2 +- 6 files changed, 101 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60f23034e..b3e5981a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Docs: https://docs.clawd.bot - Plugins: add Nextcloud Talk manifest for plugin config validation. (#1297) — thanks @ysqander. - Anthropic: default API prompt caching to 1h with configurable TTL override; ignore TTL for OAuth. - Discord: make resolve warnings avoid raw JSON payloads on rate limits. +- Discord: process message handlers in parallel across sessions to avoid event queue blocking. (#1295) - Cron: auto-deliver isolated agent output to explicit targets without tool calls. (#1285) ## 2026.1.19-3 diff --git a/src/agents/model-compat.ts b/src/agents/model-compat.ts index f4e39c8e9..741819073 100644 --- a/src/agents/model-compat.ts +++ b/src/agents/model-compat.ts @@ -1,4 +1,4 @@ -import type { Api, Model, OpenAICompletionsCompat } from "@mariozechner/pi-ai"; +import type { Api, Model } from "@mariozechner/pi-ai"; function isOpenAiCompletionsModel(model: Model): model is Model<"openai-completions"> { return model.api === "openai-completions"; @@ -10,7 +10,7 @@ export function normalizeModelCompat(model: Model): Model { if (!isZai || !isOpenAiCompletionsModel(model)) return model; const openaiModel = model as Model<"openai-completions">; - const compat = openaiModel.compat as OpenAICompletionsCompat | undefined; + const compat = openaiModel.compat ?? undefined; if (compat?.supportsDeveloperRole === false) return model; openaiModel.compat = compat diff --git a/src/discord/monitor.test.ts b/src/discord/monitor.test.ts index 55f0af637..6d3565b66 100644 --- a/src/discord/monitor.test.ts +++ b/src/discord/monitor.test.ts @@ -1,5 +1,5 @@ import type { Guild } from "@buape/carbon"; -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { allowListMatches, buildDiscordMediaPayload, @@ -17,6 +17,7 @@ import { sanitizeDiscordThreadName, shouldEmitDiscordReactionNotification, } from "./monitor.js"; +import { DiscordMessageListener } from "./monitor/listeners.js"; const fakeGuild = (id: string, name: string) => ({ id, name }) as Guild; @@ -48,6 +49,85 @@ describe("registerDiscordListener", () => { }); }); +describe("DiscordMessageListener", () => { + it("returns before the handler finishes", async () => { + let handlerResolved = false; + let resolveHandler: (() => void) | null = null; + const handlerPromise = new Promise((resolve) => { + resolveHandler = () => { + handlerResolved = true; + resolve(); + }; + }); + const handler = vi.fn(() => handlerPromise); + const listener = new DiscordMessageListener(handler); + + await listener.handle( + {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, + {} as unknown as import("@buape/carbon").Client, + ); + + expect(handler).toHaveBeenCalledOnce(); + expect(handlerResolved).toBe(false); + + resolveHandler?.(); + await handlerPromise; + }); + + it("logs handler failures", async () => { + const logger = { + warn: vi.fn(), + error: vi.fn(), + } as unknown as ReturnType; + const handler = vi.fn(async () => { + throw new Error("boom"); + }); + const listener = new DiscordMessageListener(handler, logger); + + await listener.handle( + {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, + {} as unknown as import("@buape/carbon").Client, + ); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("discord handler failed")); + }); + + it("logs slow handlers after the threshold", async () => { + vi.useFakeTimers(); + vi.setSystemTime(0); + + try { + let resolveHandler: (() => void) | null = null; + const handlerPromise = new Promise((resolve) => { + resolveHandler = resolve; + }); + const handler = vi.fn(() => handlerPromise); + const logger = { + warn: vi.fn(), + error: vi.fn(), + } as unknown as ReturnType; + const listener = new DiscordMessageListener(handler, logger); + + await listener.handle( + {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, + {} as unknown as import("@buape/carbon").Client, + ); + + vi.setSystemTime(31_000); + resolveHandler?.(); + await handlerPromise; + await Promise.resolve(); + + expect(logger.warn).toHaveBeenCalled(); + const [, meta] = logger.warn.mock.calls[0] ?? []; + expect(meta?.durationMs).toBeGreaterThanOrEqual(30_000); + } finally { + vi.useRealTimers(); + } + }); +}); + describe("discord allowlist helpers", () => { it("normalizes slugs", () => { expect(normalizeDiscordSlug("Friends of Clawd")).toBe("friends-of-clawd"); diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index 1d8f66250..0eb5e2e8e 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -73,16 +73,20 @@ export class DiscordMessageListener extends MessageCreateListener { async handle(data: DiscordMessageEvent, client: Client) { const startedAt = Date.now(); - try { - await this.handler(data, client); - } finally { - logSlowDiscordListener({ - logger: this.logger, - listener: this.constructor.name, - event: this.type, - durationMs: Date.now() - startedAt, + const task = Promise.resolve(this.handler(data, client)); + void task + .catch((err) => { + const logger = this.logger ?? discordEventQueueLog; + logger.error(danger(`discord handler failed: ${String(err)}`)); + }) + .finally(() => { + logSlowDiscordListener({ + logger: this.logger, + listener: this.constructor.name, + event: this.type, + durationMs: Date.now() - startedAt, + }); }); - } } } diff --git a/src/tui/components/custom-editor.ts b/src/tui/components/custom-editor.ts index 6f209b479..b66452e61 100644 --- a/src/tui/components/custom-editor.ts +++ b/src/tui/components/custom-editor.ts @@ -1,4 +1,4 @@ -import { Editor, type EditorTheme, type TUI, Key, matchesKey } from "@mariozechner/pi-tui"; +import { Editor, type EditorTheme, Key, matchesKey } from "@mariozechner/pi-tui"; export class CustomEditor extends Editor { onEscape?: () => void; @@ -12,8 +12,8 @@ export class CustomEditor extends Editor { onShiftTab?: () => void; onAltEnter?: () => void; - constructor(tui: TUI, theme: EditorTheme) { - super(tui, theme); + constructor(theme: EditorTheme) { + super(theme); } handleInput(data: string): void { if (matchesKey(data, Key.alt("enter")) && this.onAltEnter) { diff --git a/src/tui/tui.ts b/src/tui/tui.ts index a5e6e34d7..753f5511f 100644 --- a/src/tui/tui.ts +++ b/src/tui/tui.ts @@ -193,7 +193,7 @@ export async function runTui(opts: TuiOptions) { const statusContainer = new Container(); const footer = new Text("", 1, 0); const chatLog = new ChatLog(); - const editor = new CustomEditor(tui, editorTheme); + const editor = new CustomEditor(editorTheme); const root = new Container(); root.addChild(header); root.addChild(chatLog);