refactor: centralize inbound session updates

This commit is contained in:
Peter Steinberger
2026-01-23 22:48:03 +00:00
parent 521ea4ae5b
commit d82ecaf9dc
16 changed files with 170 additions and 162 deletions

View File

@@ -129,6 +129,7 @@ function createMockRuntime(): PluginRuntime {
session: {
resolveStorePath: mockResolveStorePath as unknown as PluginRuntime["channel"]["session"]["resolveStorePath"],
readSessionUpdatedAt: mockReadSessionUpdatedAt as unknown as PluginRuntime["channel"]["session"]["readSessionUpdatedAt"],
recordInboundSession: vi.fn() as unknown as PluginRuntime["channel"]["session"]["recordInboundSession"],
recordSessionMetaFromInbound: vi.fn() as unknown as PluginRuntime["channel"]["session"]["recordSessionMetaFromInbound"],
updateLastRoute: vi.fn() as unknown as PluginRuntime["channel"]["session"]["updateLastRoute"],
},

View File

@@ -487,29 +487,25 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
OriginatingTo: `room:${roomId}`,
});
void core.channel.session
.recordSessionMetaFromInbound({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
})
.catch((err) => {
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
updateLastRoute: isDirectMessage
? {
sessionKey: route.mainSessionKey,
channel: "matrix",
to: `room:${roomId}`,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logger.warn(
{ error: String(err), storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey },
"failed updating session meta",
);
});
if (isDirectMessage) {
await core.channel.session.updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,
channel: "matrix",
to: `room:${roomId}`,
accountId: route.accountId,
ctx: ctxPayload,
});
}
},
});
const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n");
logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`);

View File

@@ -465,12 +465,13 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
...mediaPayload,
});
void core.channel.session.recordSessionMetaFromInbound({
storePath,
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerboseMessage(`msteams: failed updating session meta: ${String(err)}`);
onRecordError: (err) => {
logVerboseMessage(`msteams: failed updating session meta: ${String(err)}`);
},
});
logVerboseMessage(`msteams inbound: from=${ctxPayload.From} preview="${preview}"`);

View File

@@ -287,15 +287,14 @@ export async function handleNextcloudTalkInbound(params: {
CommandAuthorized: commandAuthorized,
});
void core.channel.session
.recordSessionMetaFromInbound({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
})
.catch((err) => {
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
onRecordError: (err) => {
runtime.error?.(`nextcloud-talk: failed updating session meta: ${String(err)}`);
});
},
});
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,

View File

@@ -570,12 +570,13 @@ async function processMessageWithPipeline(params: {
OriginatingTo: `zalo:${chatId}`,
});
void core.channel.session.recordSessionMetaFromInbound({
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
onRecordError: (err) => {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
},
});
const tableMode = core.channel.text.resolveMarkdownTableMode({

View File

@@ -311,12 +311,13 @@ async function processMessage(
OriginatingTo: `zalouser:${chatId}`,
});
void core.channel.session.recordSessionMetaFromInbound({
await core.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`);
onRecordError: (err) => {
runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`);
},
});
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({

49
src/channels/session.ts Normal file
View File

@@ -0,0 +1,49 @@
import type { MsgContext } from "../auto-reply/templating.js";
import {
recordSessionMetaFromInbound,
type GroupKeyResolution,
type SessionEntry,
updateLastRoute,
} from "../config/sessions.js";
export type InboundLastRouteUpdate = {
sessionKey: string;
channel: SessionEntry["lastChannel"];
to: string;
accountId?: string;
threadId?: string | number;
};
export async function recordInboundSession(params: {
storePath: string;
sessionKey: string;
ctx: MsgContext;
groupResolution?: GroupKeyResolution | null;
createIfMissing?: boolean;
updateLastRoute?: InboundLastRouteUpdate;
onRecordError: (err: unknown) => void;
}): Promise<void> {
const { storePath, sessionKey, ctx, groupResolution, createIfMissing } = params;
void recordSessionMetaFromInbound({
storePath,
sessionKey,
ctx,
groupResolution,
createIfMissing,
}).catch(params.onRecordError);
const update = params.updateLastRoute;
if (!update) return;
await updateLastRoute({
storePath,
sessionKey: update.sessionKey,
deliveryContext: {
channel: update.channel,
to: update.to,
accountId: update.accountId,
threadId: update.threadId,
},
ctx,
groupResolution,
});
}

View File

@@ -25,12 +25,8 @@ import {
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import {
readSessionUpdatedAt,
recordSessionMetaFromInbound,
resolveStorePath,
updateLastRoute,
} from "../../config/sessions.js";
import { recordInboundSession } from "../../channels/session.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { buildAgentSessionKey } from "../../routing/resolve-route.js";
@@ -293,27 +289,23 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
});
void recordSessionMetaFromInbound({
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`discord: failed updating session meta: ${String(err)}`);
updateLastRoute: isDirectMessage
? {
sessionKey: route.mainSessionKey,
channel: "discord",
to: `user:${author.id}`,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logVerbose(`discord: failed updating session meta: ${String(err)}`);
},
});
if (isDirectMessage) {
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,
deliveryContext: {
channel: "discord",
to: `user:${author.id}`,
accountId: route.accountId,
},
ctx: ctxPayload,
});
}
if (shouldLogVerbose()) {
const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n");
logVerbose(

View File

@@ -31,17 +31,13 @@ import {
} from "../../auto-reply/reply/history.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import { recordInboundSession } from "../../channels/session.js";
import { loadConfig } from "../../config/config.js";
import {
resolveChannelGroupPolicy,
resolveChannelGroupRequireMention,
} from "../../config/group-policy.js";
import {
readSessionUpdatedAt,
recordSessionMetaFromInbound,
resolveStorePath,
updateLastRoute,
} from "../../config/sessions.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { waitForTransportReady } from "../../infra/transport-ready.js";
import { mediaKindFromMime } from "../../media/constants.js";
@@ -509,30 +505,25 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
OriginatingTo: imessageTo,
});
void recordSessionMetaFromInbound({
const updateTarget = (isGroup ? chatTarget : undefined) || sender;
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`imessage: failed updating session meta: ${String(err)}`);
updateLastRoute:
!isGroup && updateTarget
? {
sessionKey: route.mainSessionKey,
channel: "imessage",
to: updateTarget,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logVerbose(`imessage: failed updating session meta: ${String(err)}`);
},
});
if (!isGroup) {
const to = (isGroup ? chatTarget : undefined) || sender;
if (to) {
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,
deliveryContext: {
channel: "imessage",
to,
accountId: route.accountId,
},
ctx: ctxPayload,
});
}
}
if (shouldLogVerbose()) {
const preview = truncateUtf16Safe(body, 200).replace(/\n/g, "\\n");
logVerbose(

View File

@@ -140,6 +140,7 @@ export {
resolveTelegramGroupRequireMention,
resolveWhatsAppGroupRequireMention,
} from "../channels/plugins/group-mentions.js";
export { recordInboundSession } from "../channels/session.js";
export {
buildChannelKeyCandidates,
normalizeChannelSlug,

View File

@@ -27,6 +27,7 @@ import { handleSlackAction } from "../../agents/tools/slack-actions.js";
import { handleWhatsAppAction } from "../../agents/tools/whatsapp-actions.js";
import { removeAckReactionAfterReply, shouldAckReaction } from "../../channels/ack-reactions.js";
import { resolveCommandAuthorizedFromAuthorizers } from "../../channels/command-gating.js";
import { recordInboundSession } from "../../channels/session.js";
import { discordMessageActions } from "../../channels/plugins/actions/discord.js";
import { telegramMessageActions } from "../../channels/plugins/actions/telegram.js";
import { createWhatsAppLoginTool } from "../../channels/plugins/agent-tools/whatsapp-login.js";
@@ -193,6 +194,7 @@ export function createPluginRuntime(): PluginRuntime {
resolveStorePath,
readSessionUpdatedAt,
recordSessionMetaFromInbound,
recordInboundSession,
updateLastRoute,
},
mentions: {

View File

@@ -54,6 +54,7 @@ type FormatInboundEnvelope = typeof import("../../auto-reply/envelope.js").forma
type ResolveEnvelopeFormatOptions =
typeof import("../../auto-reply/envelope.js").resolveEnvelopeFormatOptions;
type ResolveStateDir = typeof import("../../config/paths.js").resolveStateDir;
type RecordInboundSession = typeof import("../../channels/session.js").recordInboundSession;
type RecordSessionMetaFromInbound =
typeof import("../../config/sessions.js").recordSessionMetaFromInbound;
type ResolveStorePath = typeof import("../../config/sessions.js").resolveStorePath;
@@ -208,6 +209,7 @@ export type PluginRuntime = {
resolveStorePath: ResolveStorePath;
readSessionUpdatedAt: ReadSessionUpdatedAt;
recordSessionMetaFromInbound: RecordSessionMetaFromInbound;
recordInboundSession: RecordInboundSession;
updateLastRoute: UpdateLastRoute;
};
mentions: {

View File

@@ -24,12 +24,8 @@ import {
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import {
readSessionUpdatedAt,
recordSessionMetaFromInbound,
resolveStorePath,
updateLastRoute,
} from "../../config/sessions.js";
import { recordInboundSession } from "../../channels/session.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { mediaKindFromMime } from "../../media/constants.js";
@@ -159,27 +155,23 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
OriginatingTo: signalTo,
});
void recordSessionMetaFromInbound({
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`signal: failed updating session meta: ${String(err)}`);
updateLastRoute: !entry.isGroup
? {
sessionKey: route.mainSessionKey,
channel: "signal",
to: entry.senderRecipient,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logVerbose(`signal: failed updating session meta: ${String(err)}`);
},
});
if (!entry.isGroup) {
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,
deliveryContext: {
channel: "signal",
to: entry.senderRecipient,
accountId: route.accountId,
},
ctx: ctxPayload,
});
}
if (shouldLogVerbose()) {
const preview = body.slice(0, 200).replace(/\\n/g, "\\\\n");
logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`);

View File

@@ -11,7 +11,6 @@ import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js";
import { clearHistoryEntriesIfEnabled } from "../../../auto-reply/reply/history.js";
import { removeAckReactionAfterReply } from "../../../channels/ack-reactions.js";
import { createReplyDispatcherWithTyping } from "../../../auto-reply/reply/reply-dispatcher.js";
import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js";
import { removeSlackReaction } from "../../actions.js";
import { resolveSlackThreadTargets } from "../../threading.js";
@@ -25,23 +24,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
const cfg = ctx.cfg;
const runtime = ctx.runtime;
if (prepared.isDirectMessage) {
const sessionCfg = cfg.session;
const storePath = resolveStorePath(sessionCfg?.store, {
agentId: route.agentId,
});
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,
deliveryContext: {
channel: "slack",
to: `user:${message.user}`,
accountId: route.accountId,
},
ctx: prepared.ctxPayload,
});
}
const { statusThreadTs } = resolveSlackThreadTargets({
message,
replyToMode: ctx.replyToMode,

View File

@@ -27,11 +27,8 @@ import { resolveMentionGatingWithBypass } from "../../../channels/mention-gating
import { resolveConversationLabel } from "../../../channels/conversation-label.js";
import { resolveControlCommandGate } from "../../../channels/command-gating.js";
import { formatAllowlistMatchMeta } from "../../../channels/allowlist-match.js";
import {
readSessionUpdatedAt,
recordSessionMetaFromInbound,
resolveStorePath,
} from "../../../config/sessions.js";
import { recordInboundSession } from "../../../channels/session.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../../config/sessions.js";
import type { ResolvedSlackAccount } from "../../accounts.js";
import { reactSlackMessage } from "../../actions.js";
@@ -511,19 +508,28 @@ export async function prepareSlackMessage(params: {
OriginatingTo: slackTo,
}) satisfies FinalizedMsgContext;
void recordSessionMetaFromInbound({
await recordInboundSession({
storePath,
sessionKey: sessionKey,
sessionKey,
ctx: ctxPayload,
}).catch((err) => {
ctx.logger.warn(
{
error: String(err),
storePath,
sessionKey,
},
"failed updating session meta",
);
updateLastRoute: isDirectMessage
? {
sessionKey: route.mainSessionKey,
channel: "slack",
to: `user:${message.user}`,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
ctx.logger.warn(
{
error: String(err),
storePath,
sessionKey,
},
"failed updating session meta",
);
},
});
const replyTarget = ctxPayload.To ?? undefined;

View File

@@ -12,13 +12,9 @@ import {
import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../auto-reply/reply/mentions.js";
import { formatLocationText, toLocationContext } from "../channels/location.js";
import { recordInboundSession } from "../channels/session.js";
import { formatCliCommand } from "../cli/command-format.js";
import {
readSessionUpdatedAt,
recordSessionMetaFromInbound,
resolveStorePath,
updateLastRoute,
} from "../config/sessions.js";
import { readSessionUpdatedAt, resolveStorePath } from "../config/sessions.js";
import type { ClawdbotConfig } from "../config/config.js";
import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js";
import { logVerbose, shouldLogVerbose } from "../globals.js";
@@ -519,12 +515,21 @@ export const buildTelegramMessageContext = async ({
OriginatingTo: `telegram:${chatId}`,
});
void recordSessionMetaFromInbound({
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`telegram: failed updating session meta: ${String(err)}`);
updateLastRoute: !isGroup
? {
sessionKey: route.mainSessionKey,
channel: "telegram",
to: String(chatId),
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logVerbose(`telegram: failed updating session meta: ${String(err)}`);
},
});
if (replyTarget && shouldLogVerbose()) {
@@ -540,19 +545,6 @@ export const buildTelegramMessageContext = async ({
);
}
if (!isGroup) {
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,
deliveryContext: {
channel: "telegram",
to: String(chatId),
accountId: route.accountId,
},
ctx: ctxPayload,
});
}
if (shouldLogVerbose()) {
const preview = body.slice(0, 200).replace(/\n/g, "\\n");
const mediaInfo = allMedia.length > 1 ? ` mediaCount=${allMedia.length}` : "";