From 84c8209158414828351f3355e730f3a5c9f330be Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 6 Jan 2026 21:41:30 +0100 Subject: [PATCH] fix(slack): clear assistant thread status after replies --- src/auto-reply/reply/reply-dispatcher.ts | 45 +++++++- src/discord/monitor.ts | 61 +++++----- src/slack/monitor.tool-result.test.ts | 10 +- src/slack/monitor.ts | 72 ++++++------ src/telegram/bot.ts | 56 ++++----- src/web/auto-reply.ts | 140 +++++++++++------------ 6 files changed, 205 insertions(+), 179 deletions(-) diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index 9f4987530..0db0e102c 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -1,6 +1,7 @@ import { stripHeartbeatToken } from "../heartbeat.js"; import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../tokens.js"; -import type { ReplyPayload } from "../types.js"; +import type { GetReplyOptions, ReplyPayload } from "../types.js"; +import type { TypingController } from "./typing.js"; export type ReplyDispatchKind = "tool" | "block" | "final"; @@ -22,6 +23,20 @@ export type ReplyDispatcherOptions = { onError?: ReplyDispatchErrorHandler; }; +type ReplyDispatcherWithTypingOptions = Omit< + ReplyDispatcherOptions, + "onIdle" +> & { + onReplyStart?: () => Promise | void; + onIdle?: () => void; +}; + +type ReplyDispatcherWithTypingResult = { + dispatcher: ReplyDispatcher; + replyOptions: Pick; + markDispatchIdle: () => void; +}; + export type ReplyDispatcher = { sendToolResult: (payload: ReplyPayload) => boolean; sendBlockReply: (payload: ReplyPayload) => boolean; @@ -107,3 +122,31 @@ export function createReplyDispatcher( getQueuedCounts: () => ({ ...queuedCounts }), }; } + +export function createReplyDispatcherWithTyping( + options: ReplyDispatcherWithTypingOptions, +): ReplyDispatcherWithTypingResult { + const { onReplyStart, onIdle, ...dispatcherOptions } = options; + let typingController: TypingController | undefined; + const dispatcher = createReplyDispatcher({ + ...dispatcherOptions, + onIdle: () => { + typingController?.markDispatchIdle(); + onIdle?.(); + }, + }); + + return { + dispatcher, + replyOptions: { + onReplyStart, + onTypingController: (typing) => { + typingController = typing; + }, + }, + markDispatchIdle: () => { + typingController?.markDispatchIdle(); + onIdle?.(); + }, + }; +} diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 11c8fca36..7f6e9f355 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -33,8 +33,10 @@ import { buildMentionRegexes, matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; -import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; -import type { TypingController } from "../auto-reply/reply/typing.js"; +import { + createReplyDispatcher, + createReplyDispatcherWithTyping, +} from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import type { ReplyToMode } from "../config/config.js"; @@ -797,43 +799,36 @@ export function createDiscordMessageHandler(params: { } let didSendReply = false; - let typingController: TypingController | undefined; - const dispatcher = createReplyDispatcher({ - responsePrefix: cfg.messages?.responsePrefix, - deliver: async (payload) => { - await deliverDiscordReply({ - replies: [payload], - target: replyTarget, - token, - rest: client.rest, - runtime, - replyToMode, - textLimit, - }); - didSendReply = true; - }, - onIdle: () => { - typingController?.markDispatchIdle(); - }, - onError: (err, info) => { - runtime.error?.( - danger(`discord ${info.kind} reply failed: ${String(err)}`), - ); - }, - }); + const { dispatcher, replyOptions, markDispatchIdle } = + createReplyDispatcherWithTyping({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverDiscordReply({ + replies: [payload], + target: replyTarget, + token, + rest: client.rest, + runtime, + replyToMode, + textLimit, + }); + didSendReply = true; + }, + onError: (err, info) => { + runtime.error?.( + danger(`discord ${info.kind} reply failed: ${String(err)}`), + ); + }, + onReplyStart: () => sendTyping(message), + }); const { queuedFinal, counts } = await dispatchReplyFromConfig({ ctx: ctxPayload, cfg, dispatcher, - replyOptions: { - onReplyStart: () => sendTyping(message), - onTypingController: (typing) => { - typingController = typing; - }, - }, + replyOptions, }); - typingController?.markDispatchIdle(); + markDispatchIdle(); if (!queuedFinal) { if ( isGuildMessage && diff --git a/src/slack/monitor.tool-result.test.ts b/src/slack/monitor.tool-result.test.ts index 9f83bea1b..8042c3744 100644 --- a/src/slack/monitor.tool-result.test.ts +++ b/src/slack/monitor.tool-result.test.ts @@ -189,12 +189,20 @@ describe("monitorSlackProvider tool results", () => { const client = getSlackClient() as { assistant?: { threads?: { setStatus?: ReturnType } }; }; - expect(client.assistant?.threads?.setStatus).toHaveBeenCalledWith({ + const setStatus = client.assistant?.threads?.setStatus; + expect(setStatus).toHaveBeenCalledTimes(2); + expect(setStatus).toHaveBeenNthCalledWith(1, { token: "bot-token", channel_id: "C1", thread_ts: "123", status: "is typing...", }); + expect(setStatus).toHaveBeenNthCalledWith(2, { + token: "bot-token", + channel_id: "C1", + thread_ts: "123", + status: "", + }); }); it("accepts channel messages when mentionPatterns match", async () => { diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index 122404b2e..f98841d41 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -19,8 +19,7 @@ import { buildMentionRegexes, matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; -import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; -import type { TypingController } from "../auto-reply/reply/typing.js"; +import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; @@ -860,61 +859,58 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { // Only thread replies if the incoming message was in a thread. const incomingThreadTs = message.thread_ts; const statusThreadTs = message.thread_ts ?? message.ts; + let didSetStatus = false; const onReplyStart = async () => { + didSetStatus = true; await setSlackThreadStatus({ channelId: message.channel, threadTs: statusThreadTs, status: "is typing...", }); }; - let typingController: TypingController | undefined; - const dispatcher = createReplyDispatcher({ - responsePrefix: cfg.messages?.responsePrefix, - deliver: async (payload) => { - await deliverReplies({ - replies: [payload], - target: replyTarget, - token: botToken, - runtime, - textLimit, - threadTs: incomingThreadTs, - }); - }, - onIdle: () => { - typingController?.markDispatchIdle(); - }, - onError: (err, info) => { - runtime.error?.( - danger(`slack ${info.kind} reply failed: ${String(err)}`), - ); - void setSlackThreadStatus({ - channelId: message.channel, - threadTs: statusThreadTs, - status: "", - }); - }, - }); + const { dispatcher, replyOptions, markDispatchIdle } = + createReplyDispatcherWithTyping({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + target: replyTarget, + token: botToken, + runtime, + textLimit, + threadTs: incomingThreadTs, + }); + }, + onError: (err, info) => { + runtime.error?.( + danger(`slack ${info.kind} reply failed: ${String(err)}`), + ); + if (didSetStatus) { + void setSlackThreadStatus({ + channelId: message.channel, + threadTs: statusThreadTs, + status: "", + }); + } + }, + onReplyStart, + }); const { queuedFinal, counts } = await dispatchReplyFromConfig({ ctx: ctxPayload, cfg, dispatcher, - replyOptions: { - onReplyStart, - onTypingController: (typing) => { - typingController = typing; - }, - }, + replyOptions, }); - typingController?.markDispatchIdle(); - if (!queuedFinal) { + markDispatchIdle(); + if (didSetStatus) { await setSlackThreadStatus({ channelId: message.channel, threadTs: statusThreadTs, status: "", }); - return; } + if (!queuedFinal) return; if (shouldLogVerbose()) { const finalCount = counts.final; logVerbose( diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 487a1c038..296f5b15c 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -19,8 +19,7 @@ import { buildMentionRegexes, matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; -import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; -import type { TypingController } from "../auto-reply/reply/typing.js"; +import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import type { ReplyToMode } from "../config/config.js"; import { loadConfig } from "../config/config.js"; @@ -451,42 +450,35 @@ export function createTelegramBot(opts: TelegramBotOptions) { ); } - let typingController: TypingController | undefined; - const dispatcher = createReplyDispatcher({ - responsePrefix: cfg.messages?.responsePrefix, - deliver: async (payload) => { - await deliverReplies({ - replies: [payload], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - replyToMode, - textLimit, - }); - }, - onIdle: () => { - typingController?.markDispatchIdle(); - }, - onError: (err, info) => { - runtime.error?.( - danger(`telegram ${info.kind} reply failed: ${String(err)}`), - ); - }, - }); + const { dispatcher, replyOptions, markDispatchIdle } = + createReplyDispatcherWithTyping({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + chatId: String(chatId), + token: opts.token, + runtime, + bot, + replyToMode, + textLimit, + }); + }, + onError: (err, info) => { + runtime.error?.( + danger(`telegram ${info.kind} reply failed: ${String(err)}`), + ); + }, + onReplyStart: sendTyping, + }); const { queuedFinal } = await dispatchReplyFromConfig({ ctx: ctxPayload, cfg, dispatcher, - replyOptions: { - onReplyStart: sendTyping, - onTypingController: (typing) => { - typingController = typing; - }, - }, + replyOptions, }); - typingController?.markDispatchIdle(); + markDispatchIdle(); if (!queuedFinal) return; }; diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 94fae1ffd..1618fd965 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -17,8 +17,7 @@ import { buildMentionRegexes, normalizeMentionText, } from "../auto-reply/reply/mentions.js"; -import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; -import type { TypingController } from "../auto-reply/reply/typing.js"; +import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; @@ -1161,72 +1160,70 @@ export async function monitorWebProvider( const textLimit = resolveTextChunkLimit(cfg, "whatsapp"); let didLogHeartbeatStrip = false; let didSendReply = false; - let typingController: TypingController | undefined; - const dispatcher = createReplyDispatcher({ - responsePrefix: cfg.messages?.responsePrefix, - onHeartbeatStrip: () => { - if (!didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose("Stripped stray HEARTBEAT_OK token from web reply"); - } - }, - deliver: async (payload, info) => { - await deliverWebReply({ - replyResult: payload, - msg, - maxMediaBytes, - textLimit, - replyLogger, - connectionId, - // Tool + block updates are noisy; skip their log lines. - skipLog: info.kind !== "final", - }); - didSendReply = true; - if (info.kind === "tool") { - rememberSentText(payload.text, { combinedBody: "" }); - return; - } - const shouldLog = - info.kind === "final" && payload.text ? true : undefined; - rememberSentText(payload.text, { - combinedBody, - logVerboseMessage: shouldLog, - }); - if (info.kind === "final") { - const fromDisplay = - msg.chatType === "group" - ? conversationId - : (msg.from ?? "unknown"); - const hasMedia = Boolean( - payload.mediaUrl || payload.mediaUrls?.length, - ); - whatsappOutboundLog.info( - `Auto-replied to ${fromDisplay}${hasMedia ? " (media)" : ""}`, - ); - if (shouldLogVerbose()) { - const preview = - payload.text != null ? elide(payload.text, 400) : ""; - whatsappOutboundLog.debug( - `Reply body: ${preview}${hasMedia ? " (media)" : ""}`, - ); + const { dispatcher, replyOptions, markDispatchIdle } = + createReplyDispatcherWithTyping({ + responsePrefix: cfg.messages?.responsePrefix, + onHeartbeatStrip: () => { + if (!didLogHeartbeatStrip) { + didLogHeartbeatStrip = true; + logVerbose("Stripped stray HEARTBEAT_OK token from web reply"); } - } - }, - onIdle: () => { - typingController?.markDispatchIdle(); - }, - onError: (err, info) => { - const label = - info.kind === "tool" - ? "tool update" - : info.kind === "block" - ? "block update" - : "auto-reply"; - whatsappOutboundLog.error( - `Failed sending web ${label} to ${msg.from ?? conversationId}: ${formatError(err)}`, - ); - }, - }); + }, + deliver: async (payload, info) => { + await deliverWebReply({ + replyResult: payload, + msg, + maxMediaBytes, + textLimit, + replyLogger, + connectionId, + // Tool + block updates are noisy; skip their log lines. + skipLog: info.kind !== "final", + }); + didSendReply = true; + if (info.kind === "tool") { + rememberSentText(payload.text, { combinedBody: "" }); + return; + } + const shouldLog = + info.kind === "final" && payload.text ? true : undefined; + rememberSentText(payload.text, { + combinedBody, + logVerboseMessage: shouldLog, + }); + if (info.kind === "final") { + const fromDisplay = + msg.chatType === "group" + ? conversationId + : (msg.from ?? "unknown"); + const hasMedia = Boolean( + payload.mediaUrl || payload.mediaUrls?.length, + ); + whatsappOutboundLog.info( + `Auto-replied to ${fromDisplay}${hasMedia ? " (media)" : ""}`, + ); + if (shouldLogVerbose()) { + const preview = + payload.text != null ? elide(payload.text, 400) : ""; + whatsappOutboundLog.debug( + `Reply body: ${preview}${hasMedia ? " (media)" : ""}`, + ); + } + } + }, + onError: (err, info) => { + const label = + info.kind === "tool" + ? "tool update" + : info.kind === "block" + ? "block update" + : "auto-reply"; + whatsappOutboundLog.error( + `Failed sending web ${label} to ${msg.from ?? conversationId}: ${formatError(err)}`, + ); + }, + onReplyStart: msg.sendComposing, + }); const { queuedFinal } = await dispatchReplyFromConfig({ ctx: { @@ -1258,14 +1255,9 @@ export async function monitorWebProvider( cfg, dispatcher, replyResolver, - replyOptions: { - onReplyStart: msg.sendComposing, - onTypingController: (typing) => { - typingController = typing; - }, - }, + replyOptions, }); - typingController?.markDispatchIdle(); + markDispatchIdle(); if (!queuedFinal) { if (shouldClearGroupHistory && didSendReply) { groupHistories.set(route.sessionKey, []);