From 8252ae2da10d3c630cf2fdfc06e3895f5d0c17f0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 23 Jan 2026 22:55:41 +0000 Subject: [PATCH] refactor: unify typing callbacks --- .../matrix/src/matrix/monitor/handler.ts | 17 ++++-- .../mattermost/src/mattermost/monitor.ts | 15 +++--- extensions/msteams/src/reply-dispatcher.ts | 15 +++--- src/channels/typing.ts | 27 ++++++++++ .../monitor/message-handler.process.ts | 8 ++- src/discord/monitor/typing.ts | 14 ++--- src/plugin-sdk/index.ts | 1 + src/signal/monitor/event-handler.ts | 14 ++--- src/slack/monitor/message-handler/dispatch.ts | 52 ++++++++++--------- src/telegram/bot-message-context.ts | 6 +-- src/telegram/bot-message-dispatch.ts | 8 ++- 11 files changed, 114 insertions(+), 63 deletions(-) create mode 100644 src/channels/typing.ts diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index c1b46ffd3..878e3e47c 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1,6 +1,7 @@ import type { LocationMessageEventContent, MatrixClient } from "matrix-bot-sdk"; import { + createTypingCallbacks, formatAllowlistMatchMeta, type RuntimeEnv, } from "clawdbot/plugin-sdk"; @@ -552,6 +553,16 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam channel: "matrix", accountId: route.accountId, }); + const typingCallbacks = createTypingCallbacks({ + start: () => sendTypingMatrix(roomId, true, undefined, client), + stop: () => sendTypingMatrix(roomId, false, undefined, client), + onStartError: (err) => { + logVerboseMessage(`matrix typing cue failed for room ${roomId}: ${String(err)}`); + }, + onStopError: (err) => { + logVerboseMessage(`matrix typing stop failed for room ${roomId}: ${String(err)}`); + }, + }); const { dispatcher, replyOptions, markDispatchIdle } = core.channel.reply.createReplyDispatcherWithTyping({ responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId) @@ -574,10 +585,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam onError: (err, info) => { runtime.error?.(`matrix ${info.kind} reply failed: ${String(err)}`); }, - onReplyStart: () => - sendTypingMatrix(roomId, true, undefined, client).catch(() => {}), - onIdle: () => - sendTypingMatrix(roomId, false, undefined, client).catch(() => {}), + onReplyStart: typingCallbacks.onReplyStart, + onIdle: typingCallbacks.onIdle, }); const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 245388d68..ba080294f 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -7,6 +7,7 @@ import type { RuntimeEnv, } from "clawdbot/plugin-sdk"; import { + createTypingCallbacks, buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, DEFAULT_GROUP_HISTORY_LIMIT, @@ -307,11 +308,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }; const sendTypingIndicator = async (channelId: string, parentId?: string) => { - try { - await sendMattermostTyping(client, { channelId, parentId }); - } catch (err) { - logger.debug?.(`mattermost typing cue failed for channel ${channelId}: ${String(err)}`); - } + await sendMattermostTyping(client, { channelId, parentId }); }; const resolveChannelInfo = async (channelId: string): Promise => { @@ -717,6 +714,12 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} identityName: resolveIdentityName(cfg, route.agentId), }; + const typingCallbacks = createTypingCallbacks({ + start: () => sendTypingIndicator(channelId, threadRootId), + onStartError: (err) => { + logger.debug?.(`mattermost typing cue failed for channel ${channelId}: ${String(err)}`); + }, + }); const { dispatcher, replyOptions, markDispatchIdle } = core.channel.reply.createReplyDispatcherWithTyping({ responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId) @@ -752,7 +755,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} onError: (err, info) => { runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); }, - onReplyStart: () => sendTypingIndicator(channelId, threadRootId), + onReplyStart: typingCallbacks.onReplyStart, }); await core.channel.reply.dispatchReplyFromConfig({ diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index f711c8240..83adffb7b 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -1,4 +1,5 @@ import { + createTypingCallbacks, resolveChannelMediaMaxBytes, type ClawdbotConfig, type MSTeamsReplyStyle, @@ -39,12 +40,14 @@ export function createMSTeamsReplyDispatcher(params: { }) { const core = getMSTeamsRuntime(); const sendTypingIndicator = async () => { - try { - await params.context.sendActivities([{ type: "typing" }]); - } catch { - // Typing indicator is best-effort. - } + await params.context.sendActivities([{ type: "typing" }]); }; + const typingCallbacks = createTypingCallbacks({ + start: sendTypingIndicator, + onStartError: () => { + // Typing indicator is best-effort. + }, + }); return core.channel.reply.createReplyDispatcherWithTyping({ responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig( @@ -102,6 +105,6 @@ export function createMSTeamsReplyDispatcher(params: { hint, }); }, - onReplyStart: sendTypingIndicator, + onReplyStart: typingCallbacks.onReplyStart, }); } diff --git a/src/channels/typing.ts b/src/channels/typing.ts new file mode 100644 index 000000000..9c47fad89 --- /dev/null +++ b/src/channels/typing.ts @@ -0,0 +1,27 @@ +export type TypingCallbacks = { + onReplyStart: () => Promise; + onIdle?: () => void; +}; + +export function createTypingCallbacks(params: { + start: () => Promise; + stop?: () => Promise; + onStartError: (err: unknown) => void; + onStopError?: (err: unknown) => void; +}): TypingCallbacks { + const onReplyStart = async () => { + try { + await params.start(); + } catch (err) { + params.onStartError(err); + } + }; + + const onIdle = params.stop + ? () => { + void params.stop().catch((err) => (params.onStopError ?? params.onStartError)(err)); + } + : undefined; + + return { onReplyStart, onIdle }; +} diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 09593db95..b94d860be 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -12,6 +12,7 @@ import { removeAckReactionAfterReply, shouldAckReaction as shouldAckReactionGate, } from "../../channels/ack-reactions.js"; +import { createTypingCallbacks } from "../../channels/typing.js"; import { formatInboundEnvelope, formatThreadStarterEnvelope, @@ -350,7 +351,12 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) onError: (err, info) => { runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`)); }, - onReplyStart: () => sendTyping({ client, channelId: typingChannelId }), + onReplyStart: createTypingCallbacks({ + start: () => sendTyping({ client, channelId: typingChannelId }), + onStartError: (err) => { + logVerbose(`discord typing cue failed for channel ${typingChannelId}: ${String(err)}`); + }, + }).onReplyStart, }); const { queuedFinal, counts } = await dispatchInboundMessage({ diff --git a/src/discord/monitor/typing.ts b/src/discord/monitor/typing.ts index f1ae61fdc..e9ce734d4 100644 --- a/src/discord/monitor/typing.ts +++ b/src/discord/monitor/typing.ts @@ -1,15 +1,9 @@ import type { Client } from "@buape/carbon"; -import { logVerbose } from "../../globals.js"; - export async function sendTyping(params: { client: Client; channelId: string }) { - try { - const channel = await params.client.fetchChannel(params.channelId); - if (!channel) return; - if ("triggerTyping" in channel && typeof channel.triggerTyping === "function") { - await channel.triggerTyping(); - } - } catch (err) { - logVerbose(`discord typing cue failed for channel ${params.channelId}: ${String(err)}`); + const channel = await params.client.fetchChannel(params.channelId); + if (!channel) return; + if ("triggerTyping" in channel && typeof channel.triggerTyping === "function") { + await channel.triggerTyping(); } } diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index d02187d90..7b2d2d43f 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -129,6 +129,7 @@ export { shouldAckReaction, shouldAckReactionForWhatsApp, } from "../channels/ack-reactions.js"; +export { createTypingCallbacks } from "../channels/typing.js"; export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js"; export type { NormalizedLocation } from "../channels/location.js"; export { formatLocationText, toLocationContext } from "../channels/location.js"; diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index c24ed0f90..802252487 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -25,6 +25,7 @@ import { import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; import { recordInboundSession } from "../../channels/session.js"; +import { createTypingCallbacks } from "../../channels/typing.js"; import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; @@ -182,18 +183,19 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { identityName: resolveIdentityName(deps.cfg, route.agentId), }; - const onReplyStart = async () => { - try { + const typingCallbacks = createTypingCallbacks({ + start: async () => { if (!ctxPayload.To) return; await sendTypingSignal(ctxPayload.To, { baseUrl: deps.baseUrl, account: deps.account, accountId: deps.accountId, }); - } catch (err) { + }, + onStartError: (err) => { logVerbose(`signal typing cue failed for ${ctxPayload.To}: ${String(err)}`); - } - }; + }, + }); const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ responsePrefix: resolveEffectiveMessagesConfig(deps.cfg, route.agentId).responsePrefix, @@ -214,7 +216,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { onError: (err, info) => { deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`)); }, - onReplyStart, + onReplyStart: typingCallbacks.onReplyStart, }); const { queuedFinal } = await dispatchInboundMessage({ diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 80beee623..a846cd128 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -10,6 +10,7 @@ import { import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js"; import { clearHistoryEntriesIfEnabled } from "../../../auto-reply/reply/history.js"; import { removeAckReactionAfterReply } from "../../../channels/ack-reactions.js"; +import { createTypingCallbacks } from "../../../channels/typing.js"; import { createReplyDispatcherWithTyping } from "../../../auto-reply/reply/reply-dispatcher.js"; import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; import { removeSlackReaction } from "../../actions.js"; @@ -43,14 +44,30 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag hasRepliedRef, }); - const onReplyStart = async () => { - didSetStatus = true; - await ctx.setSlackThreadStatus({ - channelId: message.channel, - threadTs: statusThreadTs, - status: "is typing...", - }); - }; + const typingCallbacks = createTypingCallbacks({ + start: async () => { + didSetStatus = true; + await ctx.setSlackThreadStatus({ + channelId: message.channel, + threadTs: statusThreadTs, + status: "is typing...", + }); + }, + stop: async () => { + if (!didSetStatus) return; + await ctx.setSlackThreadStatus({ + channelId: message.channel, + threadTs: statusThreadTs, + status: "", + }); + }, + onStartError: (err) => { + runtime.error?.(danger(`slack typing cue failed: ${String(err)}`)); + }, + onStopError: (err) => { + runtime.error?.(danger(`slack typing stop failed: ${String(err)}`)); + }, + }); // Create mutable context for response prefix template interpolation let prefixContext: ResponsePrefixContext = { @@ -76,15 +93,10 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag }, onError: (err, info) => { runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); - if (didSetStatus) { - void ctx.setSlackThreadStatus({ - channelId: message.channel, - threadTs: statusThreadTs, - status: "", - }); - } + typingCallbacks.onIdle?.(); }, - onReplyStart, + onReplyStart: typingCallbacks.onReplyStart, + onIdle: typingCallbacks.onIdle, }); const { queuedFinal, counts } = await dispatchInboundMessage({ @@ -110,14 +122,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag }); markDispatchIdle(); - if (didSetStatus) { - await ctx.setSlackThreadStatus({ - channelId: message.channel, - threadTs: statusThreadTs, - status: "", - }); - } - if (!queuedFinal) { if (prepared.isRoomish) { clearHistoryEntriesIfEnabled({ diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index c9a2d52ce..597a7cc79 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -156,11 +156,7 @@ export const buildTelegramMessageContext = async ({ } const sendTyping = async () => { - try { - await bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(resolvedThreadId)); - } catch (err) { - logVerbose(`telegram typing cue failed for chat ${chatId}: ${String(err)}`); - } + await bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(resolvedThreadId)); }; const sendRecordVoice = async () => { diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index c48513b79..784bcb10e 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -8,6 +8,7 @@ import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import { removeAckReactionAfterReply } from "../channels/ack-reactions.js"; +import { createTypingCallbacks } from "../channels/typing.js"; import { danger, logVerbose } from "../globals.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; import { deliverReplies } from "./bot/delivery.js"; @@ -158,7 +159,12 @@ export const dispatchTelegramMessage = async ({ onError: (err, info) => { runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, - onReplyStart: sendTyping, + onReplyStart: createTypingCallbacks({ + start: sendTyping, + onStartError: (err) => { + logVerbose(`telegram typing cue failed for chat ${chatId}: ${String(err)}`); + }, + }).onReplyStart, }, replyOptions: { skillFilter,