diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ca280aff..e48279137 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Docs: https://docs.clawd.bot ### Changes - Memory: add OpenAI Batch API indexing for embeddings when configured. - Memory: enable OpenAI batch indexing by default for OpenAI embeddings. +- Sessions: persist origin metadata across connectors for generic session explainers. ### Fixes - Memory: retry transient 5xx errors (Cloudflare) during embedding indexing. diff --git a/docs/concepts/session.md b/docs/concepts/session.md index 0fe0b2414..bd8c1f9a4 100644 --- a/docs/concepts/session.md +++ b/docs/concepts/session.md @@ -25,6 +25,7 @@ All session state is **owned by the gateway** (the “master” Clawdbot). UI cl - Transcripts: `~/.clawdbot/agents//sessions/.jsonl` (Telegram topic sessions use `.../-topic-.jsonl`). - The store is a map `sessionKey -> { sessionId, updatedAt, ... }`. Deleting entries is safe; they are recreated on demand. - Group entries may include `displayName`, `channel`, `subject`, `room`, and `space` to label sessions in UIs. +- Session entries include `origin` metadata (label + routing hints) so UIs can explain where a session came from. - Clawdbot does **not** read legacy Pi/Tau session folders. ## Session pruning @@ -113,3 +114,11 @@ Send these as standalone messages so they register. ## Tips - Keep the primary key dedicated to 1:1 traffic; let groups keep their own keys. - When automating cleanup, delete individual keys instead of the whole store to preserve context elsewhere. + +## Session origin metadata +Each session entry records where it came from (best-effort) in `origin`: +- `label`: human label (resolved from conversation label + group subject/channel) +- `provider`: normalized channel id (including extensions) +- `from`/`to`: raw routing ids from the inbound envelope +- `accountId`: provider account id (when multi-account) +- `threadId`: thread/topic id when the channel supports it diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 60880d64a..35034d78d 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -18,7 +18,11 @@ import type { ReplyPayload } from "../../../../../src/auto-reply/types.js"; import { resolveCommandAuthorizedFromAuthorizers } from "../../../../../src/channels/command-gating.js"; import { formatAllowlistMatchMeta } from "../../../../../src/channels/plugins/allowlist-match.js"; import { loadConfig } from "../../../../../src/config/config.js"; -import { resolveStorePath, updateLastRoute } from "../../../../../src/config/sessions.js"; +import { + recordSessionMetaFromInbound, + resolveStorePath, + updateLastRoute, +} from "../../../../../src/config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../../../../src/globals.js"; import { enqueueSystemEvent } from "../../../../../src/infra/system-events.js"; import { getChildLogger } from "../../../../../src/logging.js"; @@ -494,7 +498,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi }); const groupSystemPrompt = roomConfigInfo.config?.systemPrompt?.trim() || undefined; - const ctxPayload = finalizeInboundContext({ + const ctxPayload = finalizeInboundContext({ Body: body, RawBody: bodyText, CommandBody: bodyText, @@ -526,10 +530,21 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi OriginatingTo: `room:${roomId}`, }); + const storePath = resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + logger.warn( + { error: String(err), storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey }, + "failed updating session meta", + ); + }); + if (isDirectMessage) { - const storePath = resolveStorePath(cfg.session?.store, { - agentId: route.agentId, - }); await updateLastRoute({ storePath, sessionKey: route.mainSessionKey, diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index 0e5b6dc13..b4c1e6fc7 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -18,6 +18,7 @@ import { resolveCommandAuthorizedFromAuthorizers } from "../../../../src/channel import { formatAllowlistMatchMeta } from "../../../../src/channels/plugins/allowlist-match.js"; import { danger, logVerbose, shouldLogVerbose } from "../../../../src/globals.js"; import { enqueueSystemEvent } from "../../../../src/infra/system-events.js"; +import { recordSessionMetaFromInbound, resolveStorePath } from "../../../../src/config/sessions.js"; import { readChannelAllowFromStore, upsertChannelPairingRequest, @@ -459,6 +460,17 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { ...mediaPayload, }); + const storePath = resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + logVerbose(`msteams: failed updating session meta: ${String(err)}`); + }); + if (shouldLogVerbose()) { logVerbose(`msteams inbound: from=${ctxPayload.From} preview="${preview}"`); } diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 3f9a064a9..bd7a0e179 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -7,6 +7,7 @@ import { } from "../../../src/auto-reply/command-detection.js"; import { finalizeInboundContext } from "../../../src/auto-reply/reply/inbound-context.js"; import { resolveCommandAuthorizedFromAuthorizers } from "../../../src/channels/command-gating.js"; +import { recordSessionMetaFromInbound, resolveStorePath } from "../../../src/config/sessions.js"; import { ZaloApiError, deleteWebhook, @@ -552,6 +553,17 @@ async function processMessageWithPipeline(params: { OriginatingTo: `zalo:${chatId}`, }); + const storePath = resolveStorePath(config.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + runtime.error?.(`zalo: failed updating session meta: ${String(err)}`); + }); + await deps.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg: config, diff --git a/extensions/zalouser/src/monitor.ts b/extensions/zalouser/src/monitor.ts index 1e95e0b36..5ec687c36 100644 --- a/extensions/zalouser/src/monitor.ts +++ b/extensions/zalouser/src/monitor.ts @@ -8,6 +8,7 @@ import { import { mergeAllowlist, summarizeMapping } from "../../../src/channels/allowlists/resolve-utils.js"; import { finalizeInboundContext } from "../../../src/auto-reply/reply/inbound-context.js"; import { resolveCommandAuthorizedFromAuthorizers } from "../../../src/channels/command-gating.js"; +import { recordSessionMetaFromInbound, resolveStorePath } from "../../../src/config/sessions.js"; import { loadCoreChannelDeps, type CoreChannelDeps } from "./core-bridge.js"; import { sendMessageZalouser } from "./send.js"; import type { @@ -299,6 +300,17 @@ async function processMessage( OriginatingTo: `zalouser:${chatId}`, }); + const storePath = resolveStorePath(config.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`); + }); + await deps.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg: config, diff --git a/src/agents/subagent-announce.format.test.ts b/src/agents/subagent-announce.format.test.ts index 0b3679c49..c2b11984b 100644 --- a/src/agents/subagent-announce.format.test.ts +++ b/src/agents/subagent-announce.format.test.ts @@ -39,6 +39,7 @@ vi.mock("../config/sessions.js", () => ({ resolveAgentIdFromSessionKey: () => "main", resolveStorePath: () => "/tmp/sessions.json", resolveMainSessionKey: () => "agent:main:main", + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); vi.mock("./pi-embedded.js", () => embeddedRunMock); diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 3dc75c283..7215c8979 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -4,13 +4,11 @@ import path from "node:path"; import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; import { resolveSessionAgentId } from "../../agents/agent-scope.js"; -import { getChannelDock } from "../../channels/dock.js"; -import { normalizeChannelId } from "../../channels/plugins/index.js"; import type { ClawdbotConfig } from "../../config/config.js"; import { - buildGroupDisplayName, DEFAULT_IDLE_MINUTES, DEFAULT_RESET_TRIGGERS, + deriveSessionMetaPatch, type GroupKeyResolution, loadSessionStore, resolveGroupSessionKey, @@ -237,39 +235,16 @@ export async function initSessionState(params: { lastTo, lastAccountId, }; - if (groupResolution?.channel) { - const channel = groupResolution.channel; - const subject = ctx.GroupSubject?.trim(); - const space = ctx.GroupSpace?.trim(); - const explicitChannel = ctx.GroupChannel?.trim(); - const normalizedChannel = normalizeChannelId(channel); - const isChannelProvider = Boolean( - normalizedChannel && - getChannelDock(normalizedChannel)?.capabilities.chatTypes.includes("channel"), - ); - const nextGroupChannel = - explicitChannel ?? - ((groupResolution.chatType === "channel" || isChannelProvider) && - subject && - subject.startsWith("#") - ? subject - : undefined); - const nextSubject = nextGroupChannel ? undefined : subject; - sessionEntry.chatType = groupResolution.chatType ?? "group"; - sessionEntry.channel = channel; - sessionEntry.groupId = groupResolution.id; - if (nextSubject) sessionEntry.subject = nextSubject; - if (nextGroupChannel) sessionEntry.groupChannel = nextGroupChannel; - if (space) sessionEntry.space = space; - sessionEntry.displayName = buildGroupDisplayName({ - provider: sessionEntry.channel, - subject: sessionEntry.subject, - groupChannel: sessionEntry.groupChannel, - space: sessionEntry.space, - id: groupResolution.id, - key: sessionKey, - }); - } else if (!sessionEntry.chatType) { + const metaPatch = deriveSessionMetaPatch({ + ctx: sessionCtxForState, + sessionKey, + existing: sessionEntry, + groupResolution, + }); + if (metaPatch) { + sessionEntry = { ...sessionEntry, ...metaPatch }; + } + if (!sessionEntry.chatType) { sessionEntry.chatType = "direct"; } const threadLabel = ctx.ThreadLabel?.trim(); diff --git a/src/commands/health.snapshot.test.ts b/src/commands/health.snapshot.test.ts index 436e3a275..d747b3eeb 100644 --- a/src/commands/health.snapshot.test.ts +++ b/src/commands/health.snapshot.test.ts @@ -21,6 +21,7 @@ vi.mock("../config/config.js", async (importOriginal) => { vi.mock("../config/sessions.js", () => ({ resolveStorePath: () => "/tmp/sessions.json", loadSessionStore: () => testStore, + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); vi.mock("../web/auth-store.js", () => ({ diff --git a/src/commands/status.test.ts b/src/commands/status.test.ts index d3784e83b..97622ed1f 100644 --- a/src/commands/status.test.ts +++ b/src/commands/status.test.ts @@ -100,6 +100,7 @@ vi.mock("../config/sessions.js", () => ({ loadSessionStore: mocks.loadSessionStore, resolveMainSessionKey: mocks.resolveMainSessionKey, resolveStorePath: mocks.resolveStorePath, + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); vi.mock("../channels/plugins/index.js", () => ({ listChannelPlugins: () => diff --git a/src/config/sessions.ts b/src/config/sessions.ts index 4113fc206..3fa3014d5 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -1,4 +1,5 @@ export * from "./sessions/group.js"; +export * from "./sessions/metadata.js"; export * from "./sessions/main-session.js"; export * from "./sessions/paths.js"; export * from "./sessions/session-key.js"; diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 2d2793ffb..c65f50f6e 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -11,6 +11,8 @@ import { normalizeSessionDeliveryFields, type DeliveryContext, } from "../../utils/delivery-context.js"; +import type { MsgContext } from "../../auto-reply/templating.js"; +import { deriveSessionMetaPatch } from "./metadata.js"; import { mergeSessionEntry, type SessionEntry } from "./types.js"; // ============================================================================ @@ -334,6 +336,31 @@ export async function updateSessionStoreEntry(params: { }); } +export async function recordSessionMetaFromInbound(params: { + storePath: string; + sessionKey: string; + ctx: MsgContext; + groupResolution?: import("./types.js").GroupKeyResolution | null; + createIfMissing?: boolean; +}): Promise { + const { storePath, sessionKey, ctx } = params; + const createIfMissing = params.createIfMissing ?? true; + return await updateSessionStore(storePath, (store) => { + const existing = store[sessionKey]; + const patch = deriveSessionMetaPatch({ + ctx, + sessionKey, + existing, + groupResolution: params.groupResolution, + }); + if (!patch) return existing ?? null; + if (!existing && !createIfMissing) return null; + const next = mergeSessionEntry(existing, patch); + store[sessionKey] = next; + return next; + }); +} + export async function updateLastRoute(params: { storePath: string; sessionKey: string; diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 075d5b905..486184292 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -11,6 +11,17 @@ export type SessionChannelId = ChannelId | "webchat"; export type SessionChatType = NormalizedChatType; +export type SessionOrigin = { + label?: string; + provider?: string; + surface?: string; + chatType?: SessionChatType; + from?: string; + to?: string; + accountId?: string; + threadId?: string | number; +}; + export type SessionEntry = { /** * Last delivered heartbeat payload (used to suppress duplicate heartbeat notifications). @@ -69,6 +80,7 @@ export type SessionEntry = { subject?: string; groupChannel?: string; space?: string; + origin?: SessionOrigin; deliveryContext?: DeliveryContext; lastChannel?: SessionChannelId; lastTo?: string; diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 5f74522a6..1fd298859 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -17,7 +17,11 @@ import { import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; -import { resolveStorePath, updateLastRoute } from "../../config/sessions.js"; +import { + recordSessionMetaFromInbound, + resolveStorePath, + updateLastRoute, +} from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { buildAgentSessionKey } from "../../routing/resolve-route.js"; import { resolveThreadSessionKeys } from "../../routing/session-key.js"; @@ -264,11 +268,18 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget, }); + const storePath = resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + logVerbose(`discord: failed updating session meta: ${String(err)}`); + }); + if (isDirectMessage) { - const sessionCfg = cfg.session; - const storePath = resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); await updateLastRoute({ storePath, sessionKey: route.mainSessionKey, diff --git a/src/gateway/server-bridge-methods-sessions.ts b/src/gateway/server-bridge-methods-sessions.ts index f106925ef..4522d5957 100644 --- a/src/gateway/server-bridge-methods-sessions.ts +++ b/src/gateway/server-bridge-methods-sessions.ts @@ -9,6 +9,7 @@ import { import { loadConfig } from "../config/config.js"; import { resolveMainSessionKeyFromConfig, + snapshotSessionOrigin, type SessionEntry, updateSessionStore, } from "../config/sessions.js"; @@ -205,6 +206,7 @@ export const handleSessionsBridgeMethods: BridgeMethodHandler = async ( contextTokens: entry?.contextTokens, sendPolicy: entry?.sendPolicy, label: entry?.label, + origin: snapshotSessionOrigin(entry), displayName: entry?.displayName, chatType: entry?.chatType, channel: entry?.channel, diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 94a3ca3f3..9d3752627 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -6,6 +6,7 @@ import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js"; import { clearSessionQueues } from "../../auto-reply/reply/queue.js"; import { loadConfig } from "../../config/config.js"; import { + snapshotSessionOrigin, resolveMainSessionKey, type SessionEntry, updateSessionStore, @@ -173,6 +174,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { contextTokens: entry?.contextTokens, sendPolicy: entry?.sendPolicy, label: entry?.label, + origin: snapshotSessionOrigin(entry), lastChannel: entry?.lastChannel, lastTo: entry?.lastTo, skillsSnapshot: entry?.skillsSnapshot, diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index ec0b426c0..b17018ae3 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -381,6 +381,8 @@ export function listSessionsFromStore(params: { const groupChannel = entry?.groupChannel; const space = entry?.space; const id = parsed?.id; + const origin = entry?.origin; + const originLabel = origin?.label; const displayName = entry?.displayName ?? (channel @@ -393,7 +395,8 @@ export function listSessionsFromStore(params: { key, }) : undefined) ?? - entry?.label; + entry?.label ?? + originLabel; const deliveryFields = normalizeSessionDeliveryFields(entry); return { key, @@ -405,6 +408,7 @@ export function listSessionsFromStore(params: { groupChannel, space, chatType: entry?.chatType, + origin, updatedAt, sessionId: entry?.sessionId, systemSent: entry?.systemSent, diff --git a/src/gateway/session-utils.types.ts b/src/gateway/session-utils.types.ts index 051e293ac..726787a36 100644 --- a/src/gateway/session-utils.types.ts +++ b/src/gateway/session-utils.types.ts @@ -18,6 +18,7 @@ export type GatewaySessionRow = { groupChannel?: string; space?: string; chatType?: NormalizedChatType; + origin?: SessionEntry["origin"]; updatedAt: number | null; sessionId?: string; systemSent?: boolean; diff --git a/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts b/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts index 13486e188..e8a72f836 100644 --- a/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts +++ b/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts @@ -38,6 +38,7 @@ vi.mock("../pairing/pairing-store.js", () => ({ vi.mock("../config/sessions.js", () => ({ resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); vi.mock("./client.js", () => ({ diff --git a/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts b/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts index 7fe151561..194437544 100644 --- a/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts +++ b/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts @@ -38,6 +38,7 @@ vi.mock("../pairing/pairing-store.js", () => ({ vi.mock("../config/sessions.js", () => ({ resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); vi.mock("./client.js", () => ({ diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index 4ee271eac..ff2c05ec1 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -32,7 +32,11 @@ import { resolveChannelGroupPolicy, resolveChannelGroupRequireMention, } from "../../config/group-policy.js"; -import { resolveStorePath, updateLastRoute } from "../../config/sessions.js"; +import { + recordSessionMetaFromInbound, + resolveStorePath, + updateLastRoute, +} from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { waitForTransportReady } from "../../infra/transport-ready.js"; import { mediaKindFromMime } from "../../media/constants.js"; @@ -449,11 +453,18 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P OriginatingTo: imessageTo, }); + const storePath = resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + logVerbose(`imessage: failed updating session meta: ${String(err)}`); + }); + if (!isGroup) { - const sessionCfg = cfg.session; - const storePath = resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); const to = (isGroup ? chatTarget : undefined) || sender; if (to) { await updateLastRoute({ diff --git a/src/signal/monitor.tool-result.pairs-uuid-only-senders-uuid-allowlist-entry.test.ts b/src/signal/monitor.tool-result.pairs-uuid-only-senders-uuid-allowlist-entry.test.ts index f0e8eedab..65fc3eabc 100644 --- a/src/signal/monitor.tool-result.pairs-uuid-only-senders-uuid-allowlist-entry.test.ts +++ b/src/signal/monitor.tool-result.pairs-uuid-only-senders-uuid-allowlist-entry.test.ts @@ -35,6 +35,7 @@ vi.mock("../pairing/pairing-store.js", () => ({ vi.mock("../config/sessions.js", () => ({ resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); const streamMock = vi.fn(); diff --git a/src/signal/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts b/src/signal/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts index d16892646..859cc4ccd 100644 --- a/src/signal/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts +++ b/src/signal/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts @@ -39,6 +39,7 @@ vi.mock("../pairing/pairing-store.js", () => ({ vi.mock("../config/sessions.js", () => ({ resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); const streamMock = vi.fn(); diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index d17841504..752e3d3a6 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -20,7 +20,11 @@ import { } from "../../auto-reply/reply/history.js"; import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; -import { resolveStorePath, updateLastRoute } from "../../config/sessions.js"; +import { + recordSessionMetaFromInbound, + resolveStorePath, + updateLastRoute, +} from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { mediaKindFromMime } from "../../media/constants.js"; @@ -140,11 +144,18 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { OriginatingTo: signalTo, }); + const storePath = resolveStorePath(deps.cfg.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + logVerbose(`signal: failed updating session meta: ${String(err)}`); + }); + if (!entry.isGroup) { - const sessionCfg = deps.cfg.session; - const storePath = resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); await updateLastRoute({ storePath, sessionKey: route.mainSessionKey, diff --git a/src/slack/monitor.tool-result.forces-thread-replies-replytoid-is-set.test.ts b/src/slack/monitor.tool-result.forces-thread-replies-replytoid-is-set.test.ts index 458ab188d..c2c224a76 100644 --- a/src/slack/monitor.tool-result.forces-thread-replies-replytoid-is-set.test.ts +++ b/src/slack/monitor.tool-result.forces-thread-replies-replytoid-is-set.test.ts @@ -54,6 +54,7 @@ vi.mock("../config/sessions.js", () => ({ resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), resolveSessionKey: vi.fn(), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); vi.mock("@slack/bolt", () => { diff --git a/src/slack/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts b/src/slack/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts index 3978c2408..90bd3df96 100644 --- a/src/slack/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts +++ b/src/slack/monitor.tool-result.sends-tool-summaries-responseprefix.test.ts @@ -56,6 +56,7 @@ vi.mock("../config/sessions.js", () => ({ resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), resolveSessionKey: vi.fn(), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); vi.mock("@slack/bolt", () => { diff --git a/src/slack/monitor.tool-result.threads-top-level-replies-replytomode-is-all.test.ts b/src/slack/monitor.tool-result.threads-top-level-replies-replytomode-is-all.test.ts index f20459ce4..80c5c273c 100644 --- a/src/slack/monitor.tool-result.threads-top-level-replies-replytomode-is-all.test.ts +++ b/src/slack/monitor.tool-result.threads-top-level-replies-replytomode-is-all.test.ts @@ -54,6 +54,7 @@ vi.mock("../config/sessions.js", () => ({ resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), resolveSessionKey: vi.fn(), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), })); vi.mock("@slack/bolt", () => { diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index 5f424f84c..a1fe35675 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -21,6 +21,7 @@ import { resolveThreadSessionKeys } from "../../../routing/session-key.js"; import { resolveMentionGatingWithBypass } from "../../../channels/mention-gating.js"; import { resolveConversationLabel } from "../../../channels/conversation-label.js"; import { resolveControlCommandGate } from "../../../channels/command-gating.js"; +import { recordSessionMetaFromInbound, resolveStorePath } from "../../../config/sessions.js"; import type { ResolvedSlackAccount } from "../../accounts.js"; import { reactSlackMessage } from "../../actions.js"; @@ -471,6 +472,24 @@ export async function prepareSlackMessage(params: { OriginatingTo: slackTo, }) satisfies FinalizedMsgContext; + const storePath = resolveStorePath(ctx.cfg.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: sessionKey, + ctx: ctxPayload, + }).catch((err) => { + ctx.logger.warn( + { + error: String(err), + storePath, + sessionKey, + }, + "failed updating session meta", + ); + }); + const replyTarget = ctxPayload.To ?? undefined; if (!replyTarget) return null; diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index 286354c4f..364a32ec3 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -12,7 +12,11 @@ import { import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js"; import { buildMentionRegexes, matchesMentionPatterns } from "../auto-reply/reply/mentions.js"; import { formatLocationText, toLocationContext } from "../channels/location.js"; -import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; +import { + recordSessionMetaFromInbound, + resolveStorePath, + updateLastRoute, +} from "../config/sessions.js"; import type { ClawdbotConfig } from "../config/config.js"; import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js"; import { logVerbose, shouldLogVerbose } from "../globals.js"; @@ -500,6 +504,17 @@ export const buildTelegramMessageContext = async ({ OriginatingTo: `telegram:${chatId}`, }); + const storePath = resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + void recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + logVerbose(`telegram: failed updating session meta: ${String(err)}`); + }); + if (replyTarget && shouldLogVerbose()) { const preview = replyTarget.body.replace(/\s+/g, " ").slice(0, 120); logVerbose( @@ -514,10 +529,6 @@ export const buildTelegramMessageContext = async ({ } if (!isGroup) { - const sessionCfg = cfg.session; - const storePath = resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); await updateLastRoute({ storePath, sessionKey: route.mainSessionKey, diff --git a/src/web/auto-reply/monitor/process-message.ts b/src/web/auto-reply/monitor/process-message.ts index 170baeba5..3ad1b5bb0 100644 --- a/src/web/auto-reply/monitor/process-message.ts +++ b/src/web/auto-reply/monitor/process-message.ts @@ -20,6 +20,7 @@ import { shouldComputeCommandAuthorized } from "../../../auto-reply/command-dete import { finalizeInboundContext } from "../../../auto-reply/reply/inbound-context.js"; import { toLocationContext } from "../../../channels/location.js"; import type { loadConfig } from "../../../config/config.js"; +import { recordSessionMetaFromInbound, resolveStorePath } from "../../../config/sessions.js"; import { logVerbose, shouldLogVerbose } from "../../../globals.js"; import type { getChildLogger } from "../../../logging.js"; import { readChannelAllowFromStore } from "../../../pairing/pairing-store.js"; @@ -33,7 +34,7 @@ import type { WebInboundMsg } from "../types.js"; import { elide } from "../util.js"; import { maybeSendAckReaction } from "./ack-reaction.js"; import { formatGroupMembers } from "./group-members.js"; -import { updateLastRouteInBackground } from "./last-route.js"; +import { trackBackgroundTask, updateLastRouteInBackground } from "./last-route.js"; import { buildInboundLine } from "./message-line.js"; export type GroupHistoryEntry = { @@ -249,8 +250,7 @@ export async function processMessage(params: { identityName: resolveIdentityName(params.cfg, params.route.agentId), }; - const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ - ctx: finalizeInboundContext({ + const ctxPayload = finalizeInboundContext({ Body: combinedBody, RawBody: params.msg.body, CommandBody: params.msg.body, @@ -283,7 +283,29 @@ export async function processMessage(params: { Surface: "whatsapp", OriginatingChannel: "whatsapp", OriginatingTo: params.msg.from, - }), + }); + + const storePath = resolveStorePath(params.cfg.session?.store, { + agentId: params.route.agentId, + }); + const metaTask = recordSessionMetaFromInbound({ + storePath, + sessionKey: params.route.sessionKey, + ctx: ctxPayload, + }).catch((err) => { + params.replyLogger.warn( + { + error: formatError(err), + storePath, + sessionKey: params.route.sessionKey, + }, + "failed updating session meta", + ); + }); + trackBackgroundTask(params.backgroundTasks, metaTask); + + const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, cfg: params.cfg, replyResolver: params.replyResolver, dispatcherOptions: {