From d82ecaf9dc89dabd95258c44732d3d1d22a2d785 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 23 Jan 2026 22:48:03 +0000 Subject: [PATCH] refactor: centralize inbound session updates --- extensions/bluebubbles/src/monitor.test.ts | 1 + .../matrix/src/matrix/monitor/handler.ts | 34 ++++++------- .../src/monitor-handler/message-handler.ts | 9 ++-- extensions/nextcloud-talk/src/inbound.ts | 15 +++--- extensions/zalo/src/monitor.ts | 7 +-- extensions/zalouser/src/monitor.ts | 7 +-- src/channels/session.ts | 49 +++++++++++++++++++ .../monitor/message-handler.process.ts | 36 ++++++-------- src/imessage/monitor/monitor-provider.ts | 41 ++++++---------- src/plugin-sdk/index.ts | 1 + src/plugins/runtime/index.ts | 2 + src/plugins/runtime/types.ts | 2 + src/signal/monitor/event-handler.ts | 36 ++++++-------- src/slack/monitor/message-handler/dispatch.ts | 18 ------- src/slack/monitor/message-handler/prepare.ts | 38 ++++++++------ src/telegram/bot-message-context.ts | 36 ++++++-------- 16 files changed, 170 insertions(+), 162 deletions(-) create mode 100644 src/channels/session.ts diff --git a/extensions/bluebubbles/src/monitor.test.ts b/extensions/bluebubbles/src/monitor.test.ts index c960b5c4e..0f9973de9 100644 --- a/extensions/bluebubbles/src/monitor.test.ts +++ b/extensions/bluebubbles/src/monitor.test.ts @@ -129,6 +129,7 @@ function createMockRuntime(): PluginRuntime { session: { resolveStorePath: mockResolveStorePath as unknown as PluginRuntime["channel"]["session"]["resolveStorePath"], readSessionUpdatedAt: mockReadSessionUpdatedAt as unknown as PluginRuntime["channel"]["session"]["readSessionUpdatedAt"], + recordInboundSession: vi.fn() as unknown as PluginRuntime["channel"]["session"]["recordInboundSession"], recordSessionMetaFromInbound: vi.fn() as unknown as PluginRuntime["channel"]["session"]["recordSessionMetaFromInbound"], updateLastRoute: vi.fn() as unknown as PluginRuntime["channel"]["session"]["updateLastRoute"], }, diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 10db2be20..c1b46ffd3 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -487,29 +487,25 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam OriginatingTo: `room:${roomId}`, }); - void core.channel.session - .recordSessionMetaFromInbound({ - storePath, - sessionKey: ctxPayload.SessionKey ?? route.sessionKey, - ctx: ctxPayload, - }) - .catch((err) => { + await core.channel.session.recordInboundSession({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + updateLastRoute: isDirectMessage + ? { + sessionKey: route.mainSessionKey, + channel: "matrix", + to: `room:${roomId}`, + accountId: route.accountId, + } + : undefined, + onRecordError: (err) => { logger.warn( { error: String(err), storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey }, "failed updating session meta", ); - }); - - if (isDirectMessage) { - await core.channel.session.updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - channel: "matrix", - to: `room:${roomId}`, - accountId: route.accountId, - ctx: ctxPayload, - }); - } + }, + }); const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n"); logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`); diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index 79006ad70..715b6adf0 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -465,12 +465,13 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { ...mediaPayload, }); - void core.channel.session.recordSessionMetaFromInbound({ - storePath, + await core.channel.session.recordInboundSession({ + storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, - }).catch((err) => { - logVerboseMessage(`msteams: failed updating session meta: ${String(err)}`); + onRecordError: (err) => { + logVerboseMessage(`msteams: failed updating session meta: ${String(err)}`); + }, }); logVerboseMessage(`msteams inbound: from=${ctxPayload.From} preview="${preview}"`); diff --git a/extensions/nextcloud-talk/src/inbound.ts b/extensions/nextcloud-talk/src/inbound.ts index 1c6984848..bfa18d834 100644 --- a/extensions/nextcloud-talk/src/inbound.ts +++ b/extensions/nextcloud-talk/src/inbound.ts @@ -287,15 +287,14 @@ export async function handleNextcloudTalkInbound(params: { CommandAuthorized: commandAuthorized, }); - void core.channel.session - .recordSessionMetaFromInbound({ - storePath, - sessionKey: ctxPayload.SessionKey ?? route.sessionKey, - ctx: ctxPayload, - }) - .catch((err) => { + await core.channel.session.recordInboundSession({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + onRecordError: (err) => { runtime.error?.(`nextcloud-talk: failed updating session meta: ${String(err)}`); - }); + }, + }); await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 939dcdbde..44a279354 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -570,12 +570,13 @@ async function processMessageWithPipeline(params: { OriginatingTo: `zalo:${chatId}`, }); - void core.channel.session.recordSessionMetaFromInbound({ + await core.channel.session.recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, - }).catch((err) => { - runtime.error?.(`zalo: failed updating session meta: ${String(err)}`); + onRecordError: (err) => { + runtime.error?.(`zalo: failed updating session meta: ${String(err)}`); + }, }); const tableMode = core.channel.text.resolveMarkdownTableMode({ diff --git a/extensions/zalouser/src/monitor.ts b/extensions/zalouser/src/monitor.ts index 4015fcc8d..97e5a4be3 100644 --- a/extensions/zalouser/src/monitor.ts +++ b/extensions/zalouser/src/monitor.ts @@ -311,12 +311,13 @@ async function processMessage( OriginatingTo: `zalouser:${chatId}`, }); - void core.channel.session.recordSessionMetaFromInbound({ + await core.channel.session.recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, - }).catch((err) => { - runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`); + onRecordError: (err) => { + runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`); + }, }); await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ diff --git a/src/channels/session.ts b/src/channels/session.ts new file mode 100644 index 000000000..2d34d7f74 --- /dev/null +++ b/src/channels/session.ts @@ -0,0 +1,49 @@ +import type { MsgContext } from "../auto-reply/templating.js"; +import { + recordSessionMetaFromInbound, + type GroupKeyResolution, + type SessionEntry, + updateLastRoute, +} from "../config/sessions.js"; + +export type InboundLastRouteUpdate = { + sessionKey: string; + channel: SessionEntry["lastChannel"]; + to: string; + accountId?: string; + threadId?: string | number; +}; + +export async function recordInboundSession(params: { + storePath: string; + sessionKey: string; + ctx: MsgContext; + groupResolution?: GroupKeyResolution | null; + createIfMissing?: boolean; + updateLastRoute?: InboundLastRouteUpdate; + onRecordError: (err: unknown) => void; +}): Promise { + const { storePath, sessionKey, ctx, groupResolution, createIfMissing } = params; + void recordSessionMetaFromInbound({ + storePath, + sessionKey, + ctx, + groupResolution, + createIfMissing, + }).catch(params.onRecordError); + + const update = params.updateLastRoute; + if (!update) return; + await updateLastRoute({ + storePath, + sessionKey: update.sessionKey, + deliveryContext: { + channel: update.channel, + to: update.to, + accountId: update.accountId, + threadId: update.threadId, + }, + ctx, + groupResolution, + }); +} diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index ce5212d9c..09593db95 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -25,12 +25,8 @@ import { import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; -import { - readSessionUpdatedAt, - recordSessionMetaFromInbound, - resolveStorePath, - updateLastRoute, -} from "../../config/sessions.js"; +import { recordInboundSession } from "../../channels/session.js"; +import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { resolveMarkdownTableMode } from "../../config/markdown-tables.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { buildAgentSessionKey } from "../../routing/resolve-route.js"; @@ -293,27 +289,23 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget, }); - void recordSessionMetaFromInbound({ + await recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, - }).catch((err) => { - logVerbose(`discord: failed updating session meta: ${String(err)}`); + updateLastRoute: isDirectMessage + ? { + sessionKey: route.mainSessionKey, + channel: "discord", + to: `user:${author.id}`, + accountId: route.accountId, + } + : undefined, + onRecordError: (err) => { + logVerbose(`discord: failed updating session meta: ${String(err)}`); + }, }); - if (isDirectMessage) { - await updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - deliveryContext: { - channel: "discord", - to: `user:${author.id}`, - accountId: route.accountId, - }, - ctx: ctxPayload, - }); - } - if (shouldLogVerbose()) { const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n"); logVerbose( diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index ab542ad64..576218416 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -31,17 +31,13 @@ import { } from "../../auto-reply/reply/history.js"; import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; +import { recordInboundSession } from "../../channels/session.js"; import { loadConfig } from "../../config/config.js"; import { resolveChannelGroupPolicy, resolveChannelGroupRequireMention, } from "../../config/group-policy.js"; -import { - readSessionUpdatedAt, - recordSessionMetaFromInbound, - resolveStorePath, - updateLastRoute, -} from "../../config/sessions.js"; +import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { waitForTransportReady } from "../../infra/transport-ready.js"; import { mediaKindFromMime } from "../../media/constants.js"; @@ -509,30 +505,25 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P OriginatingTo: imessageTo, }); - void recordSessionMetaFromInbound({ + const updateTarget = (isGroup ? chatTarget : undefined) || sender; + await recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, - }).catch((err) => { - logVerbose(`imessage: failed updating session meta: ${String(err)}`); + updateLastRoute: + !isGroup && updateTarget + ? { + sessionKey: route.mainSessionKey, + channel: "imessage", + to: updateTarget, + accountId: route.accountId, + } + : undefined, + onRecordError: (err) => { + logVerbose(`imessage: failed updating session meta: ${String(err)}`); + }, }); - if (!isGroup) { - const to = (isGroup ? chatTarget : undefined) || sender; - if (to) { - await updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - deliveryContext: { - channel: "imessage", - to, - accountId: route.accountId, - }, - ctx: ctxPayload, - }); - } - } - if (shouldLogVerbose()) { const preview = truncateUtf16Safe(body, 200).replace(/\n/g, "\\n"); logVerbose( diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index b349df7c2..d02187d90 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -140,6 +140,7 @@ export { resolveTelegramGroupRequireMention, resolveWhatsAppGroupRequireMention, } from "../channels/plugins/group-mentions.js"; +export { recordInboundSession } from "../channels/session.js"; export { buildChannelKeyCandidates, normalizeChannelSlug, diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index 504d5f034..534d3361b 100644 --- a/src/plugins/runtime/index.ts +++ b/src/plugins/runtime/index.ts @@ -27,6 +27,7 @@ import { handleSlackAction } from "../../agents/tools/slack-actions.js"; import { handleWhatsAppAction } from "../../agents/tools/whatsapp-actions.js"; import { removeAckReactionAfterReply, shouldAckReaction } from "../../channels/ack-reactions.js"; import { resolveCommandAuthorizedFromAuthorizers } from "../../channels/command-gating.js"; +import { recordInboundSession } from "../../channels/session.js"; import { discordMessageActions } from "../../channels/plugins/actions/discord.js"; import { telegramMessageActions } from "../../channels/plugins/actions/telegram.js"; import { createWhatsAppLoginTool } from "../../channels/plugins/agent-tools/whatsapp-login.js"; @@ -193,6 +194,7 @@ export function createPluginRuntime(): PluginRuntime { resolveStorePath, readSessionUpdatedAt, recordSessionMetaFromInbound, + recordInboundSession, updateLastRoute, }, mentions: { diff --git a/src/plugins/runtime/types.ts b/src/plugins/runtime/types.ts index 7351bc8da..3cda8ee51 100644 --- a/src/plugins/runtime/types.ts +++ b/src/plugins/runtime/types.ts @@ -54,6 +54,7 @@ type FormatInboundEnvelope = typeof import("../../auto-reply/envelope.js").forma type ResolveEnvelopeFormatOptions = typeof import("../../auto-reply/envelope.js").resolveEnvelopeFormatOptions; type ResolveStateDir = typeof import("../../config/paths.js").resolveStateDir; +type RecordInboundSession = typeof import("../../channels/session.js").recordInboundSession; type RecordSessionMetaFromInbound = typeof import("../../config/sessions.js").recordSessionMetaFromInbound; type ResolveStorePath = typeof import("../../config/sessions.js").resolveStorePath; @@ -208,6 +209,7 @@ export type PluginRuntime = { resolveStorePath: ResolveStorePath; readSessionUpdatedAt: ReadSessionUpdatedAt; recordSessionMetaFromInbound: RecordSessionMetaFromInbound; + recordInboundSession: RecordInboundSession; updateLastRoute: UpdateLastRoute; }; mentions: { diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index 01c1cc727..c24ed0f90 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -24,12 +24,8 @@ import { } from "../../auto-reply/reply/history.js"; import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; -import { - readSessionUpdatedAt, - recordSessionMetaFromInbound, - resolveStorePath, - updateLastRoute, -} from "../../config/sessions.js"; +import { recordInboundSession } from "../../channels/session.js"; +import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { mediaKindFromMime } from "../../media/constants.js"; @@ -159,27 +155,23 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { OriginatingTo: signalTo, }); - void recordSessionMetaFromInbound({ + await recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, - }).catch((err) => { - logVerbose(`signal: failed updating session meta: ${String(err)}`); + updateLastRoute: !entry.isGroup + ? { + sessionKey: route.mainSessionKey, + channel: "signal", + to: entry.senderRecipient, + accountId: route.accountId, + } + : undefined, + onRecordError: (err) => { + logVerbose(`signal: failed updating session meta: ${String(err)}`); + }, }); - if (!entry.isGroup) { - await updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - deliveryContext: { - channel: "signal", - to: entry.senderRecipient, - accountId: route.accountId, - }, - ctx: ctxPayload, - }); - } - if (shouldLogVerbose()) { const preview = body.slice(0, 200).replace(/\\n/g, "\\\\n"); logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`); diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index e06795f59..80beee623 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -11,7 +11,6 @@ import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js"; import { clearHistoryEntriesIfEnabled } from "../../../auto-reply/reply/history.js"; import { removeAckReactionAfterReply } from "../../../channels/ack-reactions.js"; import { createReplyDispatcherWithTyping } from "../../../auto-reply/reply/reply-dispatcher.js"; -import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; import { removeSlackReaction } from "../../actions.js"; import { resolveSlackThreadTargets } from "../../threading.js"; @@ -25,23 +24,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const cfg = ctx.cfg; const runtime = ctx.runtime; - if (prepared.isDirectMessage) { - const sessionCfg = cfg.session; - const storePath = resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); - await updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - deliveryContext: { - channel: "slack", - to: `user:${message.user}`, - accountId: route.accountId, - }, - ctx: prepared.ctxPayload, - }); - } - const { statusThreadTs } = resolveSlackThreadTargets({ message, replyToMode: ctx.replyToMode, diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index 06f77f620..8014a3ef7 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -27,11 +27,8 @@ import { resolveMentionGatingWithBypass } from "../../../channels/mention-gating import { resolveConversationLabel } from "../../../channels/conversation-label.js"; import { resolveControlCommandGate } from "../../../channels/command-gating.js"; import { formatAllowlistMatchMeta } from "../../../channels/allowlist-match.js"; -import { - readSessionUpdatedAt, - recordSessionMetaFromInbound, - resolveStorePath, -} from "../../../config/sessions.js"; +import { recordInboundSession } from "../../../channels/session.js"; +import { readSessionUpdatedAt, resolveStorePath } from "../../../config/sessions.js"; import type { ResolvedSlackAccount } from "../../accounts.js"; import { reactSlackMessage } from "../../actions.js"; @@ -511,19 +508,28 @@ export async function prepareSlackMessage(params: { OriginatingTo: slackTo, }) satisfies FinalizedMsgContext; - void recordSessionMetaFromInbound({ + await recordInboundSession({ storePath, - sessionKey: sessionKey, + sessionKey, ctx: ctxPayload, - }).catch((err) => { - ctx.logger.warn( - { - error: String(err), - storePath, - sessionKey, - }, - "failed updating session meta", - ); + updateLastRoute: isDirectMessage + ? { + sessionKey: route.mainSessionKey, + channel: "slack", + to: `user:${message.user}`, + accountId: route.accountId, + } + : undefined, + onRecordError: (err) => { + ctx.logger.warn( + { + error: String(err), + storePath, + sessionKey, + }, + "failed updating session meta", + ); + }, }); const replyTarget = ctxPayload.To ?? undefined; diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index e92222e1c..c9a2d52ce 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -12,13 +12,9 @@ import { import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js"; import { buildMentionRegexes, matchesMentionPatterns } from "../auto-reply/reply/mentions.js"; import { formatLocationText, toLocationContext } from "../channels/location.js"; +import { recordInboundSession } from "../channels/session.js"; import { formatCliCommand } from "../cli/command-format.js"; -import { - readSessionUpdatedAt, - recordSessionMetaFromInbound, - resolveStorePath, - updateLastRoute, -} from "../config/sessions.js"; +import { readSessionUpdatedAt, resolveStorePath } from "../config/sessions.js"; import type { ClawdbotConfig } from "../config/config.js"; import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js"; import { logVerbose, shouldLogVerbose } from "../globals.js"; @@ -519,12 +515,21 @@ export const buildTelegramMessageContext = async ({ OriginatingTo: `telegram:${chatId}`, }); - void recordSessionMetaFromInbound({ + await recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, - }).catch((err) => { - logVerbose(`telegram: failed updating session meta: ${String(err)}`); + updateLastRoute: !isGroup + ? { + sessionKey: route.mainSessionKey, + channel: "telegram", + to: String(chatId), + accountId: route.accountId, + } + : undefined, + onRecordError: (err) => { + logVerbose(`telegram: failed updating session meta: ${String(err)}`); + }, }); if (replyTarget && shouldLogVerbose()) { @@ -540,19 +545,6 @@ export const buildTelegramMessageContext = async ({ ); } - if (!isGroup) { - await updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - deliveryContext: { - channel: "telegram", - to: String(chatId), - accountId: route.accountId, - }, - ctx: ctxPayload, - }); - } - if (shouldLogVerbose()) { const preview = body.slice(0, 200).replace(/\n/g, "\\n"); const mediaInfo = allMedia.length > 1 ? ` mediaCount=${allMedia.length}` : "";