From 7e5cef29a0349af740ce0d2a392e0dbccb45a87d Mon Sep 17 00:00:00 2001 From: Shadow Date: Wed, 7 Jan 2026 09:02:20 -0600 Subject: [PATCH] Threads: add Slack/Discord thread sessions --- src/auto-reply/reply.ts | 17 ++-- src/auto-reply/reply/commands.ts | 4 +- src/auto-reply/reply/session.test.ts | 82 ++++++++++++++++ src/auto-reply/reply/session.ts | 66 +++++++++++++ src/auto-reply/status.ts | 9 +- src/auto-reply/templating.ts | 3 + src/commands/agent.ts | 4 +- src/config/sessions.ts | 13 +++ src/discord/monitor.tool-result.test.ts | 103 ++++++++++++++++++++ src/discord/monitor.ts | 122 ++++++++++++++++++++++-- src/gateway/server-bridge.ts | 4 +- src/gateway/server-methods/chat.ts | 4 +- src/gateway/server-methods/sessions.ts | 2 + src/gateway/server.chat.test.ts | 61 ++++++++++++ src/gateway/session-utils.ts | 9 +- src/slack/monitor.tool-result.test.ts | 109 +++++++++++++++++++++ src/slack/monitor.ts | 85 ++++++++++++++++- 17 files changed, 670 insertions(+), 27 deletions(-) create mode 100644 src/auto-reply/reply/session.test.ts diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index f4cc9b445..aaa9efd76 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -25,7 +25,7 @@ import { type ClawdbotConfig, loadConfig, } from "../config/config.js"; -import { resolveSessionTranscriptPath } from "../config/sessions.js"; +import { resolveSessionFilePath } from "../config/sessions.js"; import { logVerbose } from "../globals.js"; import { clearCommandLane, getQueueSize } from "../process/command-queue.js"; import { defaultRuntime } from "../runtime.js"; @@ -646,6 +646,11 @@ export async function getReplyFromConfig( isNewSession, prefixedBodyBase, }); + const threadStarterBody = ctx.ThreadStarterBody?.trim(); + const threadStarterNote = + isNewSession && threadStarterBody + ? `[Thread starter - for context]\n${threadStarterBody}` + : undefined; const skillResult = await ensureSkillSnapshot({ sessionEntry, sessionStore, @@ -661,10 +666,10 @@ export async function getReplyFromConfig( systemSent = skillResult.systemSent; const skillsSnapshot = skillResult.skillsSnapshot; const prefixedBody = transcribedText - ? [prefixedBodyBase, `Transcript:\n${transcribedText}`] + ? [threadStarterNote, prefixedBodyBase, `Transcript:\n${transcribedText}`] .filter(Boolean) .join("\n\n") - : prefixedBodyBase; + : [threadStarterNote, prefixedBodyBase].filter(Boolean).join("\n\n"); const mediaNote = ctx.MediaPath?.length ? `[media attached: ${ctx.MediaPath}${ctx.MediaType ? ` (${ctx.MediaType})` : ""}${ctx.MediaUrl ? ` | ${ctx.MediaUrl}` : ""}]` : undefined; @@ -689,12 +694,12 @@ export async function getReplyFromConfig( resolvedThinkLevel = await modelState.resolveDefaultThinkingLevel(); } const sessionIdFinal = sessionId ?? crypto.randomUUID(); - const sessionFile = resolveSessionTranscriptPath(sessionIdFinal); + const sessionFile = resolveSessionFilePath(sessionIdFinal, sessionEntry); const queueBodyBase = transcribedText - ? [baseBodyFinal, `Transcript:\n${transcribedText}`] + ? [threadStarterNote, baseBodyFinal, `Transcript:\n${transcribedText}`] .filter(Boolean) .join("\n\n") - : baseBodyFinal; + : [threadStarterNote, baseBodyFinal].filter(Boolean).join("\n\n"); const queuedBody = mediaNote ? [mediaNote, mediaReplyHint, queueBodyBase] .filter(Boolean) diff --git a/src/auto-reply/reply/commands.ts b/src/auto-reply/reply/commands.ts index fce665e9f..30152dda9 100644 --- a/src/auto-reply/reply/commands.ts +++ b/src/auto-reply/reply/commands.ts @@ -14,7 +14,7 @@ import { } from "../../agents/pi-embedded.js"; import type { ClawdbotConfig } from "../../config/config.js"; import { - resolveSessionTranscriptPath, + resolveSessionFilePath, type SessionEntry, type SessionScope, saveSessionStore, @@ -509,7 +509,7 @@ export async function handleCommands(params: { sessionId, sessionKey, messageProvider: command.provider, - sessionFile: resolveSessionTranscriptPath(sessionId), + sessionFile: resolveSessionFilePath(sessionId, sessionEntry), workspaceDir, config: cfg, skillsSnapshot: sessionEntry.skillsSnapshot, diff --git a/src/auto-reply/reply/session.test.ts b/src/auto-reply/reply/session.test.ts new file mode 100644 index 000000000..4e1e28ffb --- /dev/null +++ b/src/auto-reply/reply/session.test.ts @@ -0,0 +1,82 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import type { ClawdbotConfig } from "../../config/config.js"; +import { saveSessionStore } from "../../config/sessions.js"; +import { initSessionState } from "./session.js"; + +describe("initSessionState thread forking", () => { + it("forks a new session from the parent session file", async () => { + const root = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdbot-thread-session-"), + ); + const sessionsDir = path.join(root, "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + + const parentSessionId = "parent-session"; + const parentSessionFile = path.join(sessionsDir, "parent.jsonl"); + const header = { + type: "session", + version: 3, + id: parentSessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }; + const message = { + type: "message", + id: "m1", + parentId: null, + timestamp: new Date().toISOString(), + message: { role: "user", content: "Parent prompt" }, + }; + await fs.writeFile( + parentSessionFile, + `${JSON.stringify(header)}\n${JSON.stringify(message)}\n`, + "utf-8", + ); + + const storePath = path.join(root, "sessions.json"); + const parentSessionKey = "slack:channel:C1"; + await saveSessionStore(storePath, { + [parentSessionKey]: { + sessionId: parentSessionId, + sessionFile: parentSessionFile, + updatedAt: Date.now(), + }, + }); + + const cfg = { + session: { store: storePath }, + } as ClawdbotConfig; + + const threadSessionKey = "slack:thread:C1:123"; + const threadLabel = "Slack thread #general: starter"; + const result = await initSessionState({ + ctx: { + Body: "Thread reply", + SessionKey: threadSessionKey, + ParentSessionKey: parentSessionKey, + ThreadLabel: threadLabel, + }, + cfg, + commandAuthorized: true, + }); + + expect(result.sessionKey).toBe(threadSessionKey); + expect(result.sessionEntry.sessionId).not.toBe(parentSessionId); + expect(result.sessionEntry.sessionFile).toBeTruthy(); + expect(result.sessionEntry.displayName).toBe(threadLabel); + + const newSessionFile = result.sessionEntry.sessionFile!; + const [headerLine] = (await fs.readFile(newSessionFile, "utf-8")) + .split(/\r?\n/) + .filter((line) => line.trim().length > 0); + const parsedHeader = JSON.parse(headerLine) as { + parentSession?: string; + }; + expect(parsedHeader.parentSession).toBe(parentSessionFile); + }); +}); diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 992fb2f61..872e798f0 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -1,5 +1,11 @@ import crypto from "node:crypto"; +import fs from "node:fs"; +import path from "node:path"; +import { + CURRENT_SESSION_VERSION, + SessionManager, +} from "@mariozechner/pi-coding-agent"; import type { ClawdbotConfig } from "../../config/config.js"; import { buildGroupDisplayName, @@ -9,6 +15,7 @@ import { loadSessionStore, resolveAgentIdFromSessionKey, resolveGroupSessionKey, + resolveSessionFilePath, resolveSessionKey, resolveStorePath, type SessionEntry, @@ -36,6 +43,45 @@ export type SessionInitResult = { triggerBodyNormalized: string; }; +function forkSessionFromParent(params: { + parentEntry: SessionEntry; +}): { sessionId: string; sessionFile: string } | null { + const parentSessionFile = resolveSessionFilePath( + params.parentEntry.sessionId, + params.parentEntry, + ); + if (!parentSessionFile || !fs.existsSync(parentSessionFile)) return null; + try { + const manager = SessionManager.open(parentSessionFile); + const leafId = manager.getLeafId(); + if (leafId) { + const sessionFile = + manager.createBranchedSession(leafId) ?? manager.getSessionFile(); + const sessionId = manager.getSessionId(); + if (sessionFile && sessionId) return { sessionId, sessionFile }; + } + const sessionId = crypto.randomUUID(); + const timestamp = new Date().toISOString(); + const fileTimestamp = timestamp.replace(/[:.]/g, "-"); + const sessionFile = path.join( + manager.getSessionDir(), + `${fileTimestamp}_${sessionId}.jsonl`, + ); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: sessionId, + timestamp, + cwd: manager.getCwd(), + parentSession: parentSessionFile, + }; + fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, "utf-8"); + return { sessionId, sessionFile }; + } catch { + return null; + } +} + export async function initSessionState(params: { ctx: MsgContext; cfg: ClawdbotConfig; @@ -189,6 +235,26 @@ export async function initSessionState(params: { } else if (!sessionEntry.chatType) { sessionEntry.chatType = "direct"; } + const threadLabel = ctx.ThreadLabel?.trim(); + if (threadLabel) { + sessionEntry.displayName = threadLabel; + } + const parentSessionKey = ctx.ParentSessionKey?.trim(); + if ( + isNewSession && + parentSessionKey && + parentSessionKey !== sessionKey && + sessionStore[parentSessionKey] + ) { + const forked = forkSessionFromParent({ + parentEntry: sessionStore[parentSessionKey], + }); + if (forked) { + sessionId = forked.sessionId; + sessionEntry.sessionId = forked.sessionId; + sessionEntry.sessionFile = forked.sessionFile; + } + } sessionStore[sessionKey] = sessionEntry; await saveSessionStore(storePath, sessionStore); diff --git a/src/auto-reply/status.ts b/src/auto-reply/status.ts index da125ece8..1c177c89e 100644 --- a/src/auto-reply/status.ts +++ b/src/auto-reply/status.ts @@ -16,7 +16,7 @@ import { import type { ClawdbotConfig } from "../config/config.js"; import { resolveMainSessionKey, - resolveSessionTranscriptPath, + resolveSessionFilePath, type SessionEntry, type SessionScope, } from "../config/sessions.js"; @@ -185,6 +185,7 @@ const formatQueueDetails = (queue?: QueueStatus) => { const readUsageFromSessionLog = ( sessionId?: string, + sessionEntry?: SessionEntry, ): | { input: number; @@ -194,9 +195,9 @@ const readUsageFromSessionLog = ( model?: string; } | undefined => { - // Transcripts always live at: ~/.clawdbot/sessions/.jsonl + // Transcripts are stored at the session file path (fallback: ~/.clawdbot/sessions/.jsonl) if (!sessionId) return undefined; - const logPath = resolveSessionTranscriptPath(sessionId); + const logPath = resolveSessionFilePath(sessionId, sessionEntry); if (!fs.existsSync(logPath)) return undefined; try { @@ -264,7 +265,7 @@ export function buildStatusMessage(args: StatusArgs): string { // Prefer prompt-size tokens from the session transcript when it looks larger // (cached prompt tokens are often missing from agent meta/store). if (args.includeTranscriptUsage) { - const logUsage = readUsageFromSessionLog(entry?.sessionId); + const logUsage = readUsageFromSessionLog(entry?.sessionId, entry); if (logUsage) { const candidate = logUsage.promptTokens || logUsage.total; if (!totalTokens || totalTokens === 0 || candidate > totalTokens) { diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index a63243237..398290c2f 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -15,10 +15,13 @@ export type MsgContext = { SessionKey?: string; /** Provider account id (multi-account). */ AccountId?: string; + ParentSessionKey?: string; MessageSid?: string; ReplyToId?: string; ReplyToBody?: string; ReplyToSender?: string; + ThreadStarterBody?: string; + ThreadLabel?: string; MediaPath?: string; MediaUrl?: string; MediaType?: string; diff --git a/src/commands/agent.ts b/src/commands/agent.ts index c425e09e5..2172c8475 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -36,7 +36,7 @@ import { loadSessionStore, resolveAgentIdFromSessionKey, resolveSessionKey, - resolveSessionTranscriptPath, + resolveSessionFilePath, resolveStorePath, type SessionEntry, saveSessionStore, @@ -386,7 +386,7 @@ export async function agentCommand( catalog: catalogForThinking, }); } - const sessionFile = resolveSessionTranscriptPath(sessionId); + const sessionFile = resolveSessionFilePath(sessionId, sessionEntry); const startedAt = Date.now(); let lifecycleEnded = false; diff --git a/src/config/sessions.ts b/src/config/sessions.ts index e1f986a1d..024761b73 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -33,6 +33,7 @@ export type SessionChatType = "direct" | "group" | "room"; export type SessionEntry = { sessionId: string; updatedAt: number; + sessionFile?: string; /** Parent session key that spawned this session (used for sandbox session-tool scoping). */ spawnedBy?: string; systemSent?: boolean; @@ -137,6 +138,17 @@ export function resolveSessionTranscriptPath( return path.join(resolveAgentSessionsDir(agentId), `${sessionId}.jsonl`); } +export function resolveSessionFilePath( + sessionId: string, + entry?: SessionEntry, + opts?: { agentId?: string }, +): string { + const candidate = entry?.sessionFile?.trim(); + return candidate + ? candidate + : resolveSessionTranscriptPath(sessionId, opts?.agentId); +} + export function resolveStorePath(store?: string, opts?: { agentId?: string }) { const agentId = normalizeAgentId(opts?.agentId ?? DEFAULT_AGENT_ID); if (!store) return resolveDefaultSessionStorePath(agentId); @@ -393,6 +405,7 @@ export async function updateLastRoute(params: { const next: SessionEntry = { sessionId: existing?.sessionId ?? crypto.randomUUID(), updatedAt: Math.max(existing?.updatedAt ?? 0, now), + sessionFile: existing?.sessionFile, systemSent: existing?.systemSent, abortedLastRun: existing?.abortedLastRun, thinkingLevel: existing?.thinkingLevel, diff --git a/src/discord/monitor.tool-result.test.ts b/src/discord/monitor.tool-result.test.ts index 310c07e82..2abf02624 100644 --- a/src/discord/monitor.tool-result.test.ts +++ b/src/discord/monitor.tool-result.test.ts @@ -167,4 +167,107 @@ describe("discord tool result dispatch", () => { expect(dispatchMock).toHaveBeenCalledTimes(1); expect(sendMock).toHaveBeenCalledTimes(1); }, 10000); + + }); + + it("forks thread sessions and injects starter context", async () => { + const { createDiscordMessageHandler } = await import("./monitor.js"); + const { resolveSessionKey } = await import("../config/sessions.js"); + vi.mocked(resolveSessionKey).mockReturnValue("discord:parent:p1"); + + let capturedCtx: + | { + SessionKey?: string; + ParentSessionKey?: string; + ThreadStarterBody?: string; + ThreadLabel?: string; + } + | undefined; + dispatchMock.mockImplementationOnce(async ({ ctx, dispatcher }) => { + capturedCtx = ctx; + dispatcher.sendFinalReply({ text: "hi" }); + return { queuedFinal: true, counts: { final: 1 } }; + }); + + const cfg = { + agent: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/clawd" }, + session: { store: "/tmp/clawdbot-sessions.json" }, + messages: { responsePrefix: "PFX" }, + discord: { + dm: { enabled: true, policy: "open" }, + guilds: { "*": { requireMention: false } }, + }, + routing: { allowFrom: [] }, + } as ReturnType; + + const handler = createDiscordMessageHandler({ + cfg, + token: "token", + runtime: { + log: vi.fn(), + error: vi.fn(), + exit: (code: number): never => { + throw new Error(`exit ${code}`); + }, + }, + botUserId: "bot-id", + guildHistories: new Map(), + historyLimit: 0, + mediaMaxBytes: 10_000, + textLimit: 2000, + replyToMode: "off", + dmEnabled: true, + groupDmEnabled: false, + guildEntries: { "*": { requireMention: false } }, + }); + + const threadChannel = { + type: ChannelType.GuildText, + name: "thread-name", + parentId: "p1", + parent: { id: "p1", name: "general" }, + isThread: () => true, + fetchStarterMessage: async () => ({ + content: "starter message", + author: { tag: "Alice#1", username: "Alice" }, + createdTimestamp: Date.now(), + }), + }; + + const client = { + fetchChannel: vi.fn().mockResolvedValue({ + type: ChannelType.GuildText, + name: "thread-name", + }), + } as unknown as Client; + + await handler( + { + message: { + id: "m4", + content: "thread reply", + channelId: "t1", + channel: threadChannel, + timestamp: new Date().toISOString(), + type: MessageType.Default, + attachments: [], + embeds: [], + mentionedEveryone: false, + mentionedUsers: [], + mentionedRoles: [], + author: { id: "u2", bot: false, username: "Bob", tag: "Bob#2" }, + }, + author: { id: "u2", bot: false, username: "Bob", tag: "Bob#2" }, + member: { displayName: "Bob" }, + guild: { id: "g1", name: "Guild" }, + guild_id: "g1", + }, + client, + ); + + expect(capturedCtx?.SessionKey).toBe("discord:thread:t1"); + expect(capturedCtx?.ParentSessionKey).toBe("discord:parent:p1"); + expect(capturedCtx?.ThreadStarterBody).toContain("starter message"); + expect(capturedCtx?.ThreadLabel).toContain("Discord thread #general"); + }); }); diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 473d18eab..95d4a9df5 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -11,6 +11,11 @@ import { MessageReactionRemoveListener, MessageType, type RequestClient, + type PartialMessage, + type PartialMessageReaction, + Partials, + type ThreadChannel, + type PartialUser, type User, } from "@buape/carbon"; import { GatewayIntents, GatewayPlugin } from "@buape/carbon/gateway"; @@ -81,6 +86,44 @@ type DiscordHistoryEntry = { }; type DiscordReactionEvent = Parameters[0]; +type DiscordThreadStarter = { + text: string; + author: string; + timestamp?: number; +}; + +const DISCORD_THREAD_STARTER_CACHE = new Map(); + +async function resolveDiscordThreadStarter( + channel: ThreadChannel, +): Promise { + const cacheKey = channel.id; + const cached = DISCORD_THREAD_STARTER_CACHE.get(cacheKey); + if (cached) return cached; + try { + const starter = await channel.fetchStarterMessage(); + if (!starter) return null; + const text = + starter.content?.trim() ?? + starter.embeds?.[0]?.description?.trim() ?? + ""; + if (!text) return null; + const author = + starter.member?.displayName ?? + starter.author?.tag ?? + starter.author?.username ?? + "Unknown"; + const payload: DiscordThreadStarter = { + text, + author, + timestamp: starter.createdTimestamp ?? undefined, + }; + DISCORD_THREAD_STARTER_CACHE.set(cacheKey, payload); + return payload; + } catch { + return null; + } +} export type DiscordAllowList = { allowAll: boolean; @@ -509,7 +552,30 @@ export function createDiscordMessageHandler(params: { return; } - const channelName = channelInfo?.name; + const channelName = + channelInfo?.name ?? + ((isGuildMessage || isGroupDm) && "name" in message.channel + ? message.channel.name + : undefined); + const isThreadChannel = + isGuildMessage && + "isThread" in message.channel && + message.channel.isThread(); + const threadChannel = isThreadChannel + ? (message.channel as ThreadChannel) + : null; + const threadParentId = + threadChannel?.parentId ?? threadChannel?.parent?.id ?? undefined; + const threadParentName = threadChannel?.parent?.name; + const threadName = threadChannel?.name; + const configChannelName = threadParentName ?? channelName; + const configChannelSlug = configChannelName + ? normalizeDiscordSlug(configChannelName) + : ""; + const displayChannelName = threadName ?? channelName; + const displayChannelSlug = displayChannelName + ? normalizeDiscordSlug(displayChannelName) + : ""; const channelSlug = channelName ? normalizeDiscordSlug(channelName) : ""; const guildSlug = guildInfo?.slug || @@ -527,9 +593,9 @@ export function createDiscordMessageHandler(params: { const channelConfig = isGuildMessage ? resolveDiscordChannelConfig({ guildInfo, - channelId: message.channelId, - channelName, - channelSlug, + channelId: threadParentId ?? message.channelId, + channelName: configChannelName, + channelSlug: configChannelSlug, }) : null; if (isGuildMessage && channelConfig?.enabled === false) { @@ -544,8 +610,8 @@ export function createDiscordMessageHandler(params: { resolveGroupDmAllow({ channels: groupDmChannels, channelId: message.channelId, - channelName, - channelSlug, + channelName: displayChannelName, + channelSlug: displayChannelSlug, }); if (isGroupDm && !groupDmAllowed) return; @@ -715,7 +781,9 @@ export function createDiscordMessageHandler(params: { channelId: message.channelId, }); const groupRoom = - isGuildMessage && channelSlug ? `#${channelSlug}` : undefined; + isGuildMessage && displayChannelSlug + ? `#${displayChannelSlug}` + : undefined; const groupSubject = isDirectMessage ? undefined : groupRoom; const channelDescription = channelInfo?.topic?.trim(); const systemPromptParts = [ @@ -761,6 +829,41 @@ export function createDiscordMessageHandler(params: { combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`; } + let threadStarterBody: string | undefined; + let threadLabel: string | undefined; + let threadSessionKey: string | undefined; + let parentSessionKey: string | undefined; + if (threadChannel) { + const starter = await resolveDiscordThreadStarter(threadChannel); + if (starter?.text) { + const starterEnvelope = formatAgentEnvelope({ + surface: "Discord", + from: starter.author, + timestamp: starter.timestamp, + body: starter.text, + }); + threadStarterBody = starterEnvelope; + } + const parentName = threadParentName ?? "parent"; + threadLabel = threadName + ? `Discord thread #${normalizeDiscordSlug(parentName)} › ${threadName}` + : `Discord thread #${normalizeDiscordSlug(parentName)}`; + threadSessionKey = `discord:thread:${message.channelId}`; + const sessionCfg = cfg.session; + const sessionScope = sessionCfg?.scope ?? "per-sender"; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + if (threadParentId) { + parentSessionKey = resolveSessionKey( + sessionScope, + { + From: `group:${threadParentId}`, + ChatType: "group", + Surface: "discord", + }, + mainKey, + ); + } + } const mediaPayload = buildDiscordMediaPayload(mediaList); const discordTo = `channel:${message.channelId}`; const ctxPayload = { @@ -769,7 +872,7 @@ export function createDiscordMessageHandler(params: { ? `discord:${author.id}` : `group:${message.channelId}`, To: discordTo, - SessionKey: route.sessionKey, + SessionKey: threadSessionKey ?? route.sessionKey, AccountId: route.accountId, ChatType: isDirectMessage ? "direct" : "group", SenderName: @@ -787,6 +890,9 @@ export function createDiscordMessageHandler(params: { Surface: "discord" as const, WasMentioned: wasMentioned, MessageSid: message.id, + ParentSessionKey: parentSessionKey, + ThreadStarterBody: threadStarterBody, + ThreadLabel: threadLabel, Timestamp: resolveTimestampMs(message.timestamp), ...mediaPayload, CommandAuthorized: commandAuthorized, diff --git a/src/gateway/server-bridge.ts b/src/gateway/server-bridge.ts index 2fc6f49af..5d4b2cf5e 100644 --- a/src/gateway/server-bridge.ts +++ b/src/gateway/server-bridge.ts @@ -707,6 +707,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { for (const candidate of resolveSessionTranscriptCandidates( sessionId, storePath, + entry?.sessionFile, )) { if (!fs.existsSync(candidate)) continue; try { @@ -773,6 +774,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { const filePath = resolveSessionTranscriptCandidates( sessionId, storePath, + entry?.sessionFile, ).find((candidate) => fs.existsSync(candidate)); if (!filePath) { return { @@ -843,7 +845,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { const sessionId = entry?.sessionId; const rawMessages = sessionId && storePath - ? readSessionMessages(sessionId, storePath) + ? readSessionMessages(sessionId, storePath, entry?.sessionFile) : []; const max = typeof limit === "number" ? limit : 200; const sliced = diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 36b412442..6b2799200 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -46,7 +46,9 @@ export const chatHandlers: GatewayRequestHandlers = { const { cfg, storePath, entry } = loadSessionEntry(sessionKey); const sessionId = entry?.sessionId; const rawMessages = - sessionId && storePath ? readSessionMessages(sessionId, storePath) : []; + sessionId && storePath + ? readSessionMessages(sessionId, storePath, entry?.sessionFile) + : []; const hardMax = 1000; const defaultLimit = 200; const requested = typeof limit === "number" ? limit : defaultLimit; diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 7da991553..3e86dfdb1 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -485,6 +485,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { for (const candidate of resolveSessionTranscriptCandidates( sessionId, storePath, + entry?.sessionFile, target.agentId, )) { if (!fs.existsSync(candidate)) continue; @@ -559,6 +560,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { const filePath = resolveSessionTranscriptCandidates( sessionId, storePath, + entry?.sessionFile, target.agentId, ).find((candidate) => fs.existsSync(candidate)); if (!filePath) { diff --git a/src/gateway/server.chat.test.ts b/src/gateway/server.chat.test.ts index 2c4a72af2..b4650cc51 100644 --- a/src/gateway/server.chat.test.ts +++ b/src/gateway/server.chat.test.ts @@ -327,6 +327,67 @@ describe("gateway server chat", () => { await server.close(); }); + test("chat.history prefers sessionFile when set", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-")); + testState.sessionStorePath = path.join(dir, "sessions.json"); + + const forkedPath = path.join(dir, "sess-forked.jsonl"); + await fs.writeFile( + forkedPath, + JSON.stringify({ + message: { + role: "user", + content: [{ type: "text", text: "from-fork" }], + timestamp: Date.now(), + }, + }), + "utf-8", + ); + + await fs.writeFile( + path.join(dir, "sess-main.jsonl"), + JSON.stringify({ + message: { + role: "user", + content: [{ type: "text", text: "from-default" }], + timestamp: Date.now(), + }, + }), + "utf-8", + ); + + await fs.writeFile( + testState.sessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + sessionFile: forkedPath, + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const res = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", { + sessionKey: "main", + }); + expect(res.ok).toBe(true); + const messages = res.payload?.messages ?? []; + expect(messages.length).toBe(1); + const first = messages[0] as { content?: { text?: string }[] }; + expect(first.content?.[0]?.text).toBe("from-fork"); + + ws.close(); + await server.close(); + }); + test("chat.history defaults thinking to low for reasoning-capable models", async () => { piSdkMock.enabled = true; piSdkMock.models = [ diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 3aa1aca4d..91dc540e8 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -74,8 +74,13 @@ export type SessionsPatchResult = { export function readSessionMessages( sessionId: string, storePath: string | undefined, + sessionFile?: string, ): unknown[] { - const candidates = resolveSessionTranscriptCandidates(sessionId, storePath); + const candidates = resolveSessionTranscriptCandidates( + sessionId, + storePath, + sessionFile, + ); const filePath = candidates.find((p) => fs.existsSync(p)); if (!filePath) return []; @@ -99,9 +104,11 @@ export function readSessionMessages( export function resolveSessionTranscriptCandidates( sessionId: string, storePath: string | undefined, + sessionFile?: string, agentId?: string, ): string[] { const candidates: string[] = []; + if (sessionFile) candidates.push(sessionFile); if (storePath) { const dir = path.dirname(storePath); candidates.push(path.join(dir, `${sessionId}.jsonl`)); diff --git a/src/slack/monitor.tool-result.test.ts b/src/slack/monitor.tool-result.test.ts index 55ab49cf1..64caad341 100644 --- a/src/slack/monitor.tool-result.test.ts +++ b/src/slack/monitor.tool-result.test.ts @@ -57,6 +57,7 @@ vi.mock("@slack/bolt", () => { info: vi.fn().mockResolvedValue({ channel: { name: "dm", is_im: true }, }), + replies: vi.fn().mockResolvedValue({ messages: [] }), }, users: { info: vi.fn().mockResolvedValue({ @@ -283,6 +284,114 @@ describe("monitorSlackProvider tool results", () => { expect(sendMock.mock.calls[0][2]).toMatchObject({ threadTs: "456" }); }); + it("treats parent_user_id as a thread reply even when thread_ts matches ts", async () => { + const { resolveSessionKey } = await import("../config/sessions.js"); + vi.mocked(resolveSessionKey).mockReturnValue("main"); + replyMock.mockResolvedValue({ text: "thread reply" }); + + const controller = new AbortController(); + const run = monitorSlackProvider({ + botToken: "bot-token", + appToken: "app-token", + abortSignal: controller.signal, + }); + + await waitForEvent("message"); + const handler = getSlackHandlers()?.get("message"); + if (!handler) throw new Error("Slack message handler not registered"); + + await handler({ + event: { + type: "message", + user: "U1", + text: "hello", + ts: "123", + thread_ts: "123", + parent_user_id: "U2", + channel: "C1", + channel_type: "im", + }, + }); + + await flush(); + controller.abort(); + await run; + + expect(replyMock).toHaveBeenCalledTimes(1); + const ctx = replyMock.mock.calls[0]?.[0] as { + SessionKey?: string; + ParentSessionKey?: string; + }; + expect(ctx.SessionKey).toBe("slack:thread:C1:123"); + expect(ctx.ParentSessionKey).toBe("main"); + }); + + it("forks thread sessions and injects starter context", async () => { + const { resolveSessionKey } = await import("../config/sessions.js"); + vi.mocked(resolveSessionKey).mockReturnValue("slack:channel:C1"); + replyMock.mockResolvedValue({ text: "ok" }); + + const client = getSlackClient(); + if (client?.conversations?.info) { + client.conversations.info.mockResolvedValue({ + channel: { name: "general", is_channel: true }, + }); + } + if (client?.conversations?.replies) { + client.conversations.replies.mockResolvedValue({ + messages: [{ text: "starter message", user: "U2", ts: "111.222" }], + }); + } + + config = { + messages: { responsePrefix: "PFX" }, + slack: { + dm: { enabled: true, policy: "open", allowFrom: ["*"] }, + channels: { C1: { allow: true, requireMention: false } }, + }, + routing: { allowFrom: [] }, + }; + + const controller = new AbortController(); + const run = monitorSlackProvider({ + botToken: "bot-token", + appToken: "app-token", + abortSignal: controller.signal, + }); + + await waitForEvent("message"); + const handler = getSlackHandlers()?.get("message"); + if (!handler) throw new Error("Slack message handler not registered"); + + await handler({ + event: { + type: "message", + user: "U1", + text: "thread reply", + ts: "123.456", + thread_ts: "111.222", + channel: "C1", + channel_type: "channel", + }, + }); + + await flush(); + controller.abort(); + await run; + + expect(replyMock).toHaveBeenCalledTimes(1); + const ctx = replyMock.mock.calls[0]?.[0] as { + SessionKey?: string; + ParentSessionKey?: string; + ThreadStarterBody?: string; + ThreadLabel?: string; + }; + expect(ctx.SessionKey).toBe("slack:thread:C1:111.222"); + expect(ctx.ParentSessionKey).toBe("slack:channel:C1"); + expect(ctx.ThreadStarterBody).toContain("starter message"); + expect(ctx.ThreadLabel).toContain("Slack thread #general"); + }); + it("keeps replies in channel root when message is not threaded", async () => { replyMock.mockResolvedValue({ text: "root reply" }); diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index 91586f2ba..dcde4bdae 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -3,6 +3,7 @@ import { type SlackCommandMiddlewareArgs, type SlackEventMiddlewareArgs, } from "@slack/bolt"; +import type { WebClient as SlackWebClient } from "@slack/web-api"; import { chunkMarkdownText, resolveTextChunkLimit, @@ -74,6 +75,7 @@ type SlackMessageEvent = { text?: string; ts?: string; thread_ts?: string; + parent_user_id?: string; channel: string; channel_type?: "im" | "mpim" | "channel" | "group"; files?: SlackFile[]; @@ -86,6 +88,7 @@ type SlackAppMentionEvent = { text?: string; ts?: string; thread_ts?: string; + parent_user_id?: string; channel: string; channel_type?: "im" | "mpim" | "channel" | "group"; }; @@ -390,6 +393,44 @@ async function resolveSlackMedia(params: { return null; } +type SlackThreadStarter = { + text: string; + userId?: string; + ts?: string; +}; + +const THREAD_STARTER_CACHE = new Map(); + +async function resolveSlackThreadStarter(params: { + channelId: string; + threadTs: string; + client: SlackWebClient; +}): Promise { + const cacheKey = `${params.channelId}:${params.threadTs}`; + const cached = THREAD_STARTER_CACHE.get(cacheKey); + if (cached) return cached; + try { + const response = (await params.client.conversations.replies({ + channel: params.channelId, + ts: params.threadTs, + limit: 1, + inclusive: true, + })) as { messages?: Array<{ text?: string; user?: string; ts?: string }> }; + const message = response?.messages?.[0]; + const text = (message?.text ?? "").trim(); + if (!message || !text) return null; + const starter: SlackThreadStarter = { + text, + userId: message.user, + ts: message.ts, + }; + THREAD_STARTER_CACHE.set(cacheKey, starter); + return starter; + } catch { + return null; + } +} + export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const cfg = loadConfig(); const sessionCfg = cfg.session; @@ -883,7 +924,16 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { id: isDirectMessage ? (message.user ?? "unknown") : message.channel, }, }); - const sessionKey = route.sessionKey; + const baseSessionKey = route.sessionKey; + const threadTs = message.thread_ts; + const hasThreadTs = typeof threadTs === "string" && threadTs.length > 0; + const isThreadReply = + hasThreadTs && (threadTs !== message.ts || Boolean(message.parent_user_id)); + const threadSessionKey = isThreadReply && threadTs + ? `slack:thread:${message.channel}:${threadTs}` + : undefined; + const parentSessionKey = isThreadReply ? baseSessionKey : undefined; + const sessionKey = threadSessionKey ?? baseSessionKey; enqueueSystemEvent(`${inboundLabel}: ${preview}`, { sessionKey, contextKey: `slack:message:${message.channel}:${message.ts ?? "unknown"}`, @@ -912,11 +962,39 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { ].filter((entry): entry is string => Boolean(entry)); const groupSystemPrompt = systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined; + let threadStarterBody: string | undefined; + let threadLabel: string | undefined; + if (isThreadReply && threadTs) { + const starter = await resolveSlackThreadStarter({ + channelId: message.channel, + threadTs, + client: app.client, + }); + if (starter?.text) { + const starterUser = starter.userId + ? await resolveUserName(starter.userId) + : null; + const starterName = starterUser?.name ?? starter.userId ?? "Unknown"; + const starterWithId = `${starter.text}\n[slack message id: ${starter.ts ?? threadTs} channel: ${message.channel}]`; + threadStarterBody = formatAgentEnvelope({ + provider: "Slack", + from: starterName, + timestamp: starter.ts + ? Math.round(Number(starter.ts) * 1000) + : undefined, + body: starterWithId, + }); + const snippet = starter.text.replace(/\s+/g, " ").slice(0, 80); + threadLabel = `Slack thread ${roomLabel}${snippet ? `: ${snippet}` : ""}`; + } else { + threadLabel = `Slack thread ${roomLabel}`; + } + } const ctxPayload = { Body: body, From: slackFrom, To: slackTo, - SessionKey: route.sessionKey, + SessionKey: sessionKey, AccountId: route.accountId, ChatType: isDirectMessage ? "direct" : isRoom ? "room" : "group", GroupSubject: isRoomish ? roomLabel : undefined, @@ -927,6 +1005,9 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { Surface: "slack" as const, MessageSid: message.ts, ReplyToId: message.thread_ts ?? message.ts, + ParentSessionKey: parentSessionKey, + ThreadStarterBody: threadStarterBody, + ThreadLabel: threadLabel, Timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined, WasMentioned: isRoomish ? wasMentioned : undefined, MediaPath: media?.path,