fix: scope thread sessions and discord starter fetch

This commit is contained in:
Peter Steinberger
2026-01-07 19:42:50 +01:00
parent 7e5cef29a0
commit 0d021391a9
7 changed files with 84 additions and 61 deletions

View File

@@ -33,6 +33,7 @@
- Gateway/CLI: add daemon runtime selection (Node recommended; Bun optional) and document WhatsApp/Baileys Bun WebSocket instability on reconnect.
- CLI: add `clawdbot docs` live docs search with pretty output.
- CLI: add `clawdbot agents` (list/add/delete) with wizarded workspace/setup, provider login, and full prune on delete.
- Discord/Slack: fork thread sessions and inject thread starters for context. Thanks @thewilloftheshadow for PR #400.
- Agent: treat compaction retry AbortError as a fallback trigger without swallowing non-abort errors. Thanks @erikpr1994 for PR #341.
- Agent: add opt-in session pruning for tool results to reduce context bloat. Thanks @maxsumrall for PR #381.
- Agent: protect bootstrap prefix from context pruning. Thanks @maxsumrall for PR #381.

View File

@@ -39,7 +39,7 @@ describe("initSessionState thread forking", () => {
);
const storePath = path.join(root, "sessions.json");
const parentSessionKey = "slack:channel:C1";
const parentSessionKey = "agent:main:slack:channel:C1";
await saveSessionStore(storePath, {
[parentSessionKey]: {
sessionId: parentSessionId,
@@ -52,7 +52,7 @@ describe("initSessionState thread forking", () => {
session: { store: storePath },
} as ClawdbotConfig;
const threadSessionKey = "slack:thread:C1:123";
const threadSessionKey = "agent:main:slack:channel:C1:thread:123";
const threadLabel = "Slack thread #general: starter";
const result = await initSessionState({
ctx: {
@@ -70,7 +70,10 @@ describe("initSessionState thread forking", () => {
expect(result.sessionEntry.sessionFile).toBeTruthy();
expect(result.sessionEntry.displayName).toBe(threadLabel);
const newSessionFile = result.sessionEntry.sessionFile!;
const newSessionFile = result.sessionEntry.sessionFile;
if (!newSessionFile) {
throw new Error("Missing session file for forked thread");
}
const [headerLine] = (await fs.readFile(newSessionFile, "utf-8"))
.split(/\r?\n/)
.filter((line) => line.trim().length > 0);

View File

@@ -35,8 +35,8 @@ import {
DEFAULT_IDLE_MINUTES,
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveSessionKey,
resolveSessionFilePath,
resolveSessionKey,
resolveStorePath,
type SessionEntry,
saveSessionStore,

View File

@@ -168,13 +168,8 @@ describe("discord tool result dispatch", () => {
expect(sendMock).toHaveBeenCalledTimes(1);
}, 10000);
});
it("forks thread sessions and injects starter context", async () => {
const { createDiscordMessageHandler } = await import("./monitor.js");
const { resolveSessionKey } = await import("../config/sessions.js");
vi.mocked(resolveSessionKey).mockReturnValue("discord:parent:p1");
let capturedCtx:
| {
SessionKey?: string;
@@ -239,6 +234,13 @@ describe("discord tool result dispatch", () => {
type: ChannelType.GuildText,
name: "thread-name",
}),
rest: {
get: vi.fn().mockResolvedValue({
content: "starter message",
author: { id: "u1", username: "Alice", discriminator: "0001" },
timestamp: new Date().toISOString(),
}),
},
} as unknown as Client;
await handler(
@@ -265,8 +267,8 @@ describe("discord tool result dispatch", () => {
client,
);
expect(capturedCtx?.SessionKey).toBe("discord:thread:t1");
expect(capturedCtx?.ParentSessionKey).toBe("discord:parent:p1");
expect(capturedCtx?.SessionKey).toBe("agent:main:discord:channel:t1");
expect(capturedCtx?.ParentSessionKey).toBe("agent:main:discord:channel:p1");
expect(capturedCtx?.ThreadStarterBody).toContain("starter message");
expect(capturedCtx?.ThreadLabel).toContain("Discord thread #general");
});

View File

@@ -11,11 +11,6 @@ import {
MessageReactionRemoveListener,
MessageType,
type RequestClient,
type PartialMessage,
type PartialMessageReaction,
Partials,
type ThreadChannel,
type PartialUser,
type User,
} from "@buape/carbon";
import { GatewayIntents, GatewayPlugin } from "@buape/carbon/gateway";
@@ -56,7 +51,10 @@ import {
readProviderAllowFromStore,
upsertProviderPairingRequest,
} from "../pairing/pairing-store.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import {
buildAgentSessionKey,
resolveAgentRoute,
} from "../routing/resolve-route.js";
import type { RuntimeEnv } from "../runtime.js";
import { loadWebMedia } from "../web/media.js";
import { fetchDiscordApplicationId } from "./probe.js";
@@ -86,6 +84,12 @@ type DiscordHistoryEntry = {
};
type DiscordReactionEvent = Parameters<MessageReactionAddListener["handle"]>[0];
type DiscordThreadChannel = {
id: string;
name?: string | null;
parentId?: string | null;
parent?: { id?: string; name?: string };
};
type DiscordThreadStarter = {
text: string;
author: string;
@@ -94,29 +98,46 @@ type DiscordThreadStarter = {
const DISCORD_THREAD_STARTER_CACHE = new Map<string, DiscordThreadStarter>();
async function resolveDiscordThreadStarter(
channel: ThreadChannel,
): Promise<DiscordThreadStarter | null> {
const cacheKey = channel.id;
async function resolveDiscordThreadStarter(params: {
channel: DiscordThreadChannel;
client: Client;
parentId?: string;
}): Promise<DiscordThreadStarter | null> {
const cacheKey = params.channel.id;
const cached = DISCORD_THREAD_STARTER_CACHE.get(cacheKey);
if (cached) return cached;
try {
const starter = await channel.fetchStarterMessage();
if (!params.parentId) return null;
const starter = (await params.client.rest.get(
Routes.channelMessage(params.parentId, params.channel.id),
)) as {
content?: string | null;
embeds?: Array<{ description?: string | null }>;
member?: { nick?: string | null; displayName?: string | null };
author?: {
id?: string | null;
username?: string | null;
discriminator?: string | null;
};
timestamp?: string | null;
};
if (!starter) return null;
const text =
starter.content?.trim() ??
starter.embeds?.[0]?.description?.trim() ??
"";
starter.content?.trim() ?? starter.embeds?.[0]?.description?.trim() ?? "";
if (!text) return null;
const author =
starter.member?.nick ??
starter.member?.displayName ??
starter.author?.tag ??
starter.author?.username ??
"Unknown";
(starter.author
? starter.author.discriminator && starter.author.discriminator !== "0"
? `${starter.author.username ?? "Unknown"}#${starter.author.discriminator}`
: (starter.author.username ?? starter.author.id ?? "Unknown")
: "Unknown");
const timestamp = resolveTimestampMs(starter.timestamp);
const payload: DiscordThreadStarter = {
text,
author,
timestamp: starter.createdTimestamp ?? undefined,
timestamp: timestamp ?? undefined,
};
DISCORD_THREAD_STARTER_CACHE.set(cacheKey, payload);
return payload;
@@ -554,15 +575,18 @@ export function createDiscordMessageHandler(params: {
const channelName =
channelInfo?.name ??
((isGuildMessage || isGroupDm) && "name" in message.channel
((isGuildMessage || isGroupDm) &&
message.channel &&
"name" in message.channel
? message.channel.name
: undefined);
const isThreadChannel =
isGuildMessage &&
message.channel &&
"isThread" in message.channel &&
message.channel.isThread();
const threadChannel = isThreadChannel
? (message.channel as ThreadChannel)
? (message.channel as DiscordThreadChannel)
: null;
const threadParentId =
threadChannel?.parentId ?? threadChannel?.parent?.id ?? undefined;
@@ -576,7 +600,6 @@ export function createDiscordMessageHandler(params: {
const displayChannelSlug = displayChannelName
? normalizeDiscordSlug(displayChannelName)
: "";
const channelSlug = channelName ? normalizeDiscordSlug(channelName) : "";
const guildSlug =
guildInfo?.slug ||
(data.guild?.name ? normalizeDiscordSlug(data.guild.name) : "");
@@ -590,6 +613,7 @@ export function createDiscordMessageHandler(params: {
id: isDirectMessage ? author.id : message.channelId,
},
});
const baseSessionKey = route.sessionKey;
const channelConfig = isGuildMessage
? resolveDiscordChannelConfig({
guildInfo,
@@ -831,13 +855,16 @@ export function createDiscordMessageHandler(params: {
let threadStarterBody: string | undefined;
let threadLabel: string | undefined;
let threadSessionKey: string | undefined;
let parentSessionKey: string | undefined;
if (threadChannel) {
const starter = await resolveDiscordThreadStarter(threadChannel);
const starter = await resolveDiscordThreadStarter({
channel: threadChannel,
client,
parentId: threadParentId,
});
if (starter?.text) {
const starterEnvelope = formatAgentEnvelope({
surface: "Discord",
provider: "Discord",
from: starter.author,
timestamp: starter.timestamp,
body: starter.text,
@@ -848,20 +875,12 @@ export function createDiscordMessageHandler(params: {
threadLabel = threadName
? `Discord thread #${normalizeDiscordSlug(parentName)} ${threadName}`
: `Discord thread #${normalizeDiscordSlug(parentName)}`;
threadSessionKey = `discord:thread:${message.channelId}`;
const sessionCfg = cfg.session;
const sessionScope = sessionCfg?.scope ?? "per-sender";
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
if (threadParentId) {
parentSessionKey = resolveSessionKey(
sessionScope,
{
From: `group:${threadParentId}`,
ChatType: "group",
Surface: "discord",
},
mainKey,
);
parentSessionKey = buildAgentSessionKey({
agentId: route.agentId,
provider: route.provider,
peer: { kind: "channel", id: threadParentId },
});
}
}
const mediaPayload = buildDiscordMediaPayload(mediaList);
@@ -872,7 +891,7 @@ export function createDiscordMessageHandler(params: {
? `discord:${author.id}`
: `group:${message.channelId}`,
To: discordTo,
SessionKey: threadSessionKey ?? route.sessionKey,
SessionKey: baseSessionKey,
AccountId: route.accountId,
ChatType: isDirectMessage ? "direct" : "group",
SenderName:

View File

@@ -285,8 +285,6 @@ describe("monitorSlackProvider tool results", () => {
});
it("treats parent_user_id as a thread reply even when thread_ts matches ts", async () => {
const { resolveSessionKey } = await import("../config/sessions.js");
vi.mocked(resolveSessionKey).mockReturnValue("main");
replyMock.mockResolvedValue({ text: "thread reply" });
const controller = new AbortController();
@@ -322,13 +320,11 @@ describe("monitorSlackProvider tool results", () => {
SessionKey?: string;
ParentSessionKey?: string;
};
expect(ctx.SessionKey).toBe("slack:thread:C1:123");
expect(ctx.ParentSessionKey).toBe("main");
expect(ctx.SessionKey).toBe("agent:main:main:thread:123");
expect(ctx.ParentSessionKey).toBe("agent:main:main");
});
it("forks thread sessions and injects starter context", async () => {
const { resolveSessionKey } = await import("../config/sessions.js");
vi.mocked(resolveSessionKey).mockReturnValue("slack:channel:C1");
replyMock.mockResolvedValue({ text: "ok" });
const client = getSlackClient();
@@ -386,8 +382,8 @@ describe("monitorSlackProvider tool results", () => {
ThreadStarterBody?: string;
ThreadLabel?: string;
};
expect(ctx.SessionKey).toBe("slack:thread:C1:111.222");
expect(ctx.ParentSessionKey).toBe("slack:channel:C1");
expect(ctx.SessionKey).toBe("agent:main:slack:channel:C1:thread:111.222");
expect(ctx.ParentSessionKey).toBe("agent:main:slack:channel:C1");
expect(ctx.ThreadStarterBody).toContain("starter message");
expect(ctx.ThreadLabel).toContain("Slack thread #general");
});

View File

@@ -928,10 +928,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const threadTs = message.thread_ts;
const hasThreadTs = typeof threadTs === "string" && threadTs.length > 0;
const isThreadReply =
hasThreadTs && (threadTs !== message.ts || Boolean(message.parent_user_id));
const threadSessionKey = isThreadReply && threadTs
? `slack:thread:${message.channel}:${threadTs}`
: undefined;
hasThreadTs &&
(threadTs !== message.ts || Boolean(message.parent_user_id));
const threadSessionKey =
isThreadReply && threadTs
? `${baseSessionKey}:thread:${threadTs}`
: undefined;
const parentSessionKey = isThreadReply ? baseSessionKey : undefined;
const sessionKey = threadSessionKey ?? baseSessionKey;
enqueueSystemEvent(`${inboundLabel}: ${preview}`, {