fix: unblock discord listener concurrency

This commit is contained in:
Peter Steinberger
2026-01-20 19:30:22 +00:00
parent 21370fc09b
commit b012b1105e
6 changed files with 101 additions and 16 deletions

View File

@@ -32,6 +32,7 @@ Docs: https://docs.clawd.bot
- Plugins: add Nextcloud Talk manifest for plugin config validation. (#1297) — thanks @ysqander. - Plugins: add Nextcloud Talk manifest for plugin config validation. (#1297) — thanks @ysqander.
- Anthropic: default API prompt caching to 1h with configurable TTL override; ignore TTL for OAuth. - Anthropic: default API prompt caching to 1h with configurable TTL override; ignore TTL for OAuth.
- Discord: make resolve warnings avoid raw JSON payloads on rate limits. - Discord: make resolve warnings avoid raw JSON payloads on rate limits.
- Discord: process message handlers in parallel across sessions to avoid event queue blocking. (#1295)
- Cron: auto-deliver isolated agent output to explicit targets without tool calls. (#1285) - Cron: auto-deliver isolated agent output to explicit targets without tool calls. (#1285)
## 2026.1.19-3 ## 2026.1.19-3

View File

@@ -1,4 +1,4 @@
import type { Api, Model, OpenAICompletionsCompat } from "@mariozechner/pi-ai"; import type { Api, Model } from "@mariozechner/pi-ai";
function isOpenAiCompletionsModel(model: Model<Api>): model is Model<"openai-completions"> { function isOpenAiCompletionsModel(model: Model<Api>): model is Model<"openai-completions"> {
return model.api === "openai-completions"; return model.api === "openai-completions";
@@ -10,7 +10,7 @@ export function normalizeModelCompat(model: Model<Api>): Model<Api> {
if (!isZai || !isOpenAiCompletionsModel(model)) return model; if (!isZai || !isOpenAiCompletionsModel(model)) return model;
const openaiModel = model as Model<"openai-completions">; const openaiModel = model as Model<"openai-completions">;
const compat = openaiModel.compat as OpenAICompletionsCompat | undefined; const compat = openaiModel.compat ?? undefined;
if (compat?.supportsDeveloperRole === false) return model; if (compat?.supportsDeveloperRole === false) return model;
openaiModel.compat = compat openaiModel.compat = compat

View File

@@ -1,5 +1,5 @@
import type { Guild } from "@buape/carbon"; import type { Guild } from "@buape/carbon";
import { describe, expect, it } from "vitest"; import { describe, expect, it, vi } from "vitest";
import { import {
allowListMatches, allowListMatches,
buildDiscordMediaPayload, buildDiscordMediaPayload,
@@ -17,6 +17,7 @@ import {
sanitizeDiscordThreadName, sanitizeDiscordThreadName,
shouldEmitDiscordReactionNotification, shouldEmitDiscordReactionNotification,
} from "./monitor.js"; } from "./monitor.js";
import { DiscordMessageListener } from "./monitor/listeners.js";
const fakeGuild = (id: string, name: string) => ({ id, name }) as Guild; const fakeGuild = (id: string, name: string) => ({ id, name }) as Guild;
@@ -48,6 +49,85 @@ describe("registerDiscordListener", () => {
}); });
}); });
describe("DiscordMessageListener", () => {
it("returns before the handler finishes", async () => {
let handlerResolved = false;
let resolveHandler: (() => void) | null = null;
const handlerPromise = new Promise<void>((resolve) => {
resolveHandler = () => {
handlerResolved = true;
resolve();
};
});
const handler = vi.fn(() => handlerPromise);
const listener = new DiscordMessageListener(handler);
await listener.handle(
{} as unknown as import("./monitor/listeners.js").DiscordMessageEvent,
{} as unknown as import("@buape/carbon").Client,
);
expect(handler).toHaveBeenCalledOnce();
expect(handlerResolved).toBe(false);
resolveHandler?.();
await handlerPromise;
});
it("logs handler failures", async () => {
const logger = {
warn: vi.fn(),
error: vi.fn(),
} as unknown as ReturnType<typeof import("../logging/subsystem.js").createSubsystemLogger>;
const handler = vi.fn(async () => {
throw new Error("boom");
});
const listener = new DiscordMessageListener(handler, logger);
await listener.handle(
{} as unknown as import("./monitor/listeners.js").DiscordMessageEvent,
{} as unknown as import("@buape/carbon").Client,
);
await new Promise((resolve) => setTimeout(resolve, 0));
expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("discord handler failed"));
});
it("logs slow handlers after the threshold", async () => {
vi.useFakeTimers();
vi.setSystemTime(0);
try {
let resolveHandler: (() => void) | null = null;
const handlerPromise = new Promise<void>((resolve) => {
resolveHandler = resolve;
});
const handler = vi.fn(() => handlerPromise);
const logger = {
warn: vi.fn(),
error: vi.fn(),
} as unknown as ReturnType<typeof import("../logging/subsystem.js").createSubsystemLogger>;
const listener = new DiscordMessageListener(handler, logger);
await listener.handle(
{} as unknown as import("./monitor/listeners.js").DiscordMessageEvent,
{} as unknown as import("@buape/carbon").Client,
);
vi.setSystemTime(31_000);
resolveHandler?.();
await handlerPromise;
await Promise.resolve();
expect(logger.warn).toHaveBeenCalled();
const [, meta] = logger.warn.mock.calls[0] ?? [];
expect(meta?.durationMs).toBeGreaterThanOrEqual(30_000);
} finally {
vi.useRealTimers();
}
});
});
describe("discord allowlist helpers", () => { describe("discord allowlist helpers", () => {
it("normalizes slugs", () => { it("normalizes slugs", () => {
expect(normalizeDiscordSlug("Friends of Clawd")).toBe("friends-of-clawd"); expect(normalizeDiscordSlug("Friends of Clawd")).toBe("friends-of-clawd");

View File

@@ -73,16 +73,20 @@ export class DiscordMessageListener extends MessageCreateListener {
async handle(data: DiscordMessageEvent, client: Client) { async handle(data: DiscordMessageEvent, client: Client) {
const startedAt = Date.now(); const startedAt = Date.now();
try { const task = Promise.resolve(this.handler(data, client));
await this.handler(data, client); void task
} finally { .catch((err) => {
logSlowDiscordListener({ const logger = this.logger ?? discordEventQueueLog;
logger: this.logger, logger.error(danger(`discord handler failed: ${String(err)}`));
listener: this.constructor.name, })
event: this.type, .finally(() => {
durationMs: Date.now() - startedAt, logSlowDiscordListener({
logger: this.logger,
listener: this.constructor.name,
event: this.type,
durationMs: Date.now() - startedAt,
});
}); });
}
} }
} }

View File

@@ -1,4 +1,4 @@
import { Editor, type EditorTheme, type TUI, Key, matchesKey } from "@mariozechner/pi-tui"; import { Editor, type EditorTheme, Key, matchesKey } from "@mariozechner/pi-tui";
export class CustomEditor extends Editor { export class CustomEditor extends Editor {
onEscape?: () => void; onEscape?: () => void;
@@ -12,8 +12,8 @@ export class CustomEditor extends Editor {
onShiftTab?: () => void; onShiftTab?: () => void;
onAltEnter?: () => void; onAltEnter?: () => void;
constructor(tui: TUI, theme: EditorTheme) { constructor(theme: EditorTheme) {
super(tui, theme); super(theme);
} }
handleInput(data: string): void { handleInput(data: string): void {
if (matchesKey(data, Key.alt("enter")) && this.onAltEnter) { if (matchesKey(data, Key.alt("enter")) && this.onAltEnter) {

View File

@@ -193,7 +193,7 @@ export async function runTui(opts: TuiOptions) {
const statusContainer = new Container(); const statusContainer = new Container();
const footer = new Text("", 1, 0); const footer = new Text("", 1, 0);
const chatLog = new ChatLog(); const chatLog = new ChatLog();
const editor = new CustomEditor(tui, editorTheme); const editor = new CustomEditor(editorTheme);
const root = new Container(); const root = new Container();
root.addChild(header); root.addChild(header);
root.addChild(chatLog); root.addChild(chatLog);