From 4355d9acca96ec1fc0a0ee5f19b78c1b386ccaf0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 23 Jan 2026 02:05:34 +0000 Subject: [PATCH] fix: resolve heartbeat sender and Slack thread_ts --- ...ner.sender-prefers-delivery-target.test.ts | 100 ++++++++++ src/infra/heartbeat-runner.ts | 48 +---- src/infra/outbound/targets.ts | 62 ++++++ ...onitor.threading.missing-thread-ts.test.ts | 178 ++++++++++++++++++ src/slack/monitor/message-handler.ts | 11 +- src/slack/monitor/thread-resolution.test.ts | 30 +++ src/slack/monitor/thread-resolution.ts | 140 ++++++++++++++ 7 files changed, 524 insertions(+), 45 deletions(-) create mode 100644 src/infra/heartbeat-runner.sender-prefers-delivery-target.test.ts create mode 100644 src/slack/monitor.threading.missing-thread-ts.test.ts create mode 100644 src/slack/monitor/thread-resolution.test.ts create mode 100644 src/slack/monitor/thread-resolution.ts diff --git a/src/infra/heartbeat-runner.sender-prefers-delivery-target.test.ts b/src/infra/heartbeat-runner.sender-prefers-delivery-target.test.ts new file mode 100644 index 000000000..fd4a167e0 --- /dev/null +++ b/src/infra/heartbeat-runner.sender-prefers-delivery-target.test.ts @@ -0,0 +1,100 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import * as replyModule from "../auto-reply/reply.js"; +import type { ClawdbotConfig } from "../config/config.js"; +import { resolveMainSessionKey } from "../config/sessions.js"; +import { setActivePluginRegistry } from "../plugins/runtime.js"; +import { createPluginRuntime } from "../plugins/runtime/index.js"; +import { createTestRegistry } from "../test-utils/channel-plugins.js"; +import { slackPlugin } from "../../extensions/slack/src/channel.js"; +import { setSlackRuntime } from "../../extensions/slack/src/runtime.js"; +import { telegramPlugin } from "../../extensions/telegram/src/channel.js"; +import { setTelegramRuntime } from "../../extensions/telegram/src/runtime.js"; +import { whatsappPlugin } from "../../extensions/whatsapp/src/channel.js"; +import { setWhatsAppRuntime } from "../../extensions/whatsapp/src/runtime.js"; +import { runHeartbeatOnce } from "./heartbeat-runner.js"; + +// Avoid pulling optional runtime deps during isolated runs. +vi.mock("jiti", () => ({ createJiti: () => () => ({}) })); + +beforeEach(() => { + const runtime = createPluginRuntime(); + setSlackRuntime(runtime); + setTelegramRuntime(runtime); + setWhatsAppRuntime(runtime); + setActivePluginRegistry( + createTestRegistry([ + { pluginId: "slack", plugin: slackPlugin, source: "test" }, + { pluginId: "whatsapp", plugin: whatsappPlugin, source: "test" }, + { pluginId: "telegram", plugin: telegramPlugin, source: "test" }, + ]), + ); +}); + +describe("runHeartbeatOnce", () => { + it("uses the delivery target as sender when lastTo differs", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-hb-")); + const storePath = path.join(tmpDir, "sessions.json"); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + try { + const cfg: ClawdbotConfig = { + agents: { + defaults: { + heartbeat: { + every: "5m", + target: "slack", + to: "C0A9P2N8QHY", + }, + }, + }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "1644620762", + }, + }, + null, + 2, + ), + ); + + replySpy.mockImplementation(async (ctx) => { + expect(ctx.To).toBe("C0A9P2N8QHY"); + expect(ctx.From).toBe("C0A9P2N8QHY"); + return { text: "ok" }; + }); + + const sendSlack = vi.fn().mockResolvedValue({ + messageId: "m1", + channelId: "C0A9P2N8QHY", + }); + + await runHeartbeatOnce({ + cfg, + deps: { + sendSlack, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(sendSlack).toHaveBeenCalled(); + } finally { + replySpy.mockRestore(); + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); +}); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index c39ac6923..a371b4dbb 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -39,7 +39,10 @@ import { } from "./heartbeat-wake.js"; import type { OutboundSendDeps } from "./outbound/deliver.js"; import { deliverOutboundPayloads } from "./outbound/deliver.js"; -import { resolveHeartbeatDeliveryTarget } from "./outbound/targets.js"; +import { + resolveHeartbeatDeliveryTarget, + resolveHeartbeatSenderContext, +} from "./outbound/targets.js"; type HeartbeatDeps = OutboundSendDeps & ChannelHeartbeatDeps & { @@ -362,34 +365,6 @@ function resolveHeartbeatReasoningPayloads( }); } -function resolveHeartbeatSender(params: { - allowFrom: Array; - lastTo?: string; - provider?: string | null; -}) { - const { allowFrom, lastTo, provider } = params; - const candidates = [ - lastTo?.trim(), - provider && lastTo ? `${provider}:${lastTo}` : undefined, - ].filter((val): val is string => Boolean(val?.trim())); - - const allowList = allowFrom - .map((entry) => String(entry)) - .filter((entry) => entry && entry !== "*"); - if (allowFrom.includes("*")) { - return candidates[0] ?? "heartbeat"; - } - if (candidates.length > 0 && allowList.length > 0) { - const matched = candidates.find((candidate) => allowList.includes(candidate)); - if (matched) return matched; - } - if (candidates.length > 0 && allowList.length === 0) { - return candidates[0]; - } - if (allowList.length > 0) return allowList[0]; - return candidates[0] ?? "heartbeat"; -} - async function restoreHeartbeatUpdatedAt(params: { storePath: string; sessionKey: string; @@ -468,20 +443,7 @@ export async function runHeartbeatOnce(opts: { const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId, heartbeat); const previousUpdatedAt = entry?.updatedAt; const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat }); - const lastChannel = delivery.lastChannel; - const lastAccountId = delivery.lastAccountId; - const senderProvider = delivery.channel !== "none" ? delivery.channel : lastChannel; - const senderAllowFrom = senderProvider - ? (getChannelPlugin(senderProvider)?.config.resolveAllowFrom?.({ - cfg, - accountId: senderProvider === lastChannel ? lastAccountId : undefined, - }) ?? []) - : []; - const sender = resolveHeartbeatSender({ - allowFrom: senderAllowFrom, - lastTo: entry?.lastTo, - provider: senderProvider, - }); + const { sender } = resolveHeartbeatSenderContext({ cfg, entry, delivery }); const prompt = resolveHeartbeatPrompt(cfg, heartbeat); const ctx = { Body: prompt, diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index d37d8c69f..f684c4bb2 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -29,6 +29,12 @@ export type OutboundTarget = { lastAccountId?: string; }; +export type HeartbeatSenderContext = { + sender: string; + provider?: DeliverableMessageChannel; + allowFrom: string[]; +}; + export type OutboundTargetResolution = { ok: true; to: string } | { ok: false; error: Error }; export type SessionDeliveryTarget = { @@ -250,3 +256,59 @@ export function resolveHeartbeatDeliveryTarget(params: { lastAccountId: resolvedTarget.lastAccountId, }; } + +function resolveHeartbeatSenderId(params: { + allowFrom: Array; + deliveryTo?: string; + lastTo?: string; + provider?: string | null; +}) { + const { allowFrom, deliveryTo, lastTo, provider } = params; + const candidates = [ + deliveryTo?.trim(), + provider && deliveryTo ? `${provider}:${deliveryTo}` : undefined, + lastTo?.trim(), + provider && lastTo ? `${provider}:${lastTo}` : undefined, + ].filter((val): val is string => Boolean(val?.trim())); + + const allowList = allowFrom + .map((entry) => String(entry)) + .filter((entry) => entry && entry !== "*"); + if (allowFrom.includes("*")) { + return candidates[0] ?? "heartbeat"; + } + if (candidates.length > 0 && allowList.length > 0) { + const matched = candidates.find((candidate) => allowList.includes(candidate)); + if (matched) return matched; + } + if (candidates.length > 0 && allowList.length === 0) { + return candidates[0]; + } + if (allowList.length > 0) return allowList[0]; + return candidates[0] ?? "heartbeat"; +} + +export function resolveHeartbeatSenderContext(params: { + cfg: ClawdbotConfig; + entry?: SessionEntry; + delivery: OutboundTarget; +}): HeartbeatSenderContext { + const provider = + params.delivery.channel !== "none" ? params.delivery.channel : params.delivery.lastChannel; + const allowFrom = provider + ? (getChannelPlugin(provider)?.config.resolveAllowFrom?.({ + cfg: params.cfg, + accountId: + provider === params.delivery.lastChannel ? params.delivery.lastAccountId : undefined, + }) ?? []) + : []; + + const sender = resolveHeartbeatSenderId({ + allowFrom, + deliveryTo: params.delivery.to, + lastTo: params.entry?.lastTo, + provider, + }); + + return { sender, provider, allowFrom }; +} diff --git a/src/slack/monitor.threading.missing-thread-ts.test.ts b/src/slack/monitor.threading.missing-thread-ts.test.ts new file mode 100644 index 000000000..743671fd8 --- /dev/null +++ b/src/slack/monitor.threading.missing-thread-ts.test.ts @@ -0,0 +1,178 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js"; +import { monitorSlackProvider } from "./monitor.js"; + +const sendMock = vi.fn(); +const replyMock = vi.fn(); +const updateLastRouteMock = vi.fn(); +const reactMock = vi.fn(); +let config: Record = {}; +const readAllowFromStoreMock = vi.fn(); +const upsertPairingRequestMock = vi.fn(); +const getSlackHandlers = () => + ( + globalThis as { + __slackHandlers?: Map Promise>; + } + ).__slackHandlers; +const getSlackClient = () => + (globalThis as { __slackClient?: Record }).__slackClient; + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => config, + }; +}); + +vi.mock("../auto-reply/reply.js", () => ({ + getReplyFromConfig: (...args: unknown[]) => replyMock(...args), +})); + +vi.mock("./resolve-channels.js", () => ({ + resolveSlackChannelAllowlist: async ({ entries }: { entries: string[] }) => + entries.map((input) => ({ input, resolved: false })), +})); + +vi.mock("./resolve-users.js", () => ({ + resolveSlackUserAllowlist: async ({ entries }: { entries: string[] }) => + entries.map((input) => ({ input, resolved: false })), +})); + +vi.mock("./send.js", () => ({ + sendMessageSlack: (...args: unknown[]) => sendMock(...args), +})); + +vi.mock("../pairing/pairing-store.js", () => ({ + readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args), + upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args), +})); + +vi.mock("../config/sessions.js", () => ({ + resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), + updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), + resolveSessionKey: vi.fn(), + readSessionUpdatedAt: vi.fn(() => undefined), + recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("@slack/bolt", () => { + const handlers = new Map Promise>(); + (globalThis as { __slackHandlers?: typeof handlers }).__slackHandlers = handlers; + const client = { + auth: { test: vi.fn().mockResolvedValue({ user_id: "bot-user" }) }, + conversations: { + info: vi.fn().mockResolvedValue({ + channel: { name: "general", is_channel: true }, + }), + replies: vi.fn().mockResolvedValue({ messages: [] }), + history: vi.fn().mockResolvedValue({ messages: [] }), + }, + users: { + info: vi.fn().mockResolvedValue({ + user: { profile: { display_name: "Ada" } }, + }), + }, + assistant: { + threads: { + setStatus: vi.fn().mockResolvedValue({ ok: true }), + }, + }, + reactions: { + add: (...args: unknown[]) => reactMock(...args), + }, + }; + (globalThis as { __slackClient?: typeof client }).__slackClient = client; + class App { + client = client; + event(name: string, handler: (args: unknown) => Promise) { + handlers.set(name, handler); + } + command() { + /* no-op */ + } + start = vi.fn().mockResolvedValue(undefined); + stop = vi.fn().mockResolvedValue(undefined); + } + class HTTPReceiver { + requestListener = vi.fn(); + } + return { App, HTTPReceiver, default: { App, HTTPReceiver } }; +}); + +const flush = () => new Promise((resolve) => setTimeout(resolve, 0)); + +async function waitForEvent(name: string) { + for (let i = 0; i < 10; i += 1) { + if (getSlackHandlers()?.has(name)) return; + await flush(); + } +} + +beforeEach(() => { + resetInboundDedupe(); + getSlackHandlers()?.clear(); + config = { + messages: { responsePrefix: "PFX" }, + channels: { + slack: { + dm: { enabled: true, policy: "open", allowFrom: ["*"] }, + groupPolicy: "open", + channels: { C1: { allow: true, requireMention: false } }, + }, + }, + }; + sendMock.mockReset().mockResolvedValue(undefined); + replyMock.mockReset(); + updateLastRouteMock.mockReset(); + reactMock.mockReset(); + readAllowFromStoreMock.mockReset().mockResolvedValue([]); + upsertPairingRequestMock.mockReset().mockResolvedValue({ code: "PAIRCODE", created: true }); +}); + +describe("monitorSlackProvider threading", () => { + it("recovers missing thread_ts when parent_user_id is present", async () => { + replyMock.mockResolvedValue({ text: "thread reply" }); + + const client = getSlackClient(); + if (!client) throw new Error("Slack client not registered"); + const conversations = client.conversations as { + history: ReturnType; + }; + conversations.history.mockResolvedValueOnce({ + messages: [{ ts: "456", thread_ts: "111.222" }], + }); + + const controller = new AbortController(); + const run = monitorSlackProvider({ + botToken: "bot-token", + appToken: "app-token", + abortSignal: controller.signal, + }); + + await waitForEvent("message"); + const handler = getSlackHandlers()?.get("message"); + if (!handler) throw new Error("Slack message handler not registered"); + + await handler({ + event: { + type: "message", + user: "U1", + text: "hello", + ts: "456", + parent_user_id: "U2", + channel: "C1", + channel_type: "channel", + }, + }); + + await flush(); + controller.abort(); + await run; + + expect(sendMock).toHaveBeenCalledTimes(1); + expect(sendMock.mock.calls[0][2]).toMatchObject({ threadTs: "111.222" }); + }); +}); diff --git a/src/slack/monitor/message-handler.ts b/src/slack/monitor/message-handler.ts index 2926d6ce2..1ee736496 100644 --- a/src/slack/monitor/message-handler.ts +++ b/src/slack/monitor/message-handler.ts @@ -8,6 +8,7 @@ import type { SlackMessageEvent } from "../types.js"; import type { SlackMonitorContext } from "./context.js"; import { dispatchPreparedSlackMessage } from "./message-handler/dispatch.js"; import { prepareSlackMessage } from "./message-handler/prepare.js"; +import { createSlackThreadTsResolver } from "./thread-resolution.js"; export type SlackMessageHandler = ( message: SlackMessageEvent, @@ -20,6 +21,7 @@ export function createSlackMessageHandler(params: { }): SlackMessageHandler { const { ctx, account } = params; const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" }); + const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client }); const debouncer = createInboundDebouncer<{ message: SlackMessageEvent; @@ -29,9 +31,13 @@ export function createSlackMessageHandler(params: { buildKey: (entry) => { const senderId = entry.message.user ?? entry.message.bot_id; if (!senderId) return null; + const messageTs = entry.message.ts ?? entry.message.event_ts; + // If Slack flags a thread reply but omits thread_ts, isolate it from root debouncing. const threadKey = entry.message.thread_ts ? `${entry.message.channel}:${entry.message.thread_ts}` - : entry.message.channel; + : entry.message.parent_user_id && messageTs + ? `${entry.message.channel}:maybe-thread:${messageTs}` + : entry.message.channel; return `slack:${ctx.accountId}:${threadKey}:${senderId}`; }, shouldDebounce: (entry) => { @@ -91,6 +97,7 @@ export function createSlackMessageHandler(params: { return; } if (ctx.markMessageSeen(message.channel, message.ts)) return; - await debouncer.enqueue({ message, opts }); + const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source }); + await debouncer.enqueue({ message: resolvedMessage, opts }); }; } diff --git a/src/slack/monitor/thread-resolution.test.ts b/src/slack/monitor/thread-resolution.test.ts new file mode 100644 index 000000000..e670f1ee6 --- /dev/null +++ b/src/slack/monitor/thread-resolution.test.ts @@ -0,0 +1,30 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { SlackMessageEvent } from "../types.js"; +import { createSlackThreadTsResolver } from "./thread-resolution.js"; + +describe("createSlackThreadTsResolver", () => { + it("caches resolved thread_ts lookups", async () => { + const historyMock = vi.fn().mockResolvedValue({ + messages: [{ ts: "1", thread_ts: "9" }], + }); + const resolver = createSlackThreadTsResolver({ + client: { conversations: { history: historyMock } } as any, + cacheTtlMs: 60_000, + maxSize: 5, + }); + + const message = { + channel: "C1", + parent_user_id: "U2", + ts: "1", + } as SlackMessageEvent; + + const first = await resolver.resolve({ message, source: "message" }); + const second = await resolver.resolve({ message, source: "message" }); + + expect(first.thread_ts).toBe("9"); + expect(second.thread_ts).toBe("9"); + expect(historyMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/slack/monitor/thread-resolution.ts b/src/slack/monitor/thread-resolution.ts new file mode 100644 index 000000000..3a9306144 --- /dev/null +++ b/src/slack/monitor/thread-resolution.ts @@ -0,0 +1,140 @@ +import type { WebClient as SlackWebClient } from "@slack/web-api"; + +import { logVerbose, shouldLogVerbose } from "../../globals.js"; +import type { SlackMessageEvent } from "../types.js"; + +type ThreadTsCacheEntry = { + threadTs: string | null; + updatedAt: number; +}; + +const DEFAULT_THREAD_TS_CACHE_TTL_MS = 60_000; +const DEFAULT_THREAD_TS_CACHE_MAX = 500; + +const normalizeThreadTs = (threadTs?: string | null) => { + const trimmed = threadTs?.trim(); + return trimmed ? trimmed : undefined; +}; + +async function resolveThreadTsFromHistory(params: { + client: SlackWebClient; + channelId: string; + messageTs: string; +}) { + try { + const response = (await params.client.conversations.history({ + channel: params.channelId, + latest: params.messageTs, + oldest: params.messageTs, + inclusive: true, + limit: 1, + })) as { messages?: Array<{ ts?: string; thread_ts?: string }> }; + const message = + response.messages?.find((entry) => entry.ts === params.messageTs) ?? response.messages?.[0]; + return normalizeThreadTs(message?.thread_ts); + } catch (err) { + if (shouldLogVerbose()) { + logVerbose( + `slack inbound: failed to resolve thread_ts via conversations.history for channel=${params.channelId} ts=${params.messageTs}: ${String(err)}`, + ); + } + return undefined; + } +} + +export function createSlackThreadTsResolver(params: { + client: SlackWebClient; + cacheTtlMs?: number; + maxSize?: number; +}) { + const ttlMs = Math.max(0, params.cacheTtlMs ?? DEFAULT_THREAD_TS_CACHE_TTL_MS); + const maxSize = Math.max(0, params.maxSize ?? DEFAULT_THREAD_TS_CACHE_MAX); + const cache = new Map(); + const inflight = new Map>(); + + const getCached = (key: string, now: number) => { + const entry = cache.get(key); + if (!entry) return undefined; + if (ttlMs > 0 && now - entry.updatedAt > ttlMs) { + cache.delete(key); + return undefined; + } + cache.delete(key); + cache.set(key, { ...entry, updatedAt: now }); + return entry.threadTs; + }; + + const setCached = (key: string, threadTs: string | null, now: number) => { + cache.delete(key); + cache.set(key, { threadTs, updatedAt: now }); + if (maxSize <= 0) { + cache.clear(); + return; + } + while (cache.size > maxSize) { + const oldestKey = cache.keys().next().value as string | undefined; + if (!oldestKey) break; + cache.delete(oldestKey); + } + }; + + return { + resolve: async (request: { + message: SlackMessageEvent; + source: "message" | "app_mention"; + }): Promise => { + const { message } = request; + if (!message.parent_user_id || message.thread_ts || !message.ts) { + return message; + } + + const cacheKey = `${message.channel}:${message.ts}`; + const now = Date.now(); + const cached = getCached(cacheKey, now); + if (cached !== undefined) { + return cached ? { ...message, thread_ts: cached } : message; + } + + if (shouldLogVerbose()) { + logVerbose( + `slack inbound: missing thread_ts for thread reply channel=${message.channel} ts=${message.ts} source=${request.source}`, + ); + } + + let pending = inflight.get(cacheKey); + if (!pending) { + pending = resolveThreadTsFromHistory({ + client: params.client, + channelId: message.channel, + messageTs: message.ts, + }); + inflight.set(cacheKey, pending); + } + + let resolved: string | undefined; + try { + resolved = await pending; + } finally { + inflight.delete(cacheKey); + } + + setCached(cacheKey, resolved ?? null, Date.now()); + + if (resolved) { + if (shouldLogVerbose()) { + logVerbose( + `slack inbound: resolved missing thread_ts channel=${message.channel} ts=${message.ts} -> thread_ts=${resolved}`, + ); + } + return { ...message, thread_ts: resolved }; + } + + if (shouldLogVerbose()) { + logVerbose( + `slack inbound: could not resolve missing thread_ts channel=${message.channel} ts=${message.ts}`, + ); + } + return message; + }, + }; +}