fix: resolve heartbeat sender and Slack thread_ts

This commit is contained in:
Peter Steinberger
2026-01-23 02:05:34 +00:00
parent 712bc74c30
commit 4355d9acca
7 changed files with 524 additions and 45 deletions

View File

@@ -0,0 +1,100 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import * as replyModule from "../auto-reply/reply.js";
import type { ClawdbotConfig } from "../config/config.js";
import { resolveMainSessionKey } from "../config/sessions.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createPluginRuntime } from "../plugins/runtime/index.js";
import { createTestRegistry } from "../test-utils/channel-plugins.js";
import { slackPlugin } from "../../extensions/slack/src/channel.js";
import { setSlackRuntime } from "../../extensions/slack/src/runtime.js";
import { telegramPlugin } from "../../extensions/telegram/src/channel.js";
import { setTelegramRuntime } from "../../extensions/telegram/src/runtime.js";
import { whatsappPlugin } from "../../extensions/whatsapp/src/channel.js";
import { setWhatsAppRuntime } from "../../extensions/whatsapp/src/runtime.js";
import { runHeartbeatOnce } from "./heartbeat-runner.js";
// Avoid pulling optional runtime deps during isolated runs.
vi.mock("jiti", () => ({ createJiti: () => () => ({}) }));
beforeEach(() => {
const runtime = createPluginRuntime();
setSlackRuntime(runtime);
setTelegramRuntime(runtime);
setWhatsAppRuntime(runtime);
setActivePluginRegistry(
createTestRegistry([
{ pluginId: "slack", plugin: slackPlugin, source: "test" },
{ pluginId: "whatsapp", plugin: whatsappPlugin, source: "test" },
{ pluginId: "telegram", plugin: telegramPlugin, source: "test" },
]),
);
});
describe("runHeartbeatOnce", () => {
it("uses the delivery target as sender when lastTo differs", async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-"));
const storePath = path.join(tmpDir, "sessions.json");
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
try {
const cfg: ClawdbotConfig = {
agents: {
defaults: {
heartbeat: {
every: "5m",
target: "slack",
to: "C0A9P2N8QHY",
},
},
},
session: { store: storePath },
};
const sessionKey = resolveMainSessionKey(cfg);
await fs.writeFile(
storePath,
JSON.stringify(
{
[sessionKey]: {
sessionId: "sid",
updatedAt: Date.now(),
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: "1644620762",
},
},
null,
2,
),
);
replySpy.mockImplementation(async (ctx) => {
expect(ctx.To).toBe("C0A9P2N8QHY");
expect(ctx.From).toBe("C0A9P2N8QHY");
return { text: "ok" };
});
const sendSlack = vi.fn().mockResolvedValue({
messageId: "m1",
channelId: "C0A9P2N8QHY",
});
await runHeartbeatOnce({
cfg,
deps: {
sendSlack,
getQueueSize: () => 0,
nowMs: () => 0,
},
});
expect(sendSlack).toHaveBeenCalled();
} finally {
replySpy.mockRestore();
await fs.rm(tmpDir, { recursive: true, force: true });
}
});
});

View File

@@ -39,7 +39,10 @@ import {
} from "./heartbeat-wake.js";
import type { OutboundSendDeps } from "./outbound/deliver.js";
import { deliverOutboundPayloads } from "./outbound/deliver.js";
import { resolveHeartbeatDeliveryTarget } from "./outbound/targets.js";
import {
resolveHeartbeatDeliveryTarget,
resolveHeartbeatSenderContext,
} from "./outbound/targets.js";
type HeartbeatDeps = OutboundSendDeps &
ChannelHeartbeatDeps & {
@@ -362,34 +365,6 @@ function resolveHeartbeatReasoningPayloads(
});
}
function resolveHeartbeatSender(params: {
allowFrom: Array<string | number>;
lastTo?: string;
provider?: string | null;
}) {
const { allowFrom, lastTo, provider } = params;
const candidates = [
lastTo?.trim(),
provider && lastTo ? `${provider}:${lastTo}` : undefined,
].filter((val): val is string => Boolean(val?.trim()));
const allowList = allowFrom
.map((entry) => String(entry))
.filter((entry) => entry && entry !== "*");
if (allowFrom.includes("*")) {
return candidates[0] ?? "heartbeat";
}
if (candidates.length > 0 && allowList.length > 0) {
const matched = candidates.find((candidate) => allowList.includes(candidate));
if (matched) return matched;
}
if (candidates.length > 0 && allowList.length === 0) {
return candidates[0];
}
if (allowList.length > 0) return allowList[0];
return candidates[0] ?? "heartbeat";
}
async function restoreHeartbeatUpdatedAt(params: {
storePath: string;
sessionKey: string;
@@ -468,20 +443,7 @@ export async function runHeartbeatOnce(opts: {
const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId, heartbeat);
const previousUpdatedAt = entry?.updatedAt;
const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat });
const lastChannel = delivery.lastChannel;
const lastAccountId = delivery.lastAccountId;
const senderProvider = delivery.channel !== "none" ? delivery.channel : lastChannel;
const senderAllowFrom = senderProvider
? (getChannelPlugin(senderProvider)?.config.resolveAllowFrom?.({
cfg,
accountId: senderProvider === lastChannel ? lastAccountId : undefined,
}) ?? [])
: [];
const sender = resolveHeartbeatSender({
allowFrom: senderAllowFrom,
lastTo: entry?.lastTo,
provider: senderProvider,
});
const { sender } = resolveHeartbeatSenderContext({ cfg, entry, delivery });
const prompt = resolveHeartbeatPrompt(cfg, heartbeat);
const ctx = {
Body: prompt,

View File

@@ -29,6 +29,12 @@ export type OutboundTarget = {
lastAccountId?: string;
};
export type HeartbeatSenderContext = {
sender: string;
provider?: DeliverableMessageChannel;
allowFrom: string[];
};
export type OutboundTargetResolution = { ok: true; to: string } | { ok: false; error: Error };
export type SessionDeliveryTarget = {
@@ -250,3 +256,59 @@ export function resolveHeartbeatDeliveryTarget(params: {
lastAccountId: resolvedTarget.lastAccountId,
};
}
function resolveHeartbeatSenderId(params: {
allowFrom: Array<string | number>;
deliveryTo?: string;
lastTo?: string;
provider?: string | null;
}) {
const { allowFrom, deliveryTo, lastTo, provider } = params;
const candidates = [
deliveryTo?.trim(),
provider && deliveryTo ? `${provider}:${deliveryTo}` : undefined,
lastTo?.trim(),
provider && lastTo ? `${provider}:${lastTo}` : undefined,
].filter((val): val is string => Boolean(val?.trim()));
const allowList = allowFrom
.map((entry) => String(entry))
.filter((entry) => entry && entry !== "*");
if (allowFrom.includes("*")) {
return candidates[0] ?? "heartbeat";
}
if (candidates.length > 0 && allowList.length > 0) {
const matched = candidates.find((candidate) => allowList.includes(candidate));
if (matched) return matched;
}
if (candidates.length > 0 && allowList.length === 0) {
return candidates[0];
}
if (allowList.length > 0) return allowList[0];
return candidates[0] ?? "heartbeat";
}
export function resolveHeartbeatSenderContext(params: {
cfg: ClawdbotConfig;
entry?: SessionEntry;
delivery: OutboundTarget;
}): HeartbeatSenderContext {
const provider =
params.delivery.channel !== "none" ? params.delivery.channel : params.delivery.lastChannel;
const allowFrom = provider
? (getChannelPlugin(provider)?.config.resolveAllowFrom?.({
cfg: params.cfg,
accountId:
provider === params.delivery.lastChannel ? params.delivery.lastAccountId : undefined,
}) ?? [])
: [];
const sender = resolveHeartbeatSenderId({
allowFrom,
deliveryTo: params.delivery.to,
lastTo: params.entry?.lastTo,
provider,
});
return { sender, provider, allowFrom };
}

View File

@@ -0,0 +1,178 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js";
import { monitorSlackProvider } from "./monitor.js";
const sendMock = vi.fn();
const replyMock = vi.fn();
const updateLastRouteMock = vi.fn();
const reactMock = vi.fn();
let config: Record<string, unknown> = {};
const readAllowFromStoreMock = vi.fn();
const upsertPairingRequestMock = vi.fn();
const getSlackHandlers = () =>
(
globalThis as {
__slackHandlers?: Map<string, (args: unknown) => Promise<void>>;
}
).__slackHandlers;
const getSlackClient = () =>
(globalThis as { __slackClient?: Record<string, unknown> }).__slackClient;
vi.mock("../config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/config.js")>();
return {
...actual,
loadConfig: () => config,
};
});
vi.mock("../auto-reply/reply.js", () => ({
getReplyFromConfig: (...args: unknown[]) => replyMock(...args),
}));
vi.mock("./resolve-channels.js", () => ({
resolveSlackChannelAllowlist: async ({ entries }: { entries: string[] }) =>
entries.map((input) => ({ input, resolved: false })),
}));
vi.mock("./resolve-users.js", () => ({
resolveSlackUserAllowlist: async ({ entries }: { entries: string[] }) =>
entries.map((input) => ({ input, resolved: false })),
}));
vi.mock("./send.js", () => ({
sendMessageSlack: (...args: unknown[]) => sendMock(...args),
}));
vi.mock("../pairing/pairing-store.js", () => ({
readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args),
upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args),
}));
vi.mock("../config/sessions.js", () => ({
resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"),
updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args),
resolveSessionKey: vi.fn(),
readSessionUpdatedAt: vi.fn(() => undefined),
recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("@slack/bolt", () => {
const handlers = new Map<string, (args: unknown) => Promise<void>>();
(globalThis as { __slackHandlers?: typeof handlers }).__slackHandlers = handlers;
const client = {
auth: { test: vi.fn().mockResolvedValue({ user_id: "bot-user" }) },
conversations: {
info: vi.fn().mockResolvedValue({
channel: { name: "general", is_channel: true },
}),
replies: vi.fn().mockResolvedValue({ messages: [] }),
history: vi.fn().mockResolvedValue({ messages: [] }),
},
users: {
info: vi.fn().mockResolvedValue({
user: { profile: { display_name: "Ada" } },
}),
},
assistant: {
threads: {
setStatus: vi.fn().mockResolvedValue({ ok: true }),
},
},
reactions: {
add: (...args: unknown[]) => reactMock(...args),
},
};
(globalThis as { __slackClient?: typeof client }).__slackClient = client;
class App {
client = client;
event(name: string, handler: (args: unknown) => Promise<void>) {
handlers.set(name, handler);
}
command() {
/* no-op */
}
start = vi.fn().mockResolvedValue(undefined);
stop = vi.fn().mockResolvedValue(undefined);
}
class HTTPReceiver {
requestListener = vi.fn();
}
return { App, HTTPReceiver, default: { App, HTTPReceiver } };
});
const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
async function waitForEvent(name: string) {
for (let i = 0; i < 10; i += 1) {
if (getSlackHandlers()?.has(name)) return;
await flush();
}
}
beforeEach(() => {
resetInboundDedupe();
getSlackHandlers()?.clear();
config = {
messages: { responsePrefix: "PFX" },
channels: {
slack: {
dm: { enabled: true, policy: "open", allowFrom: ["*"] },
groupPolicy: "open",
channels: { C1: { allow: true, requireMention: false } },
},
},
};
sendMock.mockReset().mockResolvedValue(undefined);
replyMock.mockReset();
updateLastRouteMock.mockReset();
reactMock.mockReset();
readAllowFromStoreMock.mockReset().mockResolvedValue([]);
upsertPairingRequestMock.mockReset().mockResolvedValue({ code: "PAIRCODE", created: true });
});
describe("monitorSlackProvider threading", () => {
it("recovers missing thread_ts when parent_user_id is present", async () => {
replyMock.mockResolvedValue({ text: "thread reply" });
const client = getSlackClient();
if (!client) throw new Error("Slack client not registered");
const conversations = client.conversations as {
history: ReturnType<typeof vi.fn>;
};
conversations.history.mockResolvedValueOnce({
messages: [{ ts: "456", thread_ts: "111.222" }],
});
const controller = new AbortController();
const run = monitorSlackProvider({
botToken: "bot-token",
appToken: "app-token",
abortSignal: controller.signal,
});
await waitForEvent("message");
const handler = getSlackHandlers()?.get("message");
if (!handler) throw new Error("Slack message handler not registered");
await handler({
event: {
type: "message",
user: "U1",
text: "hello",
ts: "456",
parent_user_id: "U2",
channel: "C1",
channel_type: "channel",
},
});
await flush();
controller.abort();
await run;
expect(sendMock).toHaveBeenCalledTimes(1);
expect(sendMock.mock.calls[0][2]).toMatchObject({ threadTs: "111.222" });
});
});

View File

@@ -8,6 +8,7 @@ import type { SlackMessageEvent } from "../types.js";
import type { SlackMonitorContext } from "./context.js";
import { dispatchPreparedSlackMessage } from "./message-handler/dispatch.js";
import { prepareSlackMessage } from "./message-handler/prepare.js";
import { createSlackThreadTsResolver } from "./thread-resolution.js";
export type SlackMessageHandler = (
message: SlackMessageEvent,
@@ -20,6 +21,7 @@ export function createSlackMessageHandler(params: {
}): SlackMessageHandler {
const { ctx, account } = params;
const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" });
const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client });
const debouncer = createInboundDebouncer<{
message: SlackMessageEvent;
@@ -29,9 +31,13 @@ export function createSlackMessageHandler(params: {
buildKey: (entry) => {
const senderId = entry.message.user ?? entry.message.bot_id;
if (!senderId) return null;
const messageTs = entry.message.ts ?? entry.message.event_ts;
// If Slack flags a thread reply but omits thread_ts, isolate it from root debouncing.
const threadKey = entry.message.thread_ts
? `${entry.message.channel}:${entry.message.thread_ts}`
: entry.message.channel;
: entry.message.parent_user_id && messageTs
? `${entry.message.channel}:maybe-thread:${messageTs}`
: entry.message.channel;
return `slack:${ctx.accountId}:${threadKey}:${senderId}`;
},
shouldDebounce: (entry) => {
@@ -91,6 +97,7 @@ export function createSlackMessageHandler(params: {
return;
}
if (ctx.markMessageSeen(message.channel, message.ts)) return;
await debouncer.enqueue({ message, opts });
const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source });
await debouncer.enqueue({ message: resolvedMessage, opts });
};
}

View File

@@ -0,0 +1,30 @@
import { describe, expect, it, vi } from "vitest";
import type { SlackMessageEvent } from "../types.js";
import { createSlackThreadTsResolver } from "./thread-resolution.js";
describe("createSlackThreadTsResolver", () => {
it("caches resolved thread_ts lookups", async () => {
const historyMock = vi.fn().mockResolvedValue({
messages: [{ ts: "1", thread_ts: "9" }],
});
const resolver = createSlackThreadTsResolver({
client: { conversations: { history: historyMock } } as any,
cacheTtlMs: 60_000,
maxSize: 5,
});
const message = {
channel: "C1",
parent_user_id: "U2",
ts: "1",
} as SlackMessageEvent;
const first = await resolver.resolve({ message, source: "message" });
const second = await resolver.resolve({ message, source: "message" });
expect(first.thread_ts).toBe("9");
expect(second.thread_ts).toBe("9");
expect(historyMock).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,140 @@
import type { WebClient as SlackWebClient } from "@slack/web-api";
import { logVerbose, shouldLogVerbose } from "../../globals.js";
import type { SlackMessageEvent } from "../types.js";
type ThreadTsCacheEntry = {
threadTs: string | null;
updatedAt: number;
};
const DEFAULT_THREAD_TS_CACHE_TTL_MS = 60_000;
const DEFAULT_THREAD_TS_CACHE_MAX = 500;
const normalizeThreadTs = (threadTs?: string | null) => {
const trimmed = threadTs?.trim();
return trimmed ? trimmed : undefined;
};
async function resolveThreadTsFromHistory(params: {
client: SlackWebClient;
channelId: string;
messageTs: string;
}) {
try {
const response = (await params.client.conversations.history({
channel: params.channelId,
latest: params.messageTs,
oldest: params.messageTs,
inclusive: true,
limit: 1,
})) as { messages?: Array<{ ts?: string; thread_ts?: string }> };
const message =
response.messages?.find((entry) => entry.ts === params.messageTs) ?? response.messages?.[0];
return normalizeThreadTs(message?.thread_ts);
} catch (err) {
if (shouldLogVerbose()) {
logVerbose(
`slack inbound: failed to resolve thread_ts via conversations.history for channel=${params.channelId} ts=${params.messageTs}: ${String(err)}`,
);
}
return undefined;
}
}
export function createSlackThreadTsResolver(params: {
client: SlackWebClient;
cacheTtlMs?: number;
maxSize?: number;
}) {
const ttlMs = Math.max(0, params.cacheTtlMs ?? DEFAULT_THREAD_TS_CACHE_TTL_MS);
const maxSize = Math.max(0, params.maxSize ?? DEFAULT_THREAD_TS_CACHE_MAX);
const cache = new Map<string, ThreadTsCacheEntry>();
const inflight = new Map<string, Promise<string | undefined>>();
const getCached = (key: string, now: number) => {
const entry = cache.get(key);
if (!entry) return undefined;
if (ttlMs > 0 && now - entry.updatedAt > ttlMs) {
cache.delete(key);
return undefined;
}
cache.delete(key);
cache.set(key, { ...entry, updatedAt: now });
return entry.threadTs;
};
const setCached = (key: string, threadTs: string | null, now: number) => {
cache.delete(key);
cache.set(key, { threadTs, updatedAt: now });
if (maxSize <= 0) {
cache.clear();
return;
}
while (cache.size > maxSize) {
const oldestKey = cache.keys().next().value as string | undefined;
if (!oldestKey) break;
cache.delete(oldestKey);
}
};
return {
resolve: async (request: {
message: SlackMessageEvent;
source: "message" | "app_mention";
}): Promise<SlackMessageEvent> => {
const { message } = request;
if (!message.parent_user_id || message.thread_ts || !message.ts) {
return message;
}
const cacheKey = `${message.channel}:${message.ts}`;
const now = Date.now();
const cached = getCached(cacheKey, now);
if (cached !== undefined) {
return cached ? { ...message, thread_ts: cached } : message;
}
if (shouldLogVerbose()) {
logVerbose(
`slack inbound: missing thread_ts for thread reply channel=${message.channel} ts=${message.ts} source=${request.source}`,
);
}
let pending = inflight.get(cacheKey);
if (!pending) {
pending = resolveThreadTsFromHistory({
client: params.client,
channelId: message.channel,
messageTs: message.ts,
});
inflight.set(cacheKey, pending);
}
let resolved: string | undefined;
try {
resolved = await pending;
} finally {
inflight.delete(cacheKey);
}
setCached(cacheKey, resolved ?? null, Date.now());
if (resolved) {
if (shouldLogVerbose()) {
logVerbose(
`slack inbound: resolved missing thread_ts channel=${message.channel} ts=${message.ts} -> thread_ts=${resolved}`,
);
}
return { ...message, thread_ts: resolved };
}
if (shouldLogVerbose()) {
logVerbose(
`slack inbound: could not resolve missing thread_ts channel=${message.channel} ts=${message.ts}`,
);
}
return message;
},
};
}