From bb9a9633a8afd22910cec7c636d23309fe2332e7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 12 Jan 2026 23:41:00 +0000 Subject: [PATCH] fix: align reply threading refs --- src/auto-reply/reply/reply-reference.test.ts | 56 ++++++++++++++++++ src/auto-reply/reply/reply-reference.ts | 56 ++++++++++++++++++ src/discord/monitor.test.ts | 16 ++++++ src/discord/monitor.ts | 60 ++++++++++++++------ src/slack/monitor.ts | 39 ++++++------- 5 files changed, 189 insertions(+), 38 deletions(-) create mode 100644 src/auto-reply/reply/reply-reference.test.ts create mode 100644 src/auto-reply/reply/reply-reference.ts diff --git a/src/auto-reply/reply/reply-reference.test.ts b/src/auto-reply/reply/reply-reference.test.ts new file mode 100644 index 000000000..57f29763c --- /dev/null +++ b/src/auto-reply/reply/reply-reference.test.ts @@ -0,0 +1,56 @@ +import { describe, expect, it } from "vitest"; + +import { createReplyReferencePlanner } from "./reply-reference.js"; + +describe("createReplyReferencePlanner", () => { + it("disables references when mode is off", () => { + const planner = createReplyReferencePlanner({ + replyToMode: "off", + startId: "parent", + }); + expect(planner.use()).toBeUndefined(); + expect(planner.hasReplied()).toBe(false); + }); + + it("uses startId once when mode is first", () => { + const planner = createReplyReferencePlanner({ + replyToMode: "first", + startId: "parent", + }); + expect(planner.use()).toBe("parent"); + expect(planner.hasReplied()).toBe(true); + planner.markSent(); + expect(planner.use()).toBeUndefined(); + }); + + it("returns startId for every call when mode is all", () => { + const planner = createReplyReferencePlanner({ + replyToMode: "all", + startId: "parent", + }); + expect(planner.use()).toBe("parent"); + expect(planner.use()).toBe("parent"); + }); + + it("prefers existing thread id regardless of mode", () => { + const planner = createReplyReferencePlanner({ + replyToMode: "off", + existingId: "thread-1", + startId: "parent", + }); + expect(planner.use()).toBe("thread-1"); + expect(planner.hasReplied()).toBe(true); + }); + + it("honors allowReference=false", () => { + const planner = createReplyReferencePlanner({ + replyToMode: "all", + startId: "parent", + allowReference: false, + }); + expect(planner.use()).toBeUndefined(); + expect(planner.hasReplied()).toBe(false); + planner.markSent(); + expect(planner.hasReplied()).toBe(true); + }); +}); diff --git a/src/auto-reply/reply/reply-reference.ts b/src/auto-reply/reply/reply-reference.ts new file mode 100644 index 000000000..33dda2098 --- /dev/null +++ b/src/auto-reply/reply/reply-reference.ts @@ -0,0 +1,56 @@ +import type { ReplyToMode } from "../../config/types.js"; + +export type ReplyReferencePlanner = { + /** Returns the effective reply/thread id for the next send and updates state. */ + use(): string | undefined; + /** Mark that a reply was sent (needed when no reference is used). */ + markSent(): void; + /** Whether a reply has been sent in this flow. */ + hasReplied(): boolean; +}; + +export function createReplyReferencePlanner(options: { + replyToMode: ReplyToMode; + /** Existing thread/reference id (always used when present). */ + existingId?: string; + /** Id to start a new thread/reference when allowed (e.g., parent message id). */ + startId?: string; + /** Disable reply references entirely (e.g., when posting inside a new thread). */ + allowReference?: boolean; + /** Seed the planner with prior reply state. */ + hasReplied?: boolean; +}): ReplyReferencePlanner { + let hasReplied = options.hasReplied ?? false; + const allowReference = options.allowReference !== false; + const existingId = options.existingId?.trim(); + const startId = options.startId?.trim(); + + const use = (): string | undefined => { + if (!allowReference) return undefined; + if (existingId) { + hasReplied = true; + return existingId; + } + if (!startId) return undefined; + if (options.replyToMode === "off") return undefined; + if (options.replyToMode === "all") { + hasReplied = true; + return startId; + } + if (!hasReplied) { + hasReplied = true; + return startId; + } + return undefined; + }; + + const markSent = () => { + hasReplied = true; + }; + + return { + use, + markSent, + hasReplied: () => hasReplied, + }; +} diff --git a/src/discord/monitor.test.ts b/src/discord/monitor.test.ts index f9b745afb..72b6a4587 100644 --- a/src/discord/monitor.test.ts +++ b/src/discord/monitor.test.ts @@ -12,6 +12,7 @@ import { resolveDiscordReplyTarget, resolveDiscordShouldRequireMention, resolveGroupDmAllow, + sanitizeDiscordThreadName, shouldEmitDiscordReactionNotification, } from "./monitor.js"; @@ -326,6 +327,21 @@ describe("discord reply target selection", () => { }); }); +describe("discord autoThread name sanitization", () => { + it("strips mentions and collapses whitespace", () => { + const name = sanitizeDiscordThreadName( + " <@123> <@&456> <#789> Help here ", + "msg-1", + ); + expect(name).toBe("Help here"); + }); + + it("falls back to thread + id when empty after cleaning", () => { + const name = sanitizeDiscordThreadName(" <@123>", "abc"); + expect(name).toBe("Thread abc"); + }); +}); + describe("discord reaction notification gating", () => { it("defaults to own when mode is unset", () => { expect( diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 4ff3dc07d..6b2839f58 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -50,6 +50,7 @@ import { createReplyDispatcher, createReplyDispatcherWithTyping, } from "../auto-reply/reply/reply-dispatcher.js"; +import { createReplyReferencePlanner } from "../auto-reply/reply/reply-reference.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { @@ -350,6 +351,21 @@ export function resolveDiscordReplyTarget(opts: { return opts.hasReplied ? undefined : replyToId; } +export function sanitizeDiscordThreadName( + rawName: string, + fallbackId: string, +): string { + const cleanedName = rawName + .replace(/<@!?\d+>/g, "") // user mentions + .replace(/<@&\d+>/g, "") // role mentions + .replace(/<#\d+>/g, "") // channel mentions + .replace(/\s+/g, " ") + .trim(); + const baseSource = cleanedName || `Thread ${fallbackId}`; + const base = truncateUtf16Safe(baseSource, 80); + return truncateUtf16Safe(base, 100) || `Thread ${fallbackId}`; +} + function summarizeAllowList(list?: Array) { if (!list || list.length === 0) return "any"; const sample = list.slice(0, 4).map((entry) => String(entry)); @@ -456,6 +472,11 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { publicKey: "a", token, autoDeploy: nativeEnabled, + eventQueue: { + // Auto-threading (create thread + generate reply + post) can exceed the default + // 30s listener timeout in some environments. + listenerTimeout: 120_000, + }, }, { commands, @@ -1184,21 +1205,15 @@ export function createDiscordMessageHandler(params: { runtime.error?.(danger("discord: missing reply target")); return; } - const originalReplyTarget = replyTarget; let deliverTarget = replyTarget; if (isGuildMessage && channelConfig?.autoThread && !threadChannel) { try { - const rawName = baseText || combinedBody || "Thread"; - const cleanedName = rawName - .replace(/<@!?\d+>/g, "") // user mentions - .replace(/<@&\d+>/g, "") // role mentions - .replace(/<#\d+>/g, "") // channel mentions - .replace(/\s+/g, " ") - .trim(); - const base = truncateUtf16Safe(cleanedName || "Thread", 80); - const threadName = truncateUtf16Safe(base, 100) || `Thread ${message.id}`; + const threadName = sanitizeDiscordThreadName( + baseText || combinedBody || "Thread", + message.id, + ); const created = (await client.rest.post( `${Routes.channelMessage(message.channelId, message.id)}/threads`, @@ -1213,7 +1228,6 @@ export function createDiscordMessageHandler(params: { const createdId = created?.id ? String(created.id) : ""; if (createdId) { deliverTarget = `channel:${createdId}`; - // When autoThread is enabled, *always* reply in the created thread. replyTarget = deliverTarget; } } catch (err) { @@ -1223,6 +1237,14 @@ export function createDiscordMessageHandler(params: { } } + const replyReference = createReplyReferencePlanner({ + replyToMode: + deliverTarget !== originalReplyTarget ? "off" : replyToMode, + existingId: threadChannel ? message.id : undefined, + startId: message.id, + allowReference: deliverTarget === originalReplyTarget, + }); + if (isDirectMessage) { const sessionCfg = cfg.session; const storePath = resolveStorePath(sessionCfg?.store, { @@ -1254,21 +1276,20 @@ export function createDiscordMessageHandler(params: { .responsePrefix, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), deliver: async (payload) => { + const replyToId = replyReference.use(); await deliverDiscordReply({ replies: [payload], - target: replyTarget, + target: deliverTarget, token, accountId, rest: client.rest, runtime, - // The original message is in the parent channel; never try to reply-reference it - // when posting inside the newly-created thread. - replyToMode: - deliverTarget !== originalReplyTarget ? "off" : replyToMode, + replyToId, textLimit, maxLinesPerMessage: discordConfig?.maxLinesPerMessage, }); didSendReply = true; + replyReference.markSent(); }, onError: (err, info) => { runtime.error?.( @@ -1893,7 +1914,7 @@ async function deliverDiscordReply(params: { runtime: RuntimeEnv; textLimit: number; maxLinesPerMessage?: number; - replyToMode: ReplyToMode; + replyToId?: string; }) { const chunkLimit = Math.min(params.textLimit, 2000); for (const payload of params.replies) { @@ -1901,8 +1922,10 @@ async function deliverDiscordReply(params: { payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); const text = payload.text ?? ""; if (!text && mediaList.length === 0) continue; + const replyTo = params.replyToId?.trim() || undefined; if (mediaList.length === 0) { + let isFirstChunk = true; for (const chunk of chunkDiscordText(text, { maxChars: chunkLimit, maxLines: params.maxLinesPerMessage, @@ -1913,7 +1936,9 @@ async function deliverDiscordReply(params: { token: params.token, rest: params.rest, accountId: params.accountId, + replyTo: isFirstChunk ? replyTo : undefined, }); + isFirstChunk = false; } continue; } @@ -1925,6 +1950,7 @@ async function deliverDiscordReply(params: { rest: params.rest, mediaUrl: firstMedia, accountId: params.accountId, + replyTo, }); for (const extra of mediaList.slice(1)) { await sendMessageDiscord(params.target, "", { diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index 69e11b0bb..99addb00e 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -35,6 +35,7 @@ import { matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; +import { createReplyReferencePlanner } from "../auto-reply/reply/reply-reference.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; @@ -1135,6 +1136,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { // Shared mutable ref for tracking if a reply was sent (used by both // auto-reply path and tool path for "first" threading mode). const hasRepliedRef = { value: false }; + const replyReference = createReplyReferencePlanner({ + replyToMode, + existingId: incomingThreadTs, + startId: messageTs, + hasReplied: hasRepliedRef.value, + }); const onReplyStart = async () => { didSetStatus = true; await setSlackThreadStatus({ @@ -1150,12 +1157,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { .responsePrefix, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), deliver: async (payload) => { - const effectiveThreadTs = resolveSlackThreadTs({ - replyToMode, - incomingThreadTs, - messageTs, - hasReplied: hasRepliedRef.value, - }); + const replyThreadTs = replyReference.use(); await deliverReplies({ replies: [payload], target: replyTarget, @@ -1163,10 +1165,11 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { accountId: account.accountId, runtime, textLimit, - replyThreadTs: effectiveThreadTs, + replyThreadTs, }); didSendReply = true; - hasRepliedRef.value = true; + replyReference.markSent(); + hasRepliedRef.value = replyReference.hasReplied(); }, onError: (err, info) => { runtime.error?.( @@ -2071,19 +2074,13 @@ export function resolveSlackThreadTs(params: { messageTs: string | undefined; hasReplied: boolean; }): string | undefined { - const { replyToMode, incomingThreadTs, messageTs, hasReplied } = params; - if (incomingThreadTs) return incomingThreadTs; - if (!messageTs) return undefined; - if (replyToMode === "all") { - // All replies go to thread - return messageTs; - } - if (replyToMode === "first") { - // "first": only first reply goes to thread - return hasReplied ? undefined : messageTs; - } - // "off": never start a thread - return undefined; + const planner = createReplyReferencePlanner({ + replyToMode: params.replyToMode, + existingId: params.incomingThreadTs, + startId: params.messageTs, + hasReplied: params.hasReplied, + }); + return planner.use(); } async function deliverSlackSlashReplies(params: {