Threads: add Slack/Discord thread sessions

This commit is contained in:
Shadow
2026-01-07 09:02:20 -06:00
committed by Peter Steinberger
parent 422477499c
commit 7e5cef29a0
17 changed files with 670 additions and 27 deletions

View File

@@ -25,7 +25,7 @@ import {
type ClawdbotConfig,
loadConfig,
} from "../config/config.js";
import { resolveSessionTranscriptPath } from "../config/sessions.js";
import { resolveSessionFilePath } from "../config/sessions.js";
import { logVerbose } from "../globals.js";
import { clearCommandLane, getQueueSize } from "../process/command-queue.js";
import { defaultRuntime } from "../runtime.js";
@@ -646,6 +646,11 @@ export async function getReplyFromConfig(
isNewSession,
prefixedBodyBase,
});
const threadStarterBody = ctx.ThreadStarterBody?.trim();
const threadStarterNote =
isNewSession && threadStarterBody
? `[Thread starter - for context]\n${threadStarterBody}`
: undefined;
const skillResult = await ensureSkillSnapshot({
sessionEntry,
sessionStore,
@@ -661,10 +666,10 @@ export async function getReplyFromConfig(
systemSent = skillResult.systemSent;
const skillsSnapshot = skillResult.skillsSnapshot;
const prefixedBody = transcribedText
? [prefixedBodyBase, `Transcript:\n${transcribedText}`]
? [threadStarterNote, prefixedBodyBase, `Transcript:\n${transcribedText}`]
.filter(Boolean)
.join("\n\n")
: prefixedBodyBase;
: [threadStarterNote, prefixedBodyBase].filter(Boolean).join("\n\n");
const mediaNote = ctx.MediaPath?.length
? `[media attached: ${ctx.MediaPath}${ctx.MediaType ? ` (${ctx.MediaType})` : ""}${ctx.MediaUrl ? ` | ${ctx.MediaUrl}` : ""}]`
: undefined;
@@ -689,12 +694,12 @@ export async function getReplyFromConfig(
resolvedThinkLevel = await modelState.resolveDefaultThinkingLevel();
}
const sessionIdFinal = sessionId ?? crypto.randomUUID();
const sessionFile = resolveSessionTranscriptPath(sessionIdFinal);
const sessionFile = resolveSessionFilePath(sessionIdFinal, sessionEntry);
const queueBodyBase = transcribedText
? [baseBodyFinal, `Transcript:\n${transcribedText}`]
? [threadStarterNote, baseBodyFinal, `Transcript:\n${transcribedText}`]
.filter(Boolean)
.join("\n\n")
: baseBodyFinal;
: [threadStarterNote, baseBodyFinal].filter(Boolean).join("\n\n");
const queuedBody = mediaNote
? [mediaNote, mediaReplyHint, queueBodyBase]
.filter(Boolean)

View File

@@ -14,7 +14,7 @@ import {
} from "../../agents/pi-embedded.js";
import type { ClawdbotConfig } from "../../config/config.js";
import {
resolveSessionTranscriptPath,
resolveSessionFilePath,
type SessionEntry,
type SessionScope,
saveSessionStore,
@@ -509,7 +509,7 @@ export async function handleCommands(params: {
sessionId,
sessionKey,
messageProvider: command.provider,
sessionFile: resolveSessionTranscriptPath(sessionId),
sessionFile: resolveSessionFilePath(sessionId, sessionEntry),
workspaceDir,
config: cfg,
skillsSnapshot: sessionEntry.skillsSnapshot,

View File

@@ -0,0 +1,82 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import type { ClawdbotConfig } from "../../config/config.js";
import { saveSessionStore } from "../../config/sessions.js";
import { initSessionState } from "./session.js";
describe("initSessionState thread forking", () => {
it("forks a new session from the parent session file", async () => {
const root = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdbot-thread-session-"),
);
const sessionsDir = path.join(root, "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
const parentSessionId = "parent-session";
const parentSessionFile = path.join(sessionsDir, "parent.jsonl");
const header = {
type: "session",
version: 3,
id: parentSessionId,
timestamp: new Date().toISOString(),
cwd: process.cwd(),
};
const message = {
type: "message",
id: "m1",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "Parent prompt" },
};
await fs.writeFile(
parentSessionFile,
`${JSON.stringify(header)}\n${JSON.stringify(message)}\n`,
"utf-8",
);
const storePath = path.join(root, "sessions.json");
const parentSessionKey = "slack:channel:C1";
await saveSessionStore(storePath, {
[parentSessionKey]: {
sessionId: parentSessionId,
sessionFile: parentSessionFile,
updatedAt: Date.now(),
},
});
const cfg = {
session: { store: storePath },
} as ClawdbotConfig;
const threadSessionKey = "slack:thread:C1:123";
const threadLabel = "Slack thread #general: starter";
const result = await initSessionState({
ctx: {
Body: "Thread reply",
SessionKey: threadSessionKey,
ParentSessionKey: parentSessionKey,
ThreadLabel: threadLabel,
},
cfg,
commandAuthorized: true,
});
expect(result.sessionKey).toBe(threadSessionKey);
expect(result.sessionEntry.sessionId).not.toBe(parentSessionId);
expect(result.sessionEntry.sessionFile).toBeTruthy();
expect(result.sessionEntry.displayName).toBe(threadLabel);
const newSessionFile = result.sessionEntry.sessionFile!;
const [headerLine] = (await fs.readFile(newSessionFile, "utf-8"))
.split(/\r?\n/)
.filter((line) => line.trim().length > 0);
const parsedHeader = JSON.parse(headerLine) as {
parentSession?: string;
};
expect(parsedHeader.parentSession).toBe(parentSessionFile);
});
});

View File

@@ -1,5 +1,11 @@
import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import {
CURRENT_SESSION_VERSION,
SessionManager,
} from "@mariozechner/pi-coding-agent";
import type { ClawdbotConfig } from "../../config/config.js";
import {
buildGroupDisplayName,
@@ -9,6 +15,7 @@ import {
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveGroupSessionKey,
resolveSessionFilePath,
resolveSessionKey,
resolveStorePath,
type SessionEntry,
@@ -36,6 +43,45 @@ export type SessionInitResult = {
triggerBodyNormalized: string;
};
function forkSessionFromParent(params: {
parentEntry: SessionEntry;
}): { sessionId: string; sessionFile: string } | null {
const parentSessionFile = resolveSessionFilePath(
params.parentEntry.sessionId,
params.parentEntry,
);
if (!parentSessionFile || !fs.existsSync(parentSessionFile)) return null;
try {
const manager = SessionManager.open(parentSessionFile);
const leafId = manager.getLeafId();
if (leafId) {
const sessionFile =
manager.createBranchedSession(leafId) ?? manager.getSessionFile();
const sessionId = manager.getSessionId();
if (sessionFile && sessionId) return { sessionId, sessionFile };
}
const sessionId = crypto.randomUUID();
const timestamp = new Date().toISOString();
const fileTimestamp = timestamp.replace(/[:.]/g, "-");
const sessionFile = path.join(
manager.getSessionDir(),
`${fileTimestamp}_${sessionId}.jsonl`,
);
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: sessionId,
timestamp,
cwd: manager.getCwd(),
parentSession: parentSessionFile,
};
fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, "utf-8");
return { sessionId, sessionFile };
} catch {
return null;
}
}
export async function initSessionState(params: {
ctx: MsgContext;
cfg: ClawdbotConfig;
@@ -189,6 +235,26 @@ export async function initSessionState(params: {
} else if (!sessionEntry.chatType) {
sessionEntry.chatType = "direct";
}
const threadLabel = ctx.ThreadLabel?.trim();
if (threadLabel) {
sessionEntry.displayName = threadLabel;
}
const parentSessionKey = ctx.ParentSessionKey?.trim();
if (
isNewSession &&
parentSessionKey &&
parentSessionKey !== sessionKey &&
sessionStore[parentSessionKey]
) {
const forked = forkSessionFromParent({
parentEntry: sessionStore[parentSessionKey],
});
if (forked) {
sessionId = forked.sessionId;
sessionEntry.sessionId = forked.sessionId;
sessionEntry.sessionFile = forked.sessionFile;
}
}
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);

View File

@@ -16,7 +16,7 @@ import {
import type { ClawdbotConfig } from "../config/config.js";
import {
resolveMainSessionKey,
resolveSessionTranscriptPath,
resolveSessionFilePath,
type SessionEntry,
type SessionScope,
} from "../config/sessions.js";
@@ -185,6 +185,7 @@ const formatQueueDetails = (queue?: QueueStatus) => {
const readUsageFromSessionLog = (
sessionId?: string,
sessionEntry?: SessionEntry,
):
| {
input: number;
@@ -194,9 +195,9 @@ const readUsageFromSessionLog = (
model?: string;
}
| undefined => {
// Transcripts always live at: ~/.clawdbot/sessions/<SessionId>.jsonl
// Transcripts are stored at the session file path (fallback: ~/.clawdbot/sessions/<SessionId>.jsonl)
if (!sessionId) return undefined;
const logPath = resolveSessionTranscriptPath(sessionId);
const logPath = resolveSessionFilePath(sessionId, sessionEntry);
if (!fs.existsSync(logPath)) return undefined;
try {
@@ -264,7 +265,7 @@ export function buildStatusMessage(args: StatusArgs): string {
// Prefer prompt-size tokens from the session transcript when it looks larger
// (cached prompt tokens are often missing from agent meta/store).
if (args.includeTranscriptUsage) {
const logUsage = readUsageFromSessionLog(entry?.sessionId);
const logUsage = readUsageFromSessionLog(entry?.sessionId, entry);
if (logUsage) {
const candidate = logUsage.promptTokens || logUsage.total;
if (!totalTokens || totalTokens === 0 || candidate > totalTokens) {

View File

@@ -15,10 +15,13 @@ export type MsgContext = {
SessionKey?: string;
/** Provider account id (multi-account). */
AccountId?: string;
ParentSessionKey?: string;
MessageSid?: string;
ReplyToId?: string;
ReplyToBody?: string;
ReplyToSender?: string;
ThreadStarterBody?: string;
ThreadLabel?: string;
MediaPath?: string;
MediaUrl?: string;
MediaType?: string;

View File

@@ -36,7 +36,7 @@ import {
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveSessionKey,
resolveSessionTranscriptPath,
resolveSessionFilePath,
resolveStorePath,
type SessionEntry,
saveSessionStore,
@@ -386,7 +386,7 @@ export async function agentCommand(
catalog: catalogForThinking,
});
}
const sessionFile = resolveSessionTranscriptPath(sessionId);
const sessionFile = resolveSessionFilePath(sessionId, sessionEntry);
const startedAt = Date.now();
let lifecycleEnded = false;

View File

@@ -33,6 +33,7 @@ export type SessionChatType = "direct" | "group" | "room";
export type SessionEntry = {
sessionId: string;
updatedAt: number;
sessionFile?: string;
/** Parent session key that spawned this session (used for sandbox session-tool scoping). */
spawnedBy?: string;
systemSent?: boolean;
@@ -137,6 +138,17 @@ export function resolveSessionTranscriptPath(
return path.join(resolveAgentSessionsDir(agentId), `${sessionId}.jsonl`);
}
export function resolveSessionFilePath(
sessionId: string,
entry?: SessionEntry,
opts?: { agentId?: string },
): string {
const candidate = entry?.sessionFile?.trim();
return candidate
? candidate
: resolveSessionTranscriptPath(sessionId, opts?.agentId);
}
export function resolveStorePath(store?: string, opts?: { agentId?: string }) {
const agentId = normalizeAgentId(opts?.agentId ?? DEFAULT_AGENT_ID);
if (!store) return resolveDefaultSessionStorePath(agentId);
@@ -393,6 +405,7 @@ export async function updateLastRoute(params: {
const next: SessionEntry = {
sessionId: existing?.sessionId ?? crypto.randomUUID(),
updatedAt: Math.max(existing?.updatedAt ?? 0, now),
sessionFile: existing?.sessionFile,
systemSent: existing?.systemSent,
abortedLastRun: existing?.abortedLastRun,
thinkingLevel: existing?.thinkingLevel,

View File

@@ -167,4 +167,107 @@ describe("discord tool result dispatch", () => {
expect(dispatchMock).toHaveBeenCalledTimes(1);
expect(sendMock).toHaveBeenCalledTimes(1);
}, 10000);
});
it("forks thread sessions and injects starter context", async () => {
const { createDiscordMessageHandler } = await import("./monitor.js");
const { resolveSessionKey } = await import("../config/sessions.js");
vi.mocked(resolveSessionKey).mockReturnValue("discord:parent:p1");
let capturedCtx:
| {
SessionKey?: string;
ParentSessionKey?: string;
ThreadStarterBody?: string;
ThreadLabel?: string;
}
| undefined;
dispatchMock.mockImplementationOnce(async ({ ctx, dispatcher }) => {
capturedCtx = ctx;
dispatcher.sendFinalReply({ text: "hi" });
return { queuedFinal: true, counts: { final: 1 } };
});
const cfg = {
agent: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/clawd" },
session: { store: "/tmp/clawdbot-sessions.json" },
messages: { responsePrefix: "PFX" },
discord: {
dm: { enabled: true, policy: "open" },
guilds: { "*": { requireMention: false } },
},
routing: { allowFrom: [] },
} as ReturnType<typeof import("../config/config.js").loadConfig>;
const handler = createDiscordMessageHandler({
cfg,
token: "token",
runtime: {
log: vi.fn(),
error: vi.fn(),
exit: (code: number): never => {
throw new Error(`exit ${code}`);
},
},
botUserId: "bot-id",
guildHistories: new Map(),
historyLimit: 0,
mediaMaxBytes: 10_000,
textLimit: 2000,
replyToMode: "off",
dmEnabled: true,
groupDmEnabled: false,
guildEntries: { "*": { requireMention: false } },
});
const threadChannel = {
type: ChannelType.GuildText,
name: "thread-name",
parentId: "p1",
parent: { id: "p1", name: "general" },
isThread: () => true,
fetchStarterMessage: async () => ({
content: "starter message",
author: { tag: "Alice#1", username: "Alice" },
createdTimestamp: Date.now(),
}),
};
const client = {
fetchChannel: vi.fn().mockResolvedValue({
type: ChannelType.GuildText,
name: "thread-name",
}),
} as unknown as Client;
await handler(
{
message: {
id: "m4",
content: "thread reply",
channelId: "t1",
channel: threadChannel,
timestamp: new Date().toISOString(),
type: MessageType.Default,
attachments: [],
embeds: [],
mentionedEveryone: false,
mentionedUsers: [],
mentionedRoles: [],
author: { id: "u2", bot: false, username: "Bob", tag: "Bob#2" },
},
author: { id: "u2", bot: false, username: "Bob", tag: "Bob#2" },
member: { displayName: "Bob" },
guild: { id: "g1", name: "Guild" },
guild_id: "g1",
},
client,
);
expect(capturedCtx?.SessionKey).toBe("discord:thread:t1");
expect(capturedCtx?.ParentSessionKey).toBe("discord:parent:p1");
expect(capturedCtx?.ThreadStarterBody).toContain("starter message");
expect(capturedCtx?.ThreadLabel).toContain("Discord thread #general");
});
});

View File

@@ -11,6 +11,11 @@ import {
MessageReactionRemoveListener,
MessageType,
type RequestClient,
type PartialMessage,
type PartialMessageReaction,
Partials,
type ThreadChannel,
type PartialUser,
type User,
} from "@buape/carbon";
import { GatewayIntents, GatewayPlugin } from "@buape/carbon/gateway";
@@ -81,6 +86,44 @@ type DiscordHistoryEntry = {
};
type DiscordReactionEvent = Parameters<MessageReactionAddListener["handle"]>[0];
type DiscordThreadStarter = {
text: string;
author: string;
timestamp?: number;
};
const DISCORD_THREAD_STARTER_CACHE = new Map<string, DiscordThreadStarter>();
async function resolveDiscordThreadStarter(
channel: ThreadChannel,
): Promise<DiscordThreadStarter | null> {
const cacheKey = channel.id;
const cached = DISCORD_THREAD_STARTER_CACHE.get(cacheKey);
if (cached) return cached;
try {
const starter = await channel.fetchStarterMessage();
if (!starter) return null;
const text =
starter.content?.trim() ??
starter.embeds?.[0]?.description?.trim() ??
"";
if (!text) return null;
const author =
starter.member?.displayName ??
starter.author?.tag ??
starter.author?.username ??
"Unknown";
const payload: DiscordThreadStarter = {
text,
author,
timestamp: starter.createdTimestamp ?? undefined,
};
DISCORD_THREAD_STARTER_CACHE.set(cacheKey, payload);
return payload;
} catch {
return null;
}
}
export type DiscordAllowList = {
allowAll: boolean;
@@ -509,7 +552,30 @@ export function createDiscordMessageHandler(params: {
return;
}
const channelName = channelInfo?.name;
const channelName =
channelInfo?.name ??
((isGuildMessage || isGroupDm) && "name" in message.channel
? message.channel.name
: undefined);
const isThreadChannel =
isGuildMessage &&
"isThread" in message.channel &&
message.channel.isThread();
const threadChannel = isThreadChannel
? (message.channel as ThreadChannel)
: null;
const threadParentId =
threadChannel?.parentId ?? threadChannel?.parent?.id ?? undefined;
const threadParentName = threadChannel?.parent?.name;
const threadName = threadChannel?.name;
const configChannelName = threadParentName ?? channelName;
const configChannelSlug = configChannelName
? normalizeDiscordSlug(configChannelName)
: "";
const displayChannelName = threadName ?? channelName;
const displayChannelSlug = displayChannelName
? normalizeDiscordSlug(displayChannelName)
: "";
const channelSlug = channelName ? normalizeDiscordSlug(channelName) : "";
const guildSlug =
guildInfo?.slug ||
@@ -527,9 +593,9 @@ export function createDiscordMessageHandler(params: {
const channelConfig = isGuildMessage
? resolveDiscordChannelConfig({
guildInfo,
channelId: message.channelId,
channelName,
channelSlug,
channelId: threadParentId ?? message.channelId,
channelName: configChannelName,
channelSlug: configChannelSlug,
})
: null;
if (isGuildMessage && channelConfig?.enabled === false) {
@@ -544,8 +610,8 @@ export function createDiscordMessageHandler(params: {
resolveGroupDmAllow({
channels: groupDmChannels,
channelId: message.channelId,
channelName,
channelSlug,
channelName: displayChannelName,
channelSlug: displayChannelSlug,
});
if (isGroupDm && !groupDmAllowed) return;
@@ -715,7 +781,9 @@ export function createDiscordMessageHandler(params: {
channelId: message.channelId,
});
const groupRoom =
isGuildMessage && channelSlug ? `#${channelSlug}` : undefined;
isGuildMessage && displayChannelSlug
? `#${displayChannelSlug}`
: undefined;
const groupSubject = isDirectMessage ? undefined : groupRoom;
const channelDescription = channelInfo?.topic?.trim();
const systemPromptParts = [
@@ -761,6 +829,41 @@ export function createDiscordMessageHandler(params: {
combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`;
}
let threadStarterBody: string | undefined;
let threadLabel: string | undefined;
let threadSessionKey: string | undefined;
let parentSessionKey: string | undefined;
if (threadChannel) {
const starter = await resolveDiscordThreadStarter(threadChannel);
if (starter?.text) {
const starterEnvelope = formatAgentEnvelope({
surface: "Discord",
from: starter.author,
timestamp: starter.timestamp,
body: starter.text,
});
threadStarterBody = starterEnvelope;
}
const parentName = threadParentName ?? "parent";
threadLabel = threadName
? `Discord thread #${normalizeDiscordSlug(parentName)} ${threadName}`
: `Discord thread #${normalizeDiscordSlug(parentName)}`;
threadSessionKey = `discord:thread:${message.channelId}`;
const sessionCfg = cfg.session;
const sessionScope = sessionCfg?.scope ?? "per-sender";
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
if (threadParentId) {
parentSessionKey = resolveSessionKey(
sessionScope,
{
From: `group:${threadParentId}`,
ChatType: "group",
Surface: "discord",
},
mainKey,
);
}
}
const mediaPayload = buildDiscordMediaPayload(mediaList);
const discordTo = `channel:${message.channelId}`;
const ctxPayload = {
@@ -769,7 +872,7 @@ export function createDiscordMessageHandler(params: {
? `discord:${author.id}`
: `group:${message.channelId}`,
To: discordTo,
SessionKey: route.sessionKey,
SessionKey: threadSessionKey ?? route.sessionKey,
AccountId: route.accountId,
ChatType: isDirectMessage ? "direct" : "group",
SenderName:
@@ -787,6 +890,9 @@ export function createDiscordMessageHandler(params: {
Surface: "discord" as const,
WasMentioned: wasMentioned,
MessageSid: message.id,
ParentSessionKey: parentSessionKey,
ThreadStarterBody: threadStarterBody,
ThreadLabel: threadLabel,
Timestamp: resolveTimestampMs(message.timestamp),
...mediaPayload,
CommandAuthorized: commandAuthorized,

View File

@@ -707,6 +707,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
for (const candidate of resolveSessionTranscriptCandidates(
sessionId,
storePath,
entry?.sessionFile,
)) {
if (!fs.existsSync(candidate)) continue;
try {
@@ -773,6 +774,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
const filePath = resolveSessionTranscriptCandidates(
sessionId,
storePath,
entry?.sessionFile,
).find((candidate) => fs.existsSync(candidate));
if (!filePath) {
return {
@@ -843,7 +845,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
const sessionId = entry?.sessionId;
const rawMessages =
sessionId && storePath
? readSessionMessages(sessionId, storePath)
? readSessionMessages(sessionId, storePath, entry?.sessionFile)
: [];
const max = typeof limit === "number" ? limit : 200;
const sliced =

View File

@@ -46,7 +46,9 @@ export const chatHandlers: GatewayRequestHandlers = {
const { cfg, storePath, entry } = loadSessionEntry(sessionKey);
const sessionId = entry?.sessionId;
const rawMessages =
sessionId && storePath ? readSessionMessages(sessionId, storePath) : [];
sessionId && storePath
? readSessionMessages(sessionId, storePath, entry?.sessionFile)
: [];
const hardMax = 1000;
const defaultLimit = 200;
const requested = typeof limit === "number" ? limit : defaultLimit;

View File

@@ -485,6 +485,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
for (const candidate of resolveSessionTranscriptCandidates(
sessionId,
storePath,
entry?.sessionFile,
target.agentId,
)) {
if (!fs.existsSync(candidate)) continue;
@@ -559,6 +560,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const filePath = resolveSessionTranscriptCandidates(
sessionId,
storePath,
entry?.sessionFile,
target.agentId,
).find((candidate) => fs.existsSync(candidate));
if (!filePath) {

View File

@@ -327,6 +327,67 @@ describe("gateway server chat", () => {
await server.close();
});
test("chat.history prefers sessionFile when set", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
const forkedPath = path.join(dir, "sess-forked.jsonl");
await fs.writeFile(
forkedPath,
JSON.stringify({
message: {
role: "user",
content: [{ type: "text", text: "from-fork" }],
timestamp: Date.now(),
},
}),
"utf-8",
);
await fs.writeFile(
path.join(dir, "sess-main.jsonl"),
JSON.stringify({
message: {
role: "user",
content: [{ type: "text", text: "from-default" }],
timestamp: Date.now(),
},
}),
"utf-8",
);
await fs.writeFile(
testState.sessionStorePath,
JSON.stringify(
{
main: {
sessionId: "sess-main",
sessionFile: forkedPath,
updatedAt: Date.now(),
},
},
null,
2,
),
"utf-8",
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", {
sessionKey: "main",
});
expect(res.ok).toBe(true);
const messages = res.payload?.messages ?? [];
expect(messages.length).toBe(1);
const first = messages[0] as { content?: { text?: string }[] };
expect(first.content?.[0]?.text).toBe("from-fork");
ws.close();
await server.close();
});
test("chat.history defaults thinking to low for reasoning-capable models", async () => {
piSdkMock.enabled = true;
piSdkMock.models = [

View File

@@ -74,8 +74,13 @@ export type SessionsPatchResult = {
export function readSessionMessages(
sessionId: string,
storePath: string | undefined,
sessionFile?: string,
): unknown[] {
const candidates = resolveSessionTranscriptCandidates(sessionId, storePath);
const candidates = resolveSessionTranscriptCandidates(
sessionId,
storePath,
sessionFile,
);
const filePath = candidates.find((p) => fs.existsSync(p));
if (!filePath) return [];
@@ -99,9 +104,11 @@ export function readSessionMessages(
export function resolveSessionTranscriptCandidates(
sessionId: string,
storePath: string | undefined,
sessionFile?: string,
agentId?: string,
): string[] {
const candidates: string[] = [];
if (sessionFile) candidates.push(sessionFile);
if (storePath) {
const dir = path.dirname(storePath);
candidates.push(path.join(dir, `${sessionId}.jsonl`));

View File

@@ -57,6 +57,7 @@ vi.mock("@slack/bolt", () => {
info: vi.fn().mockResolvedValue({
channel: { name: "dm", is_im: true },
}),
replies: vi.fn().mockResolvedValue({ messages: [] }),
},
users: {
info: vi.fn().mockResolvedValue({
@@ -283,6 +284,114 @@ describe("monitorSlackProvider tool results", () => {
expect(sendMock.mock.calls[0][2]).toMatchObject({ threadTs: "456" });
});
it("treats parent_user_id as a thread reply even when thread_ts matches ts", async () => {
const { resolveSessionKey } = await import("../config/sessions.js");
vi.mocked(resolveSessionKey).mockReturnValue("main");
replyMock.mockResolvedValue({ text: "thread reply" });
const controller = new AbortController();
const run = monitorSlackProvider({
botToken: "bot-token",
appToken: "app-token",
abortSignal: controller.signal,
});
await waitForEvent("message");
const handler = getSlackHandlers()?.get("message");
if (!handler) throw new Error("Slack message handler not registered");
await handler({
event: {
type: "message",
user: "U1",
text: "hello",
ts: "123",
thread_ts: "123",
parent_user_id: "U2",
channel: "C1",
channel_type: "im",
},
});
await flush();
controller.abort();
await run;
expect(replyMock).toHaveBeenCalledTimes(1);
const ctx = replyMock.mock.calls[0]?.[0] as {
SessionKey?: string;
ParentSessionKey?: string;
};
expect(ctx.SessionKey).toBe("slack:thread:C1:123");
expect(ctx.ParentSessionKey).toBe("main");
});
it("forks thread sessions and injects starter context", async () => {
const { resolveSessionKey } = await import("../config/sessions.js");
vi.mocked(resolveSessionKey).mockReturnValue("slack:channel:C1");
replyMock.mockResolvedValue({ text: "ok" });
const client = getSlackClient();
if (client?.conversations?.info) {
client.conversations.info.mockResolvedValue({
channel: { name: "general", is_channel: true },
});
}
if (client?.conversations?.replies) {
client.conversations.replies.mockResolvedValue({
messages: [{ text: "starter message", user: "U2", ts: "111.222" }],
});
}
config = {
messages: { responsePrefix: "PFX" },
slack: {
dm: { enabled: true, policy: "open", allowFrom: ["*"] },
channels: { C1: { allow: true, requireMention: false } },
},
routing: { allowFrom: [] },
};
const controller = new AbortController();
const run = monitorSlackProvider({
botToken: "bot-token",
appToken: "app-token",
abortSignal: controller.signal,
});
await waitForEvent("message");
const handler = getSlackHandlers()?.get("message");
if (!handler) throw new Error("Slack message handler not registered");
await handler({
event: {
type: "message",
user: "U1",
text: "thread reply",
ts: "123.456",
thread_ts: "111.222",
channel: "C1",
channel_type: "channel",
},
});
await flush();
controller.abort();
await run;
expect(replyMock).toHaveBeenCalledTimes(1);
const ctx = replyMock.mock.calls[0]?.[0] as {
SessionKey?: string;
ParentSessionKey?: string;
ThreadStarterBody?: string;
ThreadLabel?: string;
};
expect(ctx.SessionKey).toBe("slack:thread:C1:111.222");
expect(ctx.ParentSessionKey).toBe("slack:channel:C1");
expect(ctx.ThreadStarterBody).toContain("starter message");
expect(ctx.ThreadLabel).toContain("Slack thread #general");
});
it("keeps replies in channel root when message is not threaded", async () => {
replyMock.mockResolvedValue({ text: "root reply" });

View File

@@ -3,6 +3,7 @@ import {
type SlackCommandMiddlewareArgs,
type SlackEventMiddlewareArgs,
} from "@slack/bolt";
import type { WebClient as SlackWebClient } from "@slack/web-api";
import {
chunkMarkdownText,
resolveTextChunkLimit,
@@ -74,6 +75,7 @@ type SlackMessageEvent = {
text?: string;
ts?: string;
thread_ts?: string;
parent_user_id?: string;
channel: string;
channel_type?: "im" | "mpim" | "channel" | "group";
files?: SlackFile[];
@@ -86,6 +88,7 @@ type SlackAppMentionEvent = {
text?: string;
ts?: string;
thread_ts?: string;
parent_user_id?: string;
channel: string;
channel_type?: "im" | "mpim" | "channel" | "group";
};
@@ -390,6 +393,44 @@ async function resolveSlackMedia(params: {
return null;
}
type SlackThreadStarter = {
text: string;
userId?: string;
ts?: string;
};
const THREAD_STARTER_CACHE = new Map<string, SlackThreadStarter>();
async function resolveSlackThreadStarter(params: {
channelId: string;
threadTs: string;
client: SlackWebClient;
}): Promise<SlackThreadStarter | null> {
const cacheKey = `${params.channelId}:${params.threadTs}`;
const cached = THREAD_STARTER_CACHE.get(cacheKey);
if (cached) return cached;
try {
const response = (await params.client.conversations.replies({
channel: params.channelId,
ts: params.threadTs,
limit: 1,
inclusive: true,
})) as { messages?: Array<{ text?: string; user?: string; ts?: string }> };
const message = response?.messages?.[0];
const text = (message?.text ?? "").trim();
if (!message || !text) return null;
const starter: SlackThreadStarter = {
text,
userId: message.user,
ts: message.ts,
};
THREAD_STARTER_CACHE.set(cacheKey, starter);
return starter;
} catch {
return null;
}
}
export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const cfg = loadConfig();
const sessionCfg = cfg.session;
@@ -883,7 +924,16 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
id: isDirectMessage ? (message.user ?? "unknown") : message.channel,
},
});
const sessionKey = route.sessionKey;
const baseSessionKey = route.sessionKey;
const threadTs = message.thread_ts;
const hasThreadTs = typeof threadTs === "string" && threadTs.length > 0;
const isThreadReply =
hasThreadTs && (threadTs !== message.ts || Boolean(message.parent_user_id));
const threadSessionKey = isThreadReply && threadTs
? `slack:thread:${message.channel}:${threadTs}`
: undefined;
const parentSessionKey = isThreadReply ? baseSessionKey : undefined;
const sessionKey = threadSessionKey ?? baseSessionKey;
enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
sessionKey,
contextKey: `slack:message:${message.channel}:${message.ts ?? "unknown"}`,
@@ -912,11 +962,39 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
].filter((entry): entry is string => Boolean(entry));
const groupSystemPrompt =
systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined;
let threadStarterBody: string | undefined;
let threadLabel: string | undefined;
if (isThreadReply && threadTs) {
const starter = await resolveSlackThreadStarter({
channelId: message.channel,
threadTs,
client: app.client,
});
if (starter?.text) {
const starterUser = starter.userId
? await resolveUserName(starter.userId)
: null;
const starterName = starterUser?.name ?? starter.userId ?? "Unknown";
const starterWithId = `${starter.text}\n[slack message id: ${starter.ts ?? threadTs} channel: ${message.channel}]`;
threadStarterBody = formatAgentEnvelope({
provider: "Slack",
from: starterName,
timestamp: starter.ts
? Math.round(Number(starter.ts) * 1000)
: undefined,
body: starterWithId,
});
const snippet = starter.text.replace(/\s+/g, " ").slice(0, 80);
threadLabel = `Slack thread ${roomLabel}${snippet ? `: ${snippet}` : ""}`;
} else {
threadLabel = `Slack thread ${roomLabel}`;
}
}
const ctxPayload = {
Body: body,
From: slackFrom,
To: slackTo,
SessionKey: route.sessionKey,
SessionKey: sessionKey,
AccountId: route.accountId,
ChatType: isDirectMessage ? "direct" : isRoom ? "room" : "group",
GroupSubject: isRoomish ? roomLabel : undefined,
@@ -927,6 +1005,9 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
Surface: "slack" as const,
MessageSid: message.ts,
ReplyToId: message.thread_ts ?? message.ts,
ParentSessionKey: parentSessionKey,
ThreadStarterBody: threadStarterBody,
ThreadLabel: threadLabel,
Timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined,
WasMentioned: isRoomish ? wasMentioned : undefined,
MediaPath: media?.path,