fix(slack): clear assistant thread status after replies

This commit is contained in:
Peter Steinberger
2026-01-06 21:41:30 +01:00
parent 8ebc789d25
commit 84c8209158
6 changed files with 205 additions and 179 deletions

View File

@@ -1,6 +1,7 @@
import { stripHeartbeatToken } from "../heartbeat.js";
import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyPayload } from "../types.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { TypingController } from "./typing.js";
export type ReplyDispatchKind = "tool" | "block" | "final";
@@ -22,6 +23,20 @@ export type ReplyDispatcherOptions = {
onError?: ReplyDispatchErrorHandler;
};
type ReplyDispatcherWithTypingOptions = Omit<
ReplyDispatcherOptions,
"onIdle"
> & {
onReplyStart?: () => Promise<void> | void;
onIdle?: () => void;
};
type ReplyDispatcherWithTypingResult = {
dispatcher: ReplyDispatcher;
replyOptions: Pick<GetReplyOptions, "onReplyStart" | "onTypingController">;
markDispatchIdle: () => void;
};
export type ReplyDispatcher = {
sendToolResult: (payload: ReplyPayload) => boolean;
sendBlockReply: (payload: ReplyPayload) => boolean;
@@ -107,3 +122,31 @@ export function createReplyDispatcher(
getQueuedCounts: () => ({ ...queuedCounts }),
};
}
export function createReplyDispatcherWithTyping(
options: ReplyDispatcherWithTypingOptions,
): ReplyDispatcherWithTypingResult {
const { onReplyStart, onIdle, ...dispatcherOptions } = options;
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
...dispatcherOptions,
onIdle: () => {
typingController?.markDispatchIdle();
onIdle?.();
},
});
return {
dispatcher,
replyOptions: {
onReplyStart,
onTypingController: (typing) => {
typingController = typing;
},
},
markDispatchIdle: () => {
typingController?.markDispatchIdle();
onIdle?.();
},
};
}

View File

@@ -33,8 +33,10 @@ import {
buildMentionRegexes,
matchesMentionPatterns,
} from "../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import type { TypingController } from "../auto-reply/reply/typing.js";
import {
createReplyDispatcher,
createReplyDispatcherWithTyping,
} from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import type { ReplyToMode } from "../config/config.js";
@@ -797,43 +799,36 @@ export function createDiscordMessageHandler(params: {
}
let didSendReply = false;
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
await deliverDiscordReply({
replies: [payload],
target: replyTarget,
token,
rest: client.rest,
runtime,
replyToMode,
textLimit,
});
didSendReply = true;
},
onIdle: () => {
typingController?.markDispatchIdle();
},
onError: (err, info) => {
runtime.error?.(
danger(`discord ${info.kind} reply failed: ${String(err)}`),
);
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
createReplyDispatcherWithTyping({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
await deliverDiscordReply({
replies: [payload],
target: replyTarget,
token,
rest: client.rest,
runtime,
replyToMode,
textLimit,
});
didSendReply = true;
},
onError: (err, info) => {
runtime.error?.(
danger(`discord ${info.kind} reply failed: ${String(err)}`),
);
},
onReplyStart: () => sendTyping(message),
});
const { queuedFinal, counts } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
onReplyStart: () => sendTyping(message),
onTypingController: (typing) => {
typingController = typing;
},
},
replyOptions,
});
typingController?.markDispatchIdle();
markDispatchIdle();
if (!queuedFinal) {
if (
isGuildMessage &&

View File

@@ -189,12 +189,20 @@ describe("monitorSlackProvider tool results", () => {
const client = getSlackClient() as {
assistant?: { threads?: { setStatus?: ReturnType<typeof vi.fn> } };
};
expect(client.assistant?.threads?.setStatus).toHaveBeenCalledWith({
const setStatus = client.assistant?.threads?.setStatus;
expect(setStatus).toHaveBeenCalledTimes(2);
expect(setStatus).toHaveBeenNthCalledWith(1, {
token: "bot-token",
channel_id: "C1",
thread_ts: "123",
status: "is typing...",
});
expect(setStatus).toHaveBeenNthCalledWith(2, {
token: "bot-token",
channel_id: "C1",
thread_ts: "123",
status: "",
});
});
it("accepts channel messages when mentionPatterns match", async () => {

View File

@@ -19,8 +19,7 @@ import {
buildMentionRegexes,
matchesMentionPatterns,
} from "../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import type { TypingController } from "../auto-reply/reply/typing.js";
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import type { ReplyPayload } from "../auto-reply/types.js";
@@ -860,61 +859,58 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
// Only thread replies if the incoming message was in a thread.
const incomingThreadTs = message.thread_ts;
const statusThreadTs = message.thread_ts ?? message.ts;
let didSetStatus = false;
const onReplyStart = async () => {
didSetStatus = true;
await setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "is typing...",
});
};
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
await deliverReplies({
replies: [payload],
target: replyTarget,
token: botToken,
runtime,
textLimit,
threadTs: incomingThreadTs,
});
},
onIdle: () => {
typingController?.markDispatchIdle();
},
onError: (err, info) => {
runtime.error?.(
danger(`slack ${info.kind} reply failed: ${String(err)}`),
);
void setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "",
});
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
createReplyDispatcherWithTyping({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
await deliverReplies({
replies: [payload],
target: replyTarget,
token: botToken,
runtime,
textLimit,
threadTs: incomingThreadTs,
});
},
onError: (err, info) => {
runtime.error?.(
danger(`slack ${info.kind} reply failed: ${String(err)}`),
);
if (didSetStatus) {
void setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "",
});
}
},
onReplyStart,
});
const { queuedFinal, counts } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
onReplyStart,
onTypingController: (typing) => {
typingController = typing;
},
},
replyOptions,
});
typingController?.markDispatchIdle();
if (!queuedFinal) {
markDispatchIdle();
if (didSetStatus) {
await setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "",
});
return;
}
if (!queuedFinal) return;
if (shouldLogVerbose()) {
const finalCount = counts.final;
logVerbose(

View File

@@ -19,8 +19,7 @@ import {
buildMentionRegexes,
matchesMentionPatterns,
} from "../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import type { TypingController } from "../auto-reply/reply/typing.js";
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import type { ReplyToMode } from "../config/config.js";
import { loadConfig } from "../config/config.js";
@@ -451,42 +450,35 @@ export function createTelegramBot(opts: TelegramBotOptions) {
);
}
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
await deliverReplies({
replies: [payload],
chatId: String(chatId),
token: opts.token,
runtime,
bot,
replyToMode,
textLimit,
});
},
onIdle: () => {
typingController?.markDispatchIdle();
},
onError: (err, info) => {
runtime.error?.(
danger(`telegram ${info.kind} reply failed: ${String(err)}`),
);
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
createReplyDispatcherWithTyping({
responsePrefix: cfg.messages?.responsePrefix,
deliver: async (payload) => {
await deliverReplies({
replies: [payload],
chatId: String(chatId),
token: opts.token,
runtime,
bot,
replyToMode,
textLimit,
});
},
onError: (err, info) => {
runtime.error?.(
danger(`telegram ${info.kind} reply failed: ${String(err)}`),
);
},
onReplyStart: sendTyping,
});
const { queuedFinal } = await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
onReplyStart: sendTyping,
onTypingController: (typing) => {
typingController = typing;
},
},
replyOptions,
});
typingController?.markDispatchIdle();
markDispatchIdle();
if (!queuedFinal) return;
};

View File

@@ -17,8 +17,7 @@ import {
buildMentionRegexes,
normalizeMentionText,
} from "../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js";
import type { TypingController } from "../auto-reply/reply/typing.js";
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import type { ReplyPayload } from "../auto-reply/types.js";
@@ -1161,72 +1160,70 @@ export async function monitorWebProvider(
const textLimit = resolveTextChunkLimit(cfg, "whatsapp");
let didLogHeartbeatStrip = false;
let didSendReply = false;
let typingController: TypingController | undefined;
const dispatcher = createReplyDispatcher({
responsePrefix: cfg.messages?.responsePrefix,
onHeartbeatStrip: () => {
if (!didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from web reply");
}
},
deliver: async (payload, info) => {
await deliverWebReply({
replyResult: payload,
msg,
maxMediaBytes,
textLimit,
replyLogger,
connectionId,
// Tool + block updates are noisy; skip their log lines.
skipLog: info.kind !== "final",
});
didSendReply = true;
if (info.kind === "tool") {
rememberSentText(payload.text, { combinedBody: "" });
return;
}
const shouldLog =
info.kind === "final" && payload.text ? true : undefined;
rememberSentText(payload.text, {
combinedBody,
logVerboseMessage: shouldLog,
});
if (info.kind === "final") {
const fromDisplay =
msg.chatType === "group"
? conversationId
: (msg.from ?? "unknown");
const hasMedia = Boolean(
payload.mediaUrl || payload.mediaUrls?.length,
);
whatsappOutboundLog.info(
`Auto-replied to ${fromDisplay}${hasMedia ? " (media)" : ""}`,
);
if (shouldLogVerbose()) {
const preview =
payload.text != null ? elide(payload.text, 400) : "<media>";
whatsappOutboundLog.debug(
`Reply body: ${preview}${hasMedia ? " (media)" : ""}`,
);
const { dispatcher, replyOptions, markDispatchIdle } =
createReplyDispatcherWithTyping({
responsePrefix: cfg.messages?.responsePrefix,
onHeartbeatStrip: () => {
if (!didLogHeartbeatStrip) {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from web reply");
}
}
},
onIdle: () => {
typingController?.markDispatchIdle();
},
onError: (err, info) => {
const label =
info.kind === "tool"
? "tool update"
: info.kind === "block"
? "block update"
: "auto-reply";
whatsappOutboundLog.error(
`Failed sending web ${label} to ${msg.from ?? conversationId}: ${formatError(err)}`,
);
},
});
},
deliver: async (payload, info) => {
await deliverWebReply({
replyResult: payload,
msg,
maxMediaBytes,
textLimit,
replyLogger,
connectionId,
// Tool + block updates are noisy; skip their log lines.
skipLog: info.kind !== "final",
});
didSendReply = true;
if (info.kind === "tool") {
rememberSentText(payload.text, { combinedBody: "" });
return;
}
const shouldLog =
info.kind === "final" && payload.text ? true : undefined;
rememberSentText(payload.text, {
combinedBody,
logVerboseMessage: shouldLog,
});
if (info.kind === "final") {
const fromDisplay =
msg.chatType === "group"
? conversationId
: (msg.from ?? "unknown");
const hasMedia = Boolean(
payload.mediaUrl || payload.mediaUrls?.length,
);
whatsappOutboundLog.info(
`Auto-replied to ${fromDisplay}${hasMedia ? " (media)" : ""}`,
);
if (shouldLogVerbose()) {
const preview =
payload.text != null ? elide(payload.text, 400) : "<media>";
whatsappOutboundLog.debug(
`Reply body: ${preview}${hasMedia ? " (media)" : ""}`,
);
}
}
},
onError: (err, info) => {
const label =
info.kind === "tool"
? "tool update"
: info.kind === "block"
? "block update"
: "auto-reply";
whatsappOutboundLog.error(
`Failed sending web ${label} to ${msg.from ?? conversationId}: ${formatError(err)}`,
);
},
onReplyStart: msg.sendComposing,
});
const { queuedFinal } = await dispatchReplyFromConfig({
ctx: {
@@ -1258,14 +1255,9 @@ export async function monitorWebProvider(
cfg,
dispatcher,
replyResolver,
replyOptions: {
onReplyStart: msg.sendComposing,
onTypingController: (typing) => {
typingController = typing;
},
},
replyOptions,
});
typingController?.markDispatchIdle();
markDispatchIdle();
if (!queuedFinal) {
if (shouldClearGroupHistory && didSendReply) {
groupHistories.set(route.sessionKey, []);