feat: persist session origin metadata across connectors

This commit is contained in:
Peter Steinberger
2026-01-18 02:41:06 +00:00
parent 0c93b9b7bb
commit 34590d2144
30 changed files with 246 additions and 66 deletions

View File

@@ -39,6 +39,7 @@ vi.mock("../config/sessions.js", () => ({
resolveAgentIdFromSessionKey: () => "main",
resolveStorePath: () => "/tmp/sessions.json",
resolveMainSessionKey: () => "agent:main:main",
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("./pi-embedded.js", () => embeddedRunMock);

View File

@@ -4,13 +4,11 @@ import path from "node:path";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { getChannelDock } from "../../channels/dock.js";
import { normalizeChannelId } from "../../channels/plugins/index.js";
import type { ClawdbotConfig } from "../../config/config.js";
import {
buildGroupDisplayName,
DEFAULT_IDLE_MINUTES,
DEFAULT_RESET_TRIGGERS,
deriveSessionMetaPatch,
type GroupKeyResolution,
loadSessionStore,
resolveGroupSessionKey,
@@ -237,39 +235,16 @@ export async function initSessionState(params: {
lastTo,
lastAccountId,
};
if (groupResolution?.channel) {
const channel = groupResolution.channel;
const subject = ctx.GroupSubject?.trim();
const space = ctx.GroupSpace?.trim();
const explicitChannel = ctx.GroupChannel?.trim();
const normalizedChannel = normalizeChannelId(channel);
const isChannelProvider = Boolean(
normalizedChannel &&
getChannelDock(normalizedChannel)?.capabilities.chatTypes.includes("channel"),
);
const nextGroupChannel =
explicitChannel ??
((groupResolution.chatType === "channel" || isChannelProvider) &&
subject &&
subject.startsWith("#")
? subject
: undefined);
const nextSubject = nextGroupChannel ? undefined : subject;
sessionEntry.chatType = groupResolution.chatType ?? "group";
sessionEntry.channel = channel;
sessionEntry.groupId = groupResolution.id;
if (nextSubject) sessionEntry.subject = nextSubject;
if (nextGroupChannel) sessionEntry.groupChannel = nextGroupChannel;
if (space) sessionEntry.space = space;
sessionEntry.displayName = buildGroupDisplayName({
provider: sessionEntry.channel,
subject: sessionEntry.subject,
groupChannel: sessionEntry.groupChannel,
space: sessionEntry.space,
id: groupResolution.id,
key: sessionKey,
});
} else if (!sessionEntry.chatType) {
const metaPatch = deriveSessionMetaPatch({
ctx: sessionCtxForState,
sessionKey,
existing: sessionEntry,
groupResolution,
});
if (metaPatch) {
sessionEntry = { ...sessionEntry, ...metaPatch };
}
if (!sessionEntry.chatType) {
sessionEntry.chatType = "direct";
}
const threadLabel = ctx.ThreadLabel?.trim();

View File

@@ -21,6 +21,7 @@ vi.mock("../config/config.js", async (importOriginal) => {
vi.mock("../config/sessions.js", () => ({
resolveStorePath: () => "/tmp/sessions.json",
loadSessionStore: () => testStore,
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("../web/auth-store.js", () => ({

View File

@@ -100,6 +100,7 @@ vi.mock("../config/sessions.js", () => ({
loadSessionStore: mocks.loadSessionStore,
resolveMainSessionKey: mocks.resolveMainSessionKey,
resolveStorePath: mocks.resolveStorePath,
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("../channels/plugins/index.js", () => ({
listChannelPlugins: () =>

View File

@@ -1,4 +1,5 @@
export * from "./sessions/group.js";
export * from "./sessions/metadata.js";
export * from "./sessions/main-session.js";
export * from "./sessions/paths.js";
export * from "./sessions/session-key.js";

View File

@@ -11,6 +11,8 @@ import {
normalizeSessionDeliveryFields,
type DeliveryContext,
} from "../../utils/delivery-context.js";
import type { MsgContext } from "../../auto-reply/templating.js";
import { deriveSessionMetaPatch } from "./metadata.js";
import { mergeSessionEntry, type SessionEntry } from "./types.js";
// ============================================================================
@@ -334,6 +336,31 @@ export async function updateSessionStoreEntry(params: {
});
}
export async function recordSessionMetaFromInbound(params: {
storePath: string;
sessionKey: string;
ctx: MsgContext;
groupResolution?: import("./types.js").GroupKeyResolution | null;
createIfMissing?: boolean;
}): Promise<SessionEntry | null> {
const { storePath, sessionKey, ctx } = params;
const createIfMissing = params.createIfMissing ?? true;
return await updateSessionStore(storePath, (store) => {
const existing = store[sessionKey];
const patch = deriveSessionMetaPatch({
ctx,
sessionKey,
existing,
groupResolution: params.groupResolution,
});
if (!patch) return existing ?? null;
if (!existing && !createIfMissing) return null;
const next = mergeSessionEntry(existing, patch);
store[sessionKey] = next;
return next;
});
}
export async function updateLastRoute(params: {
storePath: string;
sessionKey: string;

View File

@@ -11,6 +11,17 @@ export type SessionChannelId = ChannelId | "webchat";
export type SessionChatType = NormalizedChatType;
export type SessionOrigin = {
label?: string;
provider?: string;
surface?: string;
chatType?: SessionChatType;
from?: string;
to?: string;
accountId?: string;
threadId?: string | number;
};
export type SessionEntry = {
/**
* Last delivered heartbeat payload (used to suppress duplicate heartbeat notifications).
@@ -69,6 +80,7 @@ export type SessionEntry = {
subject?: string;
groupChannel?: string;
space?: string;
origin?: SessionOrigin;
deliveryContext?: DeliveryContext;
lastChannel?: SessionChannelId;
lastTo?: string;

View File

@@ -17,7 +17,11 @@ import {
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveStorePath, updateLastRoute } from "../../config/sessions.js";
import {
recordSessionMetaFromInbound,
resolveStorePath,
updateLastRoute,
} from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { buildAgentSessionKey } from "../../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../../routing/session-key.js";
@@ -264,11 +268,18 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
});
const storePath = resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
void recordSessionMetaFromInbound({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`discord: failed updating session meta: ${String(err)}`);
});
if (isDirectMessage) {
const sessionCfg = cfg.session;
const storePath = resolveStorePath(sessionCfg?.store, {
agentId: route.agentId,
});
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,

View File

@@ -9,6 +9,7 @@ import {
import { loadConfig } from "../config/config.js";
import {
resolveMainSessionKeyFromConfig,
snapshotSessionOrigin,
type SessionEntry,
updateSessionStore,
} from "../config/sessions.js";
@@ -205,6 +206,7 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async (
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
label: entry?.label,
origin: snapshotSessionOrigin(entry),
displayName: entry?.displayName,
chatType: entry?.chatType,
channel: entry?.channel,

View File

@@ -6,6 +6,7 @@ import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js";
import { clearSessionQueues } from "../../auto-reply/reply/queue.js";
import { loadConfig } from "../../config/config.js";
import {
snapshotSessionOrigin,
resolveMainSessionKey,
type SessionEntry,
updateSessionStore,
@@ -173,6 +174,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
label: entry?.label,
origin: snapshotSessionOrigin(entry),
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,

View File

@@ -381,6 +381,8 @@ export function listSessionsFromStore(params: {
const groupChannel = entry?.groupChannel;
const space = entry?.space;
const id = parsed?.id;
const origin = entry?.origin;
const originLabel = origin?.label;
const displayName =
entry?.displayName ??
(channel
@@ -393,7 +395,8 @@ export function listSessionsFromStore(params: {
key,
})
: undefined) ??
entry?.label;
entry?.label ??
originLabel;
const deliveryFields = normalizeSessionDeliveryFields(entry);
return {
key,
@@ -405,6 +408,7 @@ export function listSessionsFromStore(params: {
groupChannel,
space,
chatType: entry?.chatType,
origin,
updatedAt,
sessionId: entry?.sessionId,
systemSent: entry?.systemSent,

View File

@@ -18,6 +18,7 @@ export type GatewaySessionRow = {
groupChannel?: string;
space?: string;
chatType?: NormalizedChatType;
origin?: SessionEntry["origin"];
updatedAt: number | null;
sessionId?: string;
systemSent?: boolean;

View File

@@ -38,6 +38,7 @@ vi.mock("../pairing/pairing-store.js", () => ({
vi.mock("../config/sessions.js", () => ({
resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"),
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("./client.js", () => ({

View File

@@ -38,6 +38,7 @@ vi.mock("../pairing/pairing-store.js", () => ({
vi.mock("../config/sessions.js", () => ({
resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"),
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("./client.js", () => ({

View File

@@ -32,7 +32,11 @@ import {
resolveChannelGroupPolicy,
resolveChannelGroupRequireMention,
} from "../../config/group-policy.js";
import { resolveStorePath, updateLastRoute } from "../../config/sessions.js";
import {
recordSessionMetaFromInbound,
resolveStorePath,
updateLastRoute,
} from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { waitForTransportReady } from "../../infra/transport-ready.js";
import { mediaKindFromMime } from "../../media/constants.js";
@@ -449,11 +453,18 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
OriginatingTo: imessageTo,
});
const storePath = resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
void recordSessionMetaFromInbound({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`imessage: failed updating session meta: ${String(err)}`);
});
if (!isGroup) {
const sessionCfg = cfg.session;
const storePath = resolveStorePath(sessionCfg?.store, {
agentId: route.agentId,
});
const to = (isGroup ? chatTarget : undefined) || sender;
if (to) {
await updateLastRoute({

View File

@@ -35,6 +35,7 @@ vi.mock("../pairing/pairing-store.js", () => ({
vi.mock("../config/sessions.js", () => ({
resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"),
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
const streamMock = vi.fn();

View File

@@ -39,6 +39,7 @@ vi.mock("../pairing/pairing-store.js", () => ({
vi.mock("../config/sessions.js", () => ({
resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"),
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
const streamMock = vi.fn();

View File

@@ -20,7 +20,11 @@ import {
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import { resolveStorePath, updateLastRoute } from "../../config/sessions.js";
import {
recordSessionMetaFromInbound,
resolveStorePath,
updateLastRoute,
} from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { mediaKindFromMime } from "../../media/constants.js";
@@ -140,11 +144,18 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
OriginatingTo: signalTo,
});
const storePath = resolveStorePath(deps.cfg.session?.store, {
agentId: route.agentId,
});
void recordSessionMetaFromInbound({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`signal: failed updating session meta: ${String(err)}`);
});
if (!entry.isGroup) {
const sessionCfg = deps.cfg.session;
const storePath = resolveStorePath(sessionCfg?.store, {
agentId: route.agentId,
});
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,

View File

@@ -54,6 +54,7 @@ vi.mock("../config/sessions.js", () => ({
resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"),
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
resolveSessionKey: vi.fn(),
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("@slack/bolt", () => {

View File

@@ -56,6 +56,7 @@ vi.mock("../config/sessions.js", () => ({
resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"),
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
resolveSessionKey: vi.fn(),
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("@slack/bolt", () => {

View File

@@ -54,6 +54,7 @@ vi.mock("../config/sessions.js", () => ({
resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"),
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
resolveSessionKey: vi.fn(),
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("@slack/bolt", () => {

View File

@@ -21,6 +21,7 @@ import { resolveThreadSessionKeys } from "../../../routing/session-key.js";
import { resolveMentionGatingWithBypass } from "../../../channels/mention-gating.js";
import { resolveConversationLabel } from "../../../channels/conversation-label.js";
import { resolveControlCommandGate } from "../../../channels/command-gating.js";
import { recordSessionMetaFromInbound, resolveStorePath } from "../../../config/sessions.js";
import type { ResolvedSlackAccount } from "../../accounts.js";
import { reactSlackMessage } from "../../actions.js";
@@ -471,6 +472,24 @@ export async function prepareSlackMessage(params: {
OriginatingTo: slackTo,
}) satisfies FinalizedMsgContext;
const storePath = resolveStorePath(ctx.cfg.session?.store, {
agentId: route.agentId,
});
void recordSessionMetaFromInbound({
storePath,
sessionKey: sessionKey,
ctx: ctxPayload,
}).catch((err) => {
ctx.logger.warn(
{
error: String(err),
storePath,
sessionKey,
},
"failed updating session meta",
);
});
const replyTarget = ctxPayload.To ?? undefined;
if (!replyTarget) return null;

View File

@@ -12,7 +12,11 @@ import {
import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../auto-reply/reply/mentions.js";
import { formatLocationText, toLocationContext } from "../channels/location.js";
import { resolveStorePath, updateLastRoute } from "../config/sessions.js";
import {
recordSessionMetaFromInbound,
resolveStorePath,
updateLastRoute,
} from "../config/sessions.js";
import type { ClawdbotConfig } from "../config/config.js";
import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js";
import { logVerbose, shouldLogVerbose } from "../globals.js";
@@ -500,6 +504,17 @@ export const buildTelegramMessageContext = async ({
OriginatingTo: `telegram:${chatId}`,
});
const storePath = resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
void recordSessionMetaFromInbound({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
logVerbose(`telegram: failed updating session meta: ${String(err)}`);
});
if (replyTarget && shouldLogVerbose()) {
const preview = replyTarget.body.replace(/\s+/g, " ").slice(0, 120);
logVerbose(
@@ -514,10 +529,6 @@ export const buildTelegramMessageContext = async ({
}
if (!isGroup) {
const sessionCfg = cfg.session;
const storePath = resolveStorePath(sessionCfg?.store, {
agentId: route.agentId,
});
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,

View File

@@ -20,6 +20,7 @@ import { shouldComputeCommandAuthorized } from "../../../auto-reply/command-dete
import { finalizeInboundContext } from "../../../auto-reply/reply/inbound-context.js";
import { toLocationContext } from "../../../channels/location.js";
import type { loadConfig } from "../../../config/config.js";
import { recordSessionMetaFromInbound, resolveStorePath } from "../../../config/sessions.js";
import { logVerbose, shouldLogVerbose } from "../../../globals.js";
import type { getChildLogger } from "../../../logging.js";
import { readChannelAllowFromStore } from "../../../pairing/pairing-store.js";
@@ -33,7 +34,7 @@ import type { WebInboundMsg } from "../types.js";
import { elide } from "../util.js";
import { maybeSendAckReaction } from "./ack-reaction.js";
import { formatGroupMembers } from "./group-members.js";
import { updateLastRouteInBackground } from "./last-route.js";
import { trackBackgroundTask, updateLastRouteInBackground } from "./last-route.js";
import { buildInboundLine } from "./message-line.js";
export type GroupHistoryEntry = {
@@ -249,8 +250,7 @@ export async function processMessage(params: {
identityName: resolveIdentityName(params.cfg, params.route.agentId),
};
const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
ctx: finalizeInboundContext({
const ctxPayload = finalizeInboundContext({
Body: combinedBody,
RawBody: params.msg.body,
CommandBody: params.msg.body,
@@ -283,7 +283,29 @@ export async function processMessage(params: {
Surface: "whatsapp",
OriginatingChannel: "whatsapp",
OriginatingTo: params.msg.from,
}),
});
const storePath = resolveStorePath(params.cfg.session?.store, {
agentId: params.route.agentId,
});
const metaTask = recordSessionMetaFromInbound({
storePath,
sessionKey: params.route.sessionKey,
ctx: ctxPayload,
}).catch((err) => {
params.replyLogger.warn(
{
error: formatError(err),
storePath,
sessionKey: params.route.sessionKey,
},
"failed updating session meta",
);
});
trackBackgroundTask(params.backgroundTasks, metaTask);
const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: params.cfg,
replyResolver: params.replyResolver,
dispatcherOptions: {