From 02ca148583e374cea6f6f7902b82ed2e50d63967 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 20 Jan 2026 17:22:07 +0000 Subject: [PATCH] fix: preserve subagent thread routing (#1241) Thanks @gnarco. Co-authored-by: gnarco --- CHANGELOG.md | 1 + README.md | 40 +++++++++---------- docs/tools/subagents.md | 3 +- src/agents/clawdbot-tools.ts | 6 +++ src/agents/pi-embedded-runner/run.ts | 2 + src/agents/pi-embedded-runner/run/attempt.ts | 2 + src/agents/pi-embedded-runner/run/params.ts | 4 ++ src/agents/pi-embedded-runner/run/types.ts | 2 + src/agents/pi-tools.ts | 4 ++ src/agents/subagent-announce.ts | 8 ++++ src/agents/tools/sessions-spawn-tool.ts | 4 ++ .../reply/agent-runner-execution.ts | 2 + src/auto-reply/reply/agent-runner-memory.ts | 2 + src/auto-reply/reply/followup-runner.ts | 2 + .../reply/get-reply-inline-actions.ts | 2 + src/auto-reply/reply/route-reply.test.ts | 16 ++++++++ src/auto-reply/reply/route-reply.ts | 9 ++++- src/auto-reply/reply/session.ts | 5 +++ src/commands/agent.ts | 2 + src/commands/agent/delivery.ts | 7 ++++ src/commands/agent/run-context.ts | 9 +++++ src/commands/agent/types.ts | 2 + src/config/sessions/store.ts | 13 ++++-- src/config/sessions/types.ts | 1 + src/gateway/protocol/schema/agent.ts | 1 + src/gateway/server-methods/agent.ts | 10 +++++ src/infra/outbound/agent-delivery.ts | 4 ++ src/infra/outbound/targets.test.ts | 8 ++++ src/infra/outbound/targets.ts | 11 +++++ src/slack/monitor/message-handler/prepare.ts | 1 + src/utils/delivery-context.test.ts | 18 +++++++-- src/utils/delivery-context.ts | 26 ++++++++++-- 32 files changed, 195 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70e257831..4e8d299a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Docs: https://docs.clawd.bot - Zalouser: add channel dock metadata, config schema, setup wiring, probe, and status issues. (#1219) — thanks @suminhthanh. ### Fixes - Discovery: shorten Bonjour DNS-SD service type to `_clawdbot-gw._tcp` and update discovery clients/docs. +- Agents: preserve subagent announce thread/topic routing + queued replies across channels. (#1241) — thanks @gnarco. - Doctor: clarify plugin auto-enable hint text in the startup banner. - Gateway: clarify unauthorized handshake responses with token/password mismatch guidance. - Web search: infer Perplexity base URL from API key source (direct vs OpenRouter). diff --git a/README.md b/README.md index bc2dc9b01..28528313a 100644 --- a/README.md +++ b/README.md @@ -482,24 +482,24 @@ Thanks to all clawtributors: steipete bohdanpodvirnyi joaohlisboa mneves75 MatthieuBizien rahthakor vrknetha radek-paclt joshp123 mukhtharcm maxsumrall xadenryan Tobias Bischoff juanpablodlc hsrvc magimetal meaningfool NicholasSpisak abhisekbasu1 sebslight claude jamesgroat Hyaxia dantelex daveonkels mteam88 Eng. Juan Combetto dbhurley Mariano Belinky TSavo - julianengel benithors timolins nachx639 sreekaransrinath gupsammy cristip73 nachoiacovino Vasanth Rao Naik Sabavat cpojer - lc0rp scald gumadeiras andranik-sahakyan davidguttman sleontenko sircrumpet peschee rafaelreis-r thewilloftheshadow - ratulsarna lutr0 bradleypriest danielz1z emanuelst KristijanJovanovski CashWilliams rdev osolmaz joshrad-dev - kiranjd adityashaw2 sheeek artuskg onutc manuelhettich minghinmatthewlam myfunc buddyh connorshea - mcinteerj timkrase zerone0x gerardward2007 obviyus tosh-hamburg azade-c roshanasingh4 bjesuiter cheeeee - Josh Phillips YuriNachos chriseidhof tyler6204 ysqander superman32432432 vignesh07 Yurii Chukhlib grp06 antons - austinm911 blacksmith-sh[bot] dan-dr HeimdallStrategy imfing jalehman jarvis-medmatic kkarimi mahmoudashraf93 petter-b - pkrmf RandyVentures Ryan Lisse erikpr1994 Ghost jonasjancarik Keith the Silly Goose L36 Server Marc mitschabaude-bot - neist ngutman chrisrodz Friederike Seiler gabriel-trigo iamadig Kit koala73 manmal ogulcancelik - pasogott petradonka rubyrunsstuff VACInc wes-davis zats 24601 Chris Taylor Django Navarro evalexpr - henrino3 humanwritten larlyssa mkbehr oswalpalash pcty-nextgen-service-account sibbl Syhids Aaron Konyer aaronveklabs - adam91holt dougvk erik-agens fcatuhe ivanrvpereira jayhickey jeffersonwarrior jeffersonwarrior Jonathan D. Rhyne (DJ-D) jverdi - longmaba mickahouan mjrussell p6l-richard philipp-spiess robaxelsen Sash Catanzarite T5-AndyML VAC zknicker - alejandro maza andrewting19 anpoirier Asleep123 bolismauro cash-echo-bot Clawd conhecendocontato Dimitrios Ploutarchos Drake Thomsen - Felix Krause gtsifrikas HazAT hrdwdmrbl hugobarauna Jamie Openshaw Jarvis Jefferson Nunn Kevin Lin kitze - levifig Lloyd loukotal martinpucik Miles mrdbstn MSch Mustafa Tag Eldeen ndraiman nexty5870 - prathamdby reeltimeapps RLTCmpe rodrigouroz Rolf Fredheim Rony Kelner Samrat Jha siraht snopoke suminhthanh - The Admiral thesash Ubuntu voidserf wstock Zach Knickerbocker Alphonse-arianee Azade carlulsoe ddyo - Erik latitudeki5223 Manuel Maly Mourad Boustani odrobnik pcty-nextgen-ios-builder Quentin Randy Torres rhjoh ronak-guliani - William Stock p6l-richard + julianengel benithors bradleypriest timolins nachx639 sreekaransrinath gupsammy cristip73 nachoiacovino Vasanth Rao Naik Sabavat + cpojer lc0rp scald gumadeiras andranik-sahakyan davidguttman sleontenko sircrumpet peschee rafaelreis-r + thewilloftheshadow ratulsarna lutr0 danielz1z emanuelst KristijanJovanovski CashWilliams rdev osolmaz joshrad-dev + kiranjd adityashaw2 sheeek artuskg onutc tyler6204 manuelhettich minghinmatthewlam myfunc buddyh + connorshea mcinteerj John-Rood timkrase zerone0x gerardward2007 obviyus tosh-hamburg azade-c roshanasingh4 + bjesuiter cheeeee Josh Phillips Whoaa512 YuriNachos chriseidhof ysqander superman32432432 vignesh07 Yurii Chukhlib + grp06 antons austinm911 blacksmith-sh[bot] dan-dr HeimdallStrategy imfing jalehman jarvis-medmatic kkarimi + mahmoudashraf93 petter-b pkrmf RandyVentures Ryan Lisse erikpr1994 Ghost jonasjancarik Keith the Silly Goose L36 Server + Marc mitschabaude-bot neist ngutman chrisrodz dougvk Friederike Seiler gabriel-trigo iamadig Kit + koala73 manmal ogulcancelik pasogott petradonka rubyrunsstuff sibbl suminhthanh VACInc wes-davis + zats 24601 Chris Taylor Django Navarro evalexpr henrino3 humanwritten larlyssa mkbehr oswalpalash + pcty-nextgen-service-account Syhids Aaron Konyer aaronveklabs adam91holt erik-agens fcatuhe ivanrvpereira jayhickey jeffersonwarrior + jeffersonwarrior Jonathan D. Rhyne (DJ-D) jverdi longmaba mickahouan mjrussell p6l-richard philipp-spiess robaxelsen Sash Catanzarite + T5-AndyML VAC zknicker alejandro maza andrewting19 anpoirier Asleep123 bolismauro cash-echo-bot Clawd + conhecendocontato Dimitrios Ploutarchos Drake Thomsen Felix Krause gtsifrikas HazAT hrdwdmrbl hugobarauna Jamie Openshaw Jarvis + Jefferson Nunn Kevin Lin kitze levifig Lloyd loukotal martinpucik Miles mrdbstn MSch + Mustafa Tag Eldeen ndraiman nexty5870 odysseus0 prathamdby reeltimeapps RLTCmpe rodrigouroz Rolf Fredheim Rony Kelner + Samrat Jha siraht snopoke The Admiral thesash Ubuntu voidserf wstock Zach Knickerbocker Alphonse-arianee + Azade carlulsoe ddyo Erik latitudeki5223 Manuel Maly Mourad Boustani odrobnik pcty-nextgen-ios-builder Quentin + Randy Torres rhjoh ronak-guliani William Stock

diff --git a/docs/tools/subagents.md b/docs/tools/subagents.md index e7a7faec5..9c6bd14ef 100644 --- a/docs/tools/subagents.md +++ b/docs/tools/subagents.md @@ -69,7 +69,8 @@ Note: the merge is additive, so main profiles are always available as fallbacks. Sub-agents report back via an announce step: - The announce step runs inside the sub-agent session (not the requester session). - If the sub-agent replies exactly `ANNOUNCE_SKIP`, nothing is posted. -- Otherwise the announce reply is posted to the requester chat channel via the gateway `send` method. +- Otherwise the announce reply is posted to the requester chat channel via a follow-up `agent` call (`deliver=true`). +- Announce replies preserve thread/topic routing when available (Slack threads, Telegram topics, Matrix threads). - Announce messages are normalized to a stable template: - `Status:` derived from the run outcome (`success`, `error`, `timeout`, or `unknown`). - `Result:` the summary content from the announce step (or `(not available)` if missing). diff --git a/src/agents/clawdbot-tools.ts b/src/agents/clawdbot-tools.ts index 98717cd3b..25a2199e7 100644 --- a/src/agents/clawdbot-tools.ts +++ b/src/agents/clawdbot-tools.ts @@ -27,6 +27,10 @@ export function createClawdbotTools(options?: { agentSessionKey?: string; agentChannel?: GatewayMessageChannel; agentAccountId?: string; + /** Delivery target (e.g. telegram:group:123:topic:456) for topic/thread routing. */ + agentTo?: string; + /** Thread/topic identifier for routing replies to the originating thread. */ + agentThreadId?: string | number; agentDir?: string; sandboxRoot?: string; workspaceDir?: string; @@ -108,6 +112,8 @@ export function createClawdbotTools(options?: { agentSessionKey: options?.agentSessionKey, agentChannel: options?.agentChannel, agentAccountId: options?.agentAccountId, + agentTo: options?.agentTo, + agentThreadId: options?.agentThreadId, sandboxed: options?.sandboxed, }), createSessionStatusTool({ diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 31096d7cc..45a0db943 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -208,6 +208,8 @@ export async function runEmbeddedPiAgent( messageChannel: params.messageChannel, messageProvider: params.messageProvider, agentAccountId: params.agentAccountId, + messageTo: params.messageTo, + messageThreadId: params.messageThreadId, currentChannelId: params.currentChannelId, currentThreadTs: params.currentThreadTs, replyToMode: params.replyToMode, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index e75047082..941c24b09 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -148,6 +148,8 @@ export async function runEmbeddedAttempt( sandbox, messageProvider: params.messageChannel ?? params.messageProvider, agentAccountId: params.agentAccountId, + messageTo: params.messageTo, + messageThreadId: params.messageThreadId, sessionKey: params.sessionKey ?? params.sessionId, agentDir, workspaceDir: effectiveWorkspace, diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index 52e85d3b3..c0320bbbe 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -23,6 +23,10 @@ export type RunEmbeddedPiAgentParams = { messageChannel?: string; messageProvider?: string; agentAccountId?: string; + /** Delivery target (e.g. telegram:group:123:topic:456) for topic/thread routing. */ + messageTo?: string; + /** Thread/topic identifier for routing replies to the originating thread. */ + messageThreadId?: string | number; /** Current channel ID for auto-threading (Slack). */ currentChannelId?: string; /** Current thread timestamp for auto-threading (Slack). */ diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 8aa0ec195..5ae947ec7 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -21,6 +21,8 @@ export type EmbeddedRunAttemptParams = { messageChannel?: string; messageProvider?: string; agentAccountId?: string; + messageTo?: string; + messageThreadId?: string | number; currentChannelId?: string; currentThreadTs?: string; replyToMode?: "off" | "first" | "all"; diff --git a/src/agents/pi-tools.ts b/src/agents/pi-tools.ts index cbf75461d..677f391da 100644 --- a/src/agents/pi-tools.ts +++ b/src/agents/pi-tools.ts @@ -102,6 +102,8 @@ export function createClawdbotCodingTools(options?: { exec?: ExecToolDefaults & ProcessToolDefaults; messageProvider?: string; agentAccountId?: string; + messageTo?: string; + messageThreadId?: string | number; sandbox?: SandboxContext | null; sessionKey?: string; agentDir?: string; @@ -265,6 +267,8 @@ export function createClawdbotCodingTools(options?: { agentSessionKey: options?.sessionKey, agentChannel: resolveGatewayMessageChannel(options?.messageProvider), agentAccountId: options?.agentAccountId, + agentTo: options?.messageTo, + agentThreadId: options?.messageThreadId, agentDir: options?.agentDir, sandboxRoot, workspaceDir: options?.workspaceDir, diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 610b88234..444726efc 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -98,6 +98,8 @@ function resolveAnnounceOrigin( async function sendAnnounce(item: AnnounceQueueItem) { const origin = item.origin; + const threadId = + origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined; await callGateway({ method: "agent", params: { @@ -106,6 +108,7 @@ async function sendAnnounce(item: AnnounceQueueItem) { channel: origin?.channel, accountId: origin?.accountId, to: origin?.to, + threadId, deliver: true, idempotencyKey: crypto.randomUUID(), }, @@ -424,6 +427,11 @@ export async function runSubagentAnnounceFlow(params: { deliver: true, channel: directOrigin?.channel, accountId: directOrigin?.accountId, + to: directOrigin?.to, + threadId: + directOrigin?.threadId != null && directOrigin.threadId !== "" + ? String(directOrigin.threadId) + : undefined, idempotencyKey: crypto.randomUUID(), }, expectFinal: true, diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index e9b8e0764..f3b294484 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -61,6 +61,8 @@ export function createSessionsSpawnTool(opts?: { agentSessionKey?: string; agentChannel?: GatewayMessageChannel; agentAccountId?: string; + agentTo?: string; + agentThreadId?: string | number; sandboxed?: boolean; }): AnyAgentTool { return { @@ -83,6 +85,8 @@ export function createSessionsSpawnTool(opts?: { const requesterOrigin = normalizeDeliveryContext({ channel: opts?.agentChannel, accountId: opts?.agentAccountId, + to: opts?.agentTo, + threadId: opts?.agentThreadId, }); const runTimeoutSeconds = (() => { const explicit = diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index ccc793f34..bbdf6df0d 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -210,6 +210,8 @@ export async function runAgentTurnWithFallback(params: { sessionKey: params.sessionKey, messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined, agentAccountId: params.sessionCtx.AccountId, + messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To, + messageThreadId: params.sessionCtx.MessageThreadId ?? undefined, // Provider threading context for tool auto-injection ...buildThreadingToolContext({ sessionCtx: params.sessionCtx, diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 85e3e9e16..a7d590750 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -106,6 +106,8 @@ export async function runMemoryFlushIfNeeded(params: { sessionKey: params.sessionKey, messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined, agentAccountId: params.sessionCtx.AccountId, + messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To, + messageThreadId: params.sessionCtx.MessageThreadId ?? undefined, // Provider threading context for tool auto-injection ...buildThreadingToolContext({ sessionCtx: params.sessionCtx, diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 0385d71b8..f3adeeee9 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -145,6 +145,8 @@ export function createFollowupRunner(params: { sessionKey: queued.run.sessionKey, messageProvider: queued.run.messageProvider, agentAccountId: queued.run.agentAccountId, + messageTo: queued.originatingTo, + messageThreadId: queued.originatingThreadId, sessionFile: queued.run.sessionFile, workspaceDir: queued.run.workspaceDir, config: queued.run.config, diff --git a/src/auto-reply/reply/get-reply-inline-actions.ts b/src/auto-reply/reply/get-reply-inline-actions.ts index 14bc024c0..aa41858d5 100644 --- a/src/auto-reply/reply/get-reply-inline-actions.ts +++ b/src/auto-reply/reply/get-reply-inline-actions.ts @@ -168,6 +168,8 @@ export async function handleInlineActions(params: { agentSessionKey: sessionKey, agentChannel: channel, agentAccountId: (ctx as { AccountId?: string }).AccountId, + agentTo: ctx.OriginatingTo ?? ctx.To, + agentThreadId: ctx.MessageThreadId ?? undefined, agentDir, workspaceDir, config: cfg, diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index a1b1a4f5b..d87805287 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -255,6 +255,22 @@ describe("routeReply", () => { ); }); + it("uses threadId as threadTs for Slack when replyToId is missing", async () => { + mocks.sendMessageSlack.mockClear(); + await routeReply({ + payload: { text: "hi" }, + channel: "slack", + to: "channel:C123", + threadId: "1710000000.9999", + cfg: {} as never, + }); + expect(mocks.sendMessageSlack).toHaveBeenCalledWith( + "channel:C123", + "hi", + expect.objectContaining({ threadTs: "1710000000.9999" }), + ); + }); + it("sends multiple mediaUrls (caption only on first)", async () => { mocks.sendMessageSlack.mockClear(); await routeReply({ diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index 357a4ba49..c907115d3 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -100,6 +100,11 @@ export async function routeReply(params: RouteReplyParams): Promise { const message = `Delivery failed (${deliveryChannel}${deliveryTarget ? ` to ${deliveryTarget}` : ""}): ${String(err)}`; @@ -153,6 +158,8 @@ export async function deliverAgentCommandResult(params: { to: deliveryTarget, accountId: resolvedAccountId, payloads: deliveryPayloads, + replyToId: resolvedReplyToId ?? null, + threadId: resolvedThreadTarget ?? null, bestEffort: bestEffortDeliver, onError: (err) => logDeliveryError(err), onPayload: logPayload, diff --git a/src/commands/agent/run-context.ts b/src/commands/agent/run-context.ts index 2386db568..a8da80d88 100644 --- a/src/commands/agent/run-context.ts +++ b/src/commands/agent/run-context.ts @@ -14,5 +14,14 @@ export function resolveAgentRunContext(opts: AgentCommandOpts): AgentRunContext const normalizedAccountId = normalizeAccountId(merged.accountId ?? opts.accountId); if (normalizedAccountId) merged.accountId = normalizedAccountId; + if ( + merged.currentThreadTs == null && + opts.threadId != null && + opts.threadId !== "" && + opts.threadId !== null + ) { + merged.currentThreadTs = String(opts.threadId); + } + return merged; } diff --git a/src/commands/agent/types.ts b/src/commands/agent/types.ts index deb1b7bc8..ed18d0b45 100644 --- a/src/commands/agent/types.ts +++ b/src/commands/agent/types.ts @@ -46,6 +46,8 @@ export type AgentCommandOpts = { replyChannel?: string; /** Override delivery account id (separate from session routing). */ replyAccountId?: string; + /** Override delivery thread/topic id (separate from session routing). */ + threadId?: string | number; /** Message channel context (webchat|voicewake|whatsapp|...). */ messageChannel?: string; channel?: string; // delivery channel (whatsapp|telegram|...) diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 8ecf576f7..c41181623 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -56,11 +56,13 @@ function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry { const sameDelivery = (entry.deliveryContext?.channel ?? undefined) === nextDelivery?.channel && (entry.deliveryContext?.to ?? undefined) === nextDelivery?.to && - (entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId; + (entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId && + (entry.deliveryContext?.threadId ?? undefined) === nextDelivery?.threadId; const sameLast = entry.lastChannel === normalized.lastChannel && entry.lastTo === normalized.lastTo && - entry.lastAccountId === normalized.lastAccountId; + entry.lastAccountId === normalized.lastAccountId && + entry.lastThreadId === normalized.lastThreadId; if (sameDelivery && sameLast) return entry; return { ...entry, @@ -68,6 +70,7 @@ function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry { lastChannel: normalized.lastChannel, lastTo: normalized.lastTo, lastAccountId: normalized.lastAccountId, + lastThreadId: normalized.lastThreadId, }; } @@ -379,11 +382,12 @@ export async function updateLastRoute(params: { channel?: SessionEntry["lastChannel"]; to?: string; accountId?: string; + threadId?: string | number; deliveryContext?: DeliveryContext; ctx?: MsgContext; groupResolution?: import("./types.js").GroupKeyResolution | null; }) { - const { storePath, sessionKey, channel, to, accountId, ctx } = params; + const { storePath, sessionKey, channel, to, accountId, threadId, ctx } = params; return await withSessionStoreLock(storePath, async () => { const store = loadSessionStore(storePath); const existing = store[sessionKey]; @@ -393,6 +397,7 @@ export async function updateLastRoute(params: { channel, to, accountId, + threadId, }); const mergedInput = mergeDeliveryContext(explicitContext, inlineContext); const merged = mergeDeliveryContext(mergedInput, deliveryContextFromSession(existing)); @@ -401,6 +406,7 @@ export async function updateLastRoute(params: { channel: merged?.channel, to: merged?.to, accountId: merged?.accountId, + threadId: merged?.threadId, }, }); const metaPatch = ctx @@ -417,6 +423,7 @@ export async function updateLastRoute(params: { lastChannel: normalized.lastChannel, lastTo: normalized.lastTo, lastAccountId: normalized.lastAccountId, + lastThreadId: normalized.lastThreadId, }; const next = mergeSessionEntry( existing, diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 7578b0b1a..f7ed268ec 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -89,6 +89,7 @@ export type SessionEntry = { lastChannel?: SessionChannelId; lastTo?: string; lastAccountId?: string; + lastThreadId?: string | number; skillsSnapshot?: SessionSkillSnapshot; systemPromptReport?: SessionSystemPromptReport; }; diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index e49c675a5..f31b6cd92 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -57,6 +57,7 @@ export const AgentParamsSchema = Type.Object( replyChannel: Type.Optional(Type.String()), accountId: Type.Optional(Type.String()), replyAccountId: Type.Optional(Type.String()), + threadId: Type.Optional(Type.String()), timeout: Type.Optional(Type.Integer({ minimum: 0 })), lane: Type.Optional(Type.String()), extraSystemPrompt: Type.Optional(Type.String()), diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index d19e025ce..a82e31676 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -71,6 +71,7 @@ export const agentHandlers: GatewayRequestHandlers = { replyChannel?: string; accountId?: string; replyAccountId?: string; + threadId?: string; lane?: string; extraSystemPrompt?: string; idempotencyKey: string; @@ -257,10 +258,15 @@ export const agentHandlers: GatewayRequestHandlers = { : typeof request.to === "string" && request.to.trim() ? request.to.trim() : undefined; + const explicitThreadId = + typeof request.threadId === "string" && request.threadId.trim() + ? request.threadId.trim() + : undefined; const deliveryPlan = resolveAgentDeliveryPlan({ sessionEntry, requestedChannel: request.replyChannel ?? request.channel, explicitTo, + explicitThreadId, accountId: request.replyAccountId ?? request.accountId, wantsDelivery, }); @@ -298,6 +304,8 @@ export const agentHandlers: GatewayRequestHandlers = { }); respond(true, accepted, undefined, { runId }); + const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; + void agentCommand( { message, @@ -310,9 +318,11 @@ export const agentHandlers: GatewayRequestHandlers = { deliveryTargetMode, channel: resolvedChannel, accountId: resolvedAccountId, + threadId: resolvedThreadId, runContext: { messageChannel: resolvedChannel, accountId: resolvedAccountId, + currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined, }, timeout: request.timeout?.toString(), bestEffortDeliver, diff --git a/src/infra/outbound/agent-delivery.ts b/src/infra/outbound/agent-delivery.ts index 3477a161b..eaa41fac3 100644 --- a/src/infra/outbound/agent-delivery.ts +++ b/src/infra/outbound/agent-delivery.ts @@ -22,6 +22,7 @@ export type AgentDeliveryPlan = { resolvedChannel: GatewayMessageChannel; resolvedTo?: string; resolvedAccountId?: string; + resolvedThreadId?: string | number; deliveryTargetMode?: ChannelOutboundTargetMode; }; @@ -29,6 +30,7 @@ export function resolveAgentDeliveryPlan(params: { sessionEntry?: SessionEntry; requestedChannel?: string; explicitTo?: string; + explicitThreadId?: string | number; accountId?: string; wantsDelivery: boolean; }): AgentDeliveryPlan { @@ -46,6 +48,7 @@ export function resolveAgentDeliveryPlan(params: { entry: params.sessionEntry, requestedChannel: requestedChannel === INTERNAL_MESSAGE_CHANNEL ? "last" : requestedChannel, explicitTo, + explicitThreadId: params.explicitThreadId, }); const resolvedChannel = (() => { @@ -89,6 +92,7 @@ export function resolveAgentDeliveryPlan(params: { resolvedChannel, resolvedTo, resolvedAccountId, + resolvedThreadId: baseDelivery.threadId, deliveryTargetMode, }; } diff --git a/src/infra/outbound/targets.test.ts b/src/infra/outbound/targets.test.ts index 9fc1067f6..8ffee062d 100644 --- a/src/infra/outbound/targets.test.ts +++ b/src/infra/outbound/targets.test.ts @@ -124,10 +124,12 @@ describe("resolveSessionDeliveryTarget", () => { channel: "whatsapp", to: "+1555", accountId: "acct-1", + threadId: undefined, mode: "implicit", lastChannel: "whatsapp", lastTo: "+1555", lastAccountId: "acct-1", + lastThreadId: undefined, }); }); @@ -146,10 +148,12 @@ describe("resolveSessionDeliveryTarget", () => { channel: "telegram", to: undefined, accountId: undefined, + threadId: undefined, mode: "implicit", lastChannel: "whatsapp", lastTo: "+1555", lastAccountId: undefined, + lastThreadId: undefined, }); }); @@ -169,10 +173,12 @@ describe("resolveSessionDeliveryTarget", () => { channel: "telegram", to: "+1555", accountId: undefined, + threadId: undefined, mode: "implicit", lastChannel: "whatsapp", lastTo: "+1555", lastAccountId: undefined, + lastThreadId: undefined, }); }); @@ -192,10 +198,12 @@ describe("resolveSessionDeliveryTarget", () => { channel: "slack", to: undefined, accountId: undefined, + threadId: undefined, mode: "implicit", lastChannel: "whatsapp", lastTo: "+1555", lastAccountId: undefined, + lastThreadId: undefined, }); }); }); diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index c31bfbc4b..d37d8c69f 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -35,16 +35,19 @@ export type SessionDeliveryTarget = { channel?: DeliverableMessageChannel; to?: string; accountId?: string; + threadId?: string | number; mode: ChannelOutboundTargetMode; lastChannel?: DeliverableMessageChannel; lastTo?: string; lastAccountId?: string; + lastThreadId?: string | number; }; export function resolveSessionDeliveryTarget(params: { entry?: SessionEntry; requestedChannel?: GatewayMessageChannel | "last"; explicitTo?: string; + explicitThreadId?: string | number; fallbackChannel?: DeliverableMessageChannel; allowMismatchedLastTo?: boolean; mode?: ChannelOutboundTargetMode; @@ -54,6 +57,7 @@ export function resolveSessionDeliveryTarget(params: { context?.channel && isDeliverableMessageChannel(context.channel) ? context.channel : undefined; const lastTo = context?.to; const lastAccountId = context?.accountId; + const lastThreadId = context?.threadId; const rawRequested = params.requestedChannel ?? "last"; const requested = rawRequested === "last" ? "last" : normalizeMessageChannel(rawRequested); @@ -68,6 +72,10 @@ export function resolveSessionDeliveryTarget(params: { typeof params.explicitTo === "string" && params.explicitTo.trim() ? params.explicitTo.trim() : undefined; + const explicitThreadId = + params.explicitThreadId != null && params.explicitThreadId !== "" + ? params.explicitThreadId + : undefined; let channel = requestedChannel === "last" ? lastChannel : requestedChannel; if (!channel && params.fallbackChannel && isDeliverableMessageChannel(params.fallbackChannel)) { @@ -84,16 +92,19 @@ export function resolveSessionDeliveryTarget(params: { } const accountId = channel && channel === lastChannel ? lastAccountId : undefined; + const threadId = channel && channel === lastChannel ? lastThreadId : undefined; const mode = params.mode ?? (explicitTo ? "explicit" : "implicit"); return { channel, to, accountId, + threadId: explicitThreadId ?? threadId, mode, lastChannel, lastTo, lastAccountId, + lastThreadId, }; } diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index f3131377e..8d286d69c 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -479,6 +479,7 @@ export async function prepareSlackMessage(params: { ParentSessionKey: threadKeys.parentSessionKey, ThreadStarterBody: threadStarterBody, ThreadLabel: threadLabel, + MessageThreadId: isThreadReply ? threadTs : undefined, Timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined, WasMentioned: isRoomish ? effectiveWasMentioned : undefined, MediaPath: media?.path, diff --git a/src/utils/delivery-context.test.ts b/src/utils/delivery-context.test.ts index fee2628f8..705e6d27f 100644 --- a/src/utils/delivery-context.test.ts +++ b/src/utils/delivery-context.test.ts @@ -39,10 +39,13 @@ describe("delivery context helpers", () => { }); it("builds stable keys only when channel and to are present", () => { - expect(deliveryContextKey({ channel: "whatsapp", to: "+1555" })).toBe("whatsapp|+1555|"); + expect(deliveryContextKey({ channel: "whatsapp", to: "+1555" })).toBe("whatsapp|+1555||"); expect(deliveryContextKey({ channel: "whatsapp" })).toBeUndefined(); expect(deliveryContextKey({ channel: "whatsapp", to: "+1555", accountId: "acct-1" })).toBe( - "whatsapp|+1555|acct-1", + "whatsapp|+1555|acct-1|", + ); + expect(deliveryContextKey({ channel: "slack", to: "channel:C1", threadId: "123.456" })).toBe( + "slack|channel:C1||123.456", ); }); @@ -64,17 +67,24 @@ describe("delivery context helpers", () => { deliveryContextFromSession({ channel: "telegram", lastTo: " 123 ", + lastThreadId: " 999 ", }), ).toEqual({ channel: "telegram", to: "123", accountId: undefined, + threadId: "999", }); }); it("normalizes delivery fields and mirrors them on session entries", () => { const normalized = normalizeSessionDeliveryFields({ - deliveryContext: { channel: " Slack ", to: " channel:1 ", accountId: " acct-2 " }, + deliveryContext: { + channel: " Slack ", + to: " channel:1 ", + accountId: " acct-2 ", + threadId: " 444 ", + }, lastChannel: " whatsapp ", lastTo: " +1555 ", }); @@ -83,9 +93,11 @@ describe("delivery context helpers", () => { channel: "whatsapp", to: "+1555", accountId: "acct-2", + threadId: "444", }); expect(normalized.lastChannel).toBe("whatsapp"); expect(normalized.lastTo).toBe("+1555"); expect(normalized.lastAccountId).toBe("acct-2"); + expect(normalized.lastThreadId).toBe("444"); }); }); diff --git a/src/utils/delivery-context.ts b/src/utils/delivery-context.ts index b05e2c6b0..9f5803e17 100644 --- a/src/utils/delivery-context.ts +++ b/src/utils/delivery-context.ts @@ -5,6 +5,7 @@ export type DeliveryContext = { channel?: string; to?: string; accountId?: string; + threadId?: string | number; }; export type DeliveryContextSessionSource = { @@ -12,6 +13,7 @@ export type DeliveryContextSessionSource = { lastChannel?: string; lastTo?: string; lastAccountId?: string; + lastThreadId?: string | number; deliveryContext?: DeliveryContext; }; @@ -23,12 +25,22 @@ export function normalizeDeliveryContext(context?: DeliveryContext): DeliveryCon : undefined; const to = typeof context.to === "string" ? context.to.trim() : undefined; const accountId = normalizeAccountId(context.accountId); - if (!channel && !to && !accountId) return undefined; - return { + const threadId = + typeof context.threadId === "number" && Number.isFinite(context.threadId) + ? Math.trunc(context.threadId) + : typeof context.threadId === "string" + ? context.threadId.trim() + : undefined; + const normalizedThreadId = + typeof threadId === "string" ? (threadId ? threadId : undefined) : threadId; + if (!channel && !to && !accountId && normalizedThreadId == null) return undefined; + const normalized: DeliveryContext = { channel: channel || undefined, to: to || undefined, accountId, }; + if (normalizedThreadId != null) normalized.threadId = normalizedThreadId; + return normalized; } export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSource): { @@ -36,6 +48,7 @@ export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSo lastChannel?: string; lastTo?: string; lastAccountId?: string; + lastThreadId?: string | number; } { if (!source) { return { @@ -43,6 +56,7 @@ export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSo lastChannel: undefined, lastTo: undefined, lastAccountId: undefined, + lastThreadId: undefined, }; } @@ -51,6 +65,7 @@ export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSo channel: source.lastChannel ?? source.channel, to: source.lastTo, accountId: source.lastAccountId, + threadId: source.lastThreadId, }), normalizeDeliveryContext(source.deliveryContext), ); @@ -61,6 +76,7 @@ export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSo lastChannel: undefined, lastTo: undefined, lastAccountId: undefined, + lastThreadId: undefined, }; } @@ -69,6 +85,7 @@ export function normalizeSessionDeliveryFields(source?: DeliveryContextSessionSo lastChannel: merged.channel, lastTo: merged.to, lastAccountId: merged.accountId, + lastThreadId: merged.threadId, }; } @@ -90,11 +107,14 @@ export function mergeDeliveryContext( channel: normalizedPrimary?.channel ?? normalizedFallback?.channel, to: normalizedPrimary?.to ?? normalizedFallback?.to, accountId: normalizedPrimary?.accountId ?? normalizedFallback?.accountId, + threadId: normalizedPrimary?.threadId ?? normalizedFallback?.threadId, }); } export function deliveryContextKey(context?: DeliveryContext): string | undefined { const normalized = normalizeDeliveryContext(context); if (!normalized?.channel || !normalized?.to) return undefined; - return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}`; + const threadId = + normalized.threadId != null && normalized.threadId !== "" ? String(normalized.threadId) : ""; + return `${normalized.channel}|${normalized.to}|${normalized.accountId ?? ""}|${threadId}`; }