refactor: unify typing callbacks

This commit is contained in:
Peter Steinberger
2026-01-23 22:55:41 +00:00
parent d82ecaf9dc
commit 8252ae2da1
11 changed files with 114 additions and 63 deletions

View File

@@ -1,6 +1,7 @@
import type { LocationMessageEventContent, MatrixClient } from "matrix-bot-sdk"; import type { LocationMessageEventContent, MatrixClient } from "matrix-bot-sdk";
import { import {
createTypingCallbacks,
formatAllowlistMatchMeta, formatAllowlistMatchMeta,
type RuntimeEnv, type RuntimeEnv,
} from "clawdbot/plugin-sdk"; } from "clawdbot/plugin-sdk";
@@ -552,6 +553,16 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
channel: "matrix", channel: "matrix",
accountId: route.accountId, accountId: route.accountId,
}); });
const typingCallbacks = createTypingCallbacks({
start: () => sendTypingMatrix(roomId, true, undefined, client),
stop: () => sendTypingMatrix(roomId, false, undefined, client),
onStartError: (err) => {
logVerboseMessage(`matrix typing cue failed for room ${roomId}: ${String(err)}`);
},
onStopError: (err) => {
logVerboseMessage(`matrix typing stop failed for room ${roomId}: ${String(err)}`);
},
});
const { dispatcher, replyOptions, markDispatchIdle } = const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({ core.channel.reply.createReplyDispatcherWithTyping({
responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId) responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId)
@@ -574,10 +585,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
onError: (err, info) => { onError: (err, info) => {
runtime.error?.(`matrix ${info.kind} reply failed: ${String(err)}`); runtime.error?.(`matrix ${info.kind} reply failed: ${String(err)}`);
}, },
onReplyStart: () => onReplyStart: typingCallbacks.onReplyStart,
sendTypingMatrix(roomId, true, undefined, client).catch(() => {}), onIdle: typingCallbacks.onIdle,
onIdle: () =>
sendTypingMatrix(roomId, false, undefined, client).catch(() => {}),
}); });
const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({

View File

@@ -7,6 +7,7 @@ import type {
RuntimeEnv, RuntimeEnv,
} from "clawdbot/plugin-sdk"; } from "clawdbot/plugin-sdk";
import { import {
createTypingCallbacks,
buildPendingHistoryContextFromMap, buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled, clearHistoryEntriesIfEnabled,
DEFAULT_GROUP_HISTORY_LIMIT, DEFAULT_GROUP_HISTORY_LIMIT,
@@ -307,11 +308,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
}; };
const sendTypingIndicator = async (channelId: string, parentId?: string) => { const sendTypingIndicator = async (channelId: string, parentId?: string) => {
try { await sendMattermostTyping(client, { channelId, parentId });
await sendMattermostTyping(client, { channelId, parentId });
} catch (err) {
logger.debug?.(`mattermost typing cue failed for channel ${channelId}: ${String(err)}`);
}
}; };
const resolveChannelInfo = async (channelId: string): Promise<MattermostChannel | null> => { const resolveChannelInfo = async (channelId: string): Promise<MattermostChannel | null> => {
@@ -717,6 +714,12 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
identityName: resolveIdentityName(cfg, route.agentId), identityName: resolveIdentityName(cfg, route.agentId),
}; };
const typingCallbacks = createTypingCallbacks({
start: () => sendTypingIndicator(channelId, threadRootId),
onStartError: (err) => {
logger.debug?.(`mattermost typing cue failed for channel ${channelId}: ${String(err)}`);
},
});
const { dispatcher, replyOptions, markDispatchIdle } = const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({ core.channel.reply.createReplyDispatcherWithTyping({
responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId) responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId)
@@ -752,7 +755,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
onError: (err, info) => { onError: (err, info) => {
runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`);
}, },
onReplyStart: () => sendTypingIndicator(channelId, threadRootId), onReplyStart: typingCallbacks.onReplyStart,
}); });
await core.channel.reply.dispatchReplyFromConfig({ await core.channel.reply.dispatchReplyFromConfig({

View File

@@ -1,4 +1,5 @@
import { import {
createTypingCallbacks,
resolveChannelMediaMaxBytes, resolveChannelMediaMaxBytes,
type ClawdbotConfig, type ClawdbotConfig,
type MSTeamsReplyStyle, type MSTeamsReplyStyle,
@@ -39,12 +40,14 @@ export function createMSTeamsReplyDispatcher(params: {
}) { }) {
const core = getMSTeamsRuntime(); const core = getMSTeamsRuntime();
const sendTypingIndicator = async () => { const sendTypingIndicator = async () => {
try { await params.context.sendActivities([{ type: "typing" }]);
await params.context.sendActivities([{ type: "typing" }]);
} catch {
// Typing indicator is best-effort.
}
}; };
const typingCallbacks = createTypingCallbacks({
start: sendTypingIndicator,
onStartError: () => {
// Typing indicator is best-effort.
},
});
return core.channel.reply.createReplyDispatcherWithTyping({ return core.channel.reply.createReplyDispatcherWithTyping({
responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig( responsePrefix: core.channel.reply.resolveEffectiveMessagesConfig(
@@ -102,6 +105,6 @@ export function createMSTeamsReplyDispatcher(params: {
hint, hint,
}); });
}, },
onReplyStart: sendTypingIndicator, onReplyStart: typingCallbacks.onReplyStart,
}); });
} }

27
src/channels/typing.ts Normal file
View File

@@ -0,0 +1,27 @@
export type TypingCallbacks = {
onReplyStart: () => Promise<void>;
onIdle?: () => void;
};
export function createTypingCallbacks(params: {
start: () => Promise<void>;
stop?: () => Promise<void>;
onStartError: (err: unknown) => void;
onStopError?: (err: unknown) => void;
}): TypingCallbacks {
const onReplyStart = async () => {
try {
await params.start();
} catch (err) {
params.onStartError(err);
}
};
const onIdle = params.stop
? () => {
void params.stop().catch((err) => (params.onStopError ?? params.onStartError)(err));
}
: undefined;
return { onReplyStart, onIdle };
}

View File

@@ -12,6 +12,7 @@ import {
removeAckReactionAfterReply, removeAckReactionAfterReply,
shouldAckReaction as shouldAckReactionGate, shouldAckReaction as shouldAckReactionGate,
} from "../../channels/ack-reactions.js"; } from "../../channels/ack-reactions.js";
import { createTypingCallbacks } from "../../channels/typing.js";
import { import {
formatInboundEnvelope, formatInboundEnvelope,
formatThreadStarterEnvelope, formatThreadStarterEnvelope,
@@ -350,7 +351,12 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
onError: (err, info) => { onError: (err, info) => {
runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`)); runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
}, },
onReplyStart: () => sendTyping({ client, channelId: typingChannelId }), onReplyStart: createTypingCallbacks({
start: () => sendTyping({ client, channelId: typingChannelId }),
onStartError: (err) => {
logVerbose(`discord typing cue failed for channel ${typingChannelId}: ${String(err)}`);
},
}).onReplyStart,
}); });
const { queuedFinal, counts } = await dispatchInboundMessage({ const { queuedFinal, counts } = await dispatchInboundMessage({

View File

@@ -1,15 +1,9 @@
import type { Client } from "@buape/carbon"; import type { Client } from "@buape/carbon";
import { logVerbose } from "../../globals.js";
export async function sendTyping(params: { client: Client; channelId: string }) { export async function sendTyping(params: { client: Client; channelId: string }) {
try { const channel = await params.client.fetchChannel(params.channelId);
const channel = await params.client.fetchChannel(params.channelId); if (!channel) return;
if (!channel) return; if ("triggerTyping" in channel && typeof channel.triggerTyping === "function") {
if ("triggerTyping" in channel && typeof channel.triggerTyping === "function") { await channel.triggerTyping();
await channel.triggerTyping();
}
} catch (err) {
logVerbose(`discord typing cue failed for channel ${params.channelId}: ${String(err)}`);
} }
} }

View File

@@ -129,6 +129,7 @@ export {
shouldAckReaction, shouldAckReaction,
shouldAckReactionForWhatsApp, shouldAckReactionForWhatsApp,
} from "../channels/ack-reactions.js"; } from "../channels/ack-reactions.js";
export { createTypingCallbacks } from "../channels/typing.js";
export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js"; export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js";
export type { NormalizedLocation } from "../channels/location.js"; export type { NormalizedLocation } from "../channels/location.js";
export { formatLocationText, toLocationContext } from "../channels/location.js"; export { formatLocationText, toLocationContext } from "../channels/location.js";

View File

@@ -25,6 +25,7 @@ import {
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import { recordInboundSession } from "../../channels/session.js"; import { recordInboundSession } from "../../channels/session.js";
import { createTypingCallbacks } from "../../channels/typing.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { enqueueSystemEvent } from "../../infra/system-events.js"; import { enqueueSystemEvent } from "../../infra/system-events.js";
@@ -182,18 +183,19 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
identityName: resolveIdentityName(deps.cfg, route.agentId), identityName: resolveIdentityName(deps.cfg, route.agentId),
}; };
const onReplyStart = async () => { const typingCallbacks = createTypingCallbacks({
try { start: async () => {
if (!ctxPayload.To) return; if (!ctxPayload.To) return;
await sendTypingSignal(ctxPayload.To, { await sendTypingSignal(ctxPayload.To, {
baseUrl: deps.baseUrl, baseUrl: deps.baseUrl,
account: deps.account, account: deps.account,
accountId: deps.accountId, accountId: deps.accountId,
}); });
} catch (err) { },
onStartError: (err) => {
logVerbose(`signal typing cue failed for ${ctxPayload.To}: ${String(err)}`); logVerbose(`signal typing cue failed for ${ctxPayload.To}: ${String(err)}`);
} },
}; });
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
responsePrefix: resolveEffectiveMessagesConfig(deps.cfg, route.agentId).responsePrefix, responsePrefix: resolveEffectiveMessagesConfig(deps.cfg, route.agentId).responsePrefix,
@@ -214,7 +216,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
onError: (err, info) => { onError: (err, info) => {
deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`)); deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`));
}, },
onReplyStart, onReplyStart: typingCallbacks.onReplyStart,
}); });
const { queuedFinal } = await dispatchInboundMessage({ const { queuedFinal } = await dispatchInboundMessage({

View File

@@ -10,6 +10,7 @@ import {
import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js"; import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js";
import { clearHistoryEntriesIfEnabled } from "../../../auto-reply/reply/history.js"; import { clearHistoryEntriesIfEnabled } from "../../../auto-reply/reply/history.js";
import { removeAckReactionAfterReply } from "../../../channels/ack-reactions.js"; import { removeAckReactionAfterReply } from "../../../channels/ack-reactions.js";
import { createTypingCallbacks } from "../../../channels/typing.js";
import { createReplyDispatcherWithTyping } from "../../../auto-reply/reply/reply-dispatcher.js"; import { createReplyDispatcherWithTyping } from "../../../auto-reply/reply/reply-dispatcher.js";
import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js";
import { removeSlackReaction } from "../../actions.js"; import { removeSlackReaction } from "../../actions.js";
@@ -43,14 +44,30 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
hasRepliedRef, hasRepliedRef,
}); });
const onReplyStart = async () => { const typingCallbacks = createTypingCallbacks({
didSetStatus = true; start: async () => {
await ctx.setSlackThreadStatus({ didSetStatus = true;
channelId: message.channel, await ctx.setSlackThreadStatus({
threadTs: statusThreadTs, channelId: message.channel,
status: "is typing...", threadTs: statusThreadTs,
}); status: "is typing...",
}; });
},
stop: async () => {
if (!didSetStatus) return;
await ctx.setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "",
});
},
onStartError: (err) => {
runtime.error?.(danger(`slack typing cue failed: ${String(err)}`));
},
onStopError: (err) => {
runtime.error?.(danger(`slack typing stop failed: ${String(err)}`));
},
});
// Create mutable context for response prefix template interpolation // Create mutable context for response prefix template interpolation
let prefixContext: ResponsePrefixContext = { let prefixContext: ResponsePrefixContext = {
@@ -76,15 +93,10 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
}, },
onError: (err, info) => { onError: (err, info) => {
runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`));
if (didSetStatus) { typingCallbacks.onIdle?.();
void ctx.setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "",
});
}
}, },
onReplyStart, onReplyStart: typingCallbacks.onReplyStart,
onIdle: typingCallbacks.onIdle,
}); });
const { queuedFinal, counts } = await dispatchInboundMessage({ const { queuedFinal, counts } = await dispatchInboundMessage({
@@ -110,14 +122,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
}); });
markDispatchIdle(); markDispatchIdle();
if (didSetStatus) {
await ctx.setSlackThreadStatus({
channelId: message.channel,
threadTs: statusThreadTs,
status: "",
});
}
if (!queuedFinal) { if (!queuedFinal) {
if (prepared.isRoomish) { if (prepared.isRoomish) {
clearHistoryEntriesIfEnabled({ clearHistoryEntriesIfEnabled({

View File

@@ -156,11 +156,7 @@ export const buildTelegramMessageContext = async ({
} }
const sendTyping = async () => { const sendTyping = async () => {
try { await bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(resolvedThreadId));
await bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(resolvedThreadId));
} catch (err) {
logVerbose(`telegram typing cue failed for chat ${chatId}: ${String(err)}`);
}
}; };
const sendRecordVoice = async () => { const sendRecordVoice = async () => {

View File

@@ -8,6 +8,7 @@ import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js";
import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js"; import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js";
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
import { removeAckReactionAfterReply } from "../channels/ack-reactions.js"; import { removeAckReactionAfterReply } from "../channels/ack-reactions.js";
import { createTypingCallbacks } from "../channels/typing.js";
import { danger, logVerbose } from "../globals.js"; import { danger, logVerbose } from "../globals.js";
import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
import { deliverReplies } from "./bot/delivery.js"; import { deliverReplies } from "./bot/delivery.js";
@@ -158,7 +159,12 @@ export const dispatchTelegramMessage = async ({
onError: (err, info) => { onError: (err, info) => {
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
}, },
onReplyStart: sendTyping, onReplyStart: createTypingCallbacks({
start: sendTyping,
onStartError: (err) => {
logVerbose(`telegram typing cue failed for chat ${chatId}: ${String(err)}`);
},
}).onReplyStart,
}, },
replyOptions: { replyOptions: {
skillFilter, skillFilter,