diff --git a/CHANGELOG.md b/CHANGELOG.md index b33b621e8..a09fcd603 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Docs: https://docs.clawd.bot - Markdown: add per-channel table conversion (bullets for Signal/WhatsApp, code blocks elsewhere). (#1495) Thanks @odysseus0. ### Fixes +- Gateway/WebChat: route inbound messages through the unified dispatch pipeline so /new works consistently across WebChat/TUI and channels. - Discord: limit autoThread mention bypass to bot-owned threads; keep ack reactions mention-gated. (#1511) Thanks @pvoo. - Gateway: accept null optional fields in exec approval requests. (#1511) Thanks @pvoo. - TUI: forward unknown slash commands (for example, `/context`) to the Gateway. diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 0e7dfa233..532bac00a 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -82,7 +82,8 @@ export async function runAgentTurnWithFallback(params: { // Track payloads sent directly (not via pipeline) during tool flush to avoid duplicates. const directlySentBlockKeys = new Set(); - const runId = crypto.randomUUID(); + const runId = params.opts?.runId ?? crypto.randomUUID(); + params.opts?.onAgentRunStart?.(runId); if (params.sessionKey) { registerAgentRunContext(runId, { sessionKey: params.sessionKey, @@ -174,6 +175,7 @@ export async function runAgentTurnWithFallback(params: { extraSystemPrompt: params.followupRun.run.extraSystemPrompt, ownerNumbers: params.followupRun.run.ownerNumbers, cliSessionId, + images: params.opts?.images, }) .then((result) => { emitAgentEvent({ @@ -248,6 +250,8 @@ export async function runAgentTurnWithFallback(params: { bashElevated: params.followupRun.run.bashElevated, timeoutMs: params.followupRun.run.timeoutMs, runId, + images: params.opts?.images, + abortSignal: params.opts?.abortSignal, blockReplyBreak: params.resolvedBlockStreamingBreak, blockReplyChunking: params.blockReplyChunking, onPartialReply: allowPartialStream diff --git a/src/auto-reply/reply/provider-dispatcher.ts b/src/auto-reply/reply/provider-dispatcher.ts index 68e2431d1..e4766156e 100644 --- a/src/auto-reply/reply/provider-dispatcher.ts +++ b/src/auto-reply/reply/provider-dispatcher.ts @@ -1,58 +1,44 @@ import type { ClawdbotConfig } from "../../config/config.js"; -import type { FinalizedMsgContext } from "../templating.js"; +import type { FinalizedMsgContext, MsgContext } from "../templating.js"; import type { GetReplyOptions } from "../types.js"; -import type { DispatchFromConfigResult } from "./dispatch-from-config.js"; -import { dispatchReplyFromConfig } from "./dispatch-from-config.js"; +import type { DispatchInboundResult } from "../dispatch.js"; import { - createReplyDispatcher, - createReplyDispatcherWithTyping, - type ReplyDispatcherOptions, - type ReplyDispatcherWithTypingOptions, + dispatchInboundMessageWithBufferedDispatcher, + dispatchInboundMessageWithDispatcher, +} from "../dispatch.js"; +import type { + ReplyDispatcherOptions, + ReplyDispatcherWithTypingOptions, } from "./reply-dispatcher.js"; export async function dispatchReplyWithBufferedBlockDispatcher(params: { - ctx: FinalizedMsgContext; + ctx: MsgContext | FinalizedMsgContext; cfg: ClawdbotConfig; dispatcherOptions: ReplyDispatcherWithTypingOptions; replyOptions?: Omit; replyResolver?: typeof import("../reply.js").getReplyFromConfig; -}): Promise { - const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping( - params.dispatcherOptions, - ); - - const result = await dispatchReplyFromConfig({ +}): Promise { + return await dispatchInboundMessageWithBufferedDispatcher({ ctx: params.ctx, cfg: params.cfg, - dispatcher, + dispatcherOptions: params.dispatcherOptions, replyResolver: params.replyResolver, - replyOptions: { - ...params.replyOptions, - ...replyOptions, - }, + replyOptions: params.replyOptions, }); - - markDispatchIdle(); - return result; } export async function dispatchReplyWithDispatcher(params: { - ctx: FinalizedMsgContext; + ctx: MsgContext | FinalizedMsgContext; cfg: ClawdbotConfig; dispatcherOptions: ReplyDispatcherOptions; replyOptions?: Omit; replyResolver?: typeof import("../reply.js").getReplyFromConfig; -}): Promise { - const dispatcher = createReplyDispatcher(params.dispatcherOptions); - - const result = await dispatchReplyFromConfig({ +}): Promise { + return await dispatchInboundMessageWithDispatcher({ ctx: params.ctx, cfg: params.cfg, - dispatcher, + dispatcherOptions: params.dispatcherOptions, replyResolver: params.replyResolver, replyOptions: params.replyOptions, }); - - await dispatcher.waitForIdle(); - return result; } diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index e1bf611db..250c14091 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -1,3 +1,4 @@ +import type { ImageContent } from "@mariozechner/pi-ai"; import type { TypingController } from "./reply/typing.js"; export type BlockReplyContext = { @@ -13,6 +14,14 @@ export type ModelSelectedContext = { }; export type GetReplyOptions = { + /** Override run id for agent events (defaults to random UUID). */ + runId?: string; + /** Abort signal for the underlying agent run. */ + abortSignal?: AbortSignal; + /** Optional inbound images (used for webchat attachments). */ + images?: ImageContent[]; + /** Notifies when an agent run actually starts (useful for webchat command handling). */ + onAgentRunStart?: (runId: string) => void; onReplyStart?: () => Promise | void; onTypingController?: (typing: TypingController) => void; isHeartbeat?: boolean; diff --git a/src/discord/monitor.slash.test.ts b/src/discord/monitor.slash.test.ts index 018f93ed0..af098eb96 100644 --- a/src/discord/monitor.slash.test.ts +++ b/src/discord/monitor.slash.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; const dispatchMock = vi.fn(); @@ -20,15 +21,34 @@ vi.mock("@buape/carbon", () => ({ }, })); -vi.mock("../auto-reply/reply/dispatch-from-config.js", () => ({ - dispatchReplyFromConfig: (...args: unknown[]) => dispatchMock(...args), -})); +vi.mock("../auto-reply/dispatch.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + dispatchInboundMessage: (...args: unknown[]) => dispatchMock(...args), + dispatchInboundMessageWithDispatcher: (...args: unknown[]) => dispatchMock(...args), + dispatchInboundMessageWithBufferedDispatcher: (...args: unknown[]) => dispatchMock(...args), + }; +}); beforeEach(() => { - dispatchMock.mockReset().mockImplementation(async ({ dispatcher }) => { - dispatcher.sendToolResult({ text: "tool update" }); - dispatcher.sendFinalReply({ text: "final reply" }); - return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } }; + dispatchMock.mockReset().mockImplementation(async (params) => { + if ("dispatcher" in params && params.dispatcher) { + params.dispatcher.sendToolResult({ text: "tool update" }); + params.dispatcher.sendFinalReply({ text: "final reply" }); + return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } }; + } + if ("dispatcherOptions" in params && params.dispatcherOptions) { + const { dispatcher, markDispatchIdle } = createReplyDispatcherWithTyping( + params.dispatcherOptions, + ); + dispatcher.sendToolResult({ text: "tool update" }); + dispatcher.sendFinalReply({ text: "final reply" }); + await dispatcher.waitForIdle(); + markDispatchIdle(); + return { queuedFinal: true, counts: dispatcher.getQueuedCounts() }; + } + return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }; }); }); diff --git a/src/discord/monitor.tool-result.accepts-guild-messages-mentionpatterns-match.test.ts b/src/discord/monitor.tool-result.accepts-guild-messages-mentionpatterns-match.test.ts index b31387b45..d91c7b3d3 100644 --- a/src/discord/monitor.tool-result.accepts-guild-messages-mentionpatterns-match.test.ts +++ b/src/discord/monitor.tool-result.accepts-guild-messages-mentionpatterns-match.test.ts @@ -18,9 +18,15 @@ vi.mock("./send.js", () => ({ reactMock(...args); }, })); -vi.mock("../auto-reply/reply/dispatch-from-config.js", () => ({ - dispatchReplyFromConfig: (...args: unknown[]) => dispatchMock(...args), -})); +vi.mock("../auto-reply/dispatch.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + dispatchInboundMessage: (...args: unknown[]) => dispatchMock(...args), + dispatchInboundMessageWithDispatcher: (...args: unknown[]) => dispatchMock(...args), + dispatchInboundMessageWithBufferedDispatcher: (...args: unknown[]) => dispatchMock(...args), + }; +}); vi.mock("../pairing/pairing-store.js", () => ({ readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args), upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args), @@ -41,7 +47,7 @@ beforeEach(() => { updateLastRouteMock.mockReset(); dispatchMock.mockReset().mockImplementation(async ({ dispatcher }) => { dispatcher.sendFinalReply({ text: "hi" }); - return { queuedFinal: true, counts: { final: 1 } }; + return { queuedFinal: true, counts: { tool: 0, block: 0, final: 1 } }; }); readAllowFromStoreMock.mockReset().mockResolvedValue([]); upsertPairingRequestMock.mockReset().mockResolvedValue({ code: "PAIRCODE", created: true }); diff --git a/src/discord/monitor.tool-result.sends-status-replies-responseprefix.test.ts b/src/discord/monitor.tool-result.sends-status-replies-responseprefix.test.ts index 9da41c577..88fd6e212 100644 --- a/src/discord/monitor.tool-result.sends-status-replies-responseprefix.test.ts +++ b/src/discord/monitor.tool-result.sends-status-replies-responseprefix.test.ts @@ -18,9 +18,15 @@ vi.mock("./send.js", () => ({ reactMock(...args); }, })); -vi.mock("../auto-reply/reply/dispatch-from-config.js", () => ({ - dispatchReplyFromConfig: (...args: unknown[]) => dispatchMock(...args), -})); +vi.mock("../auto-reply/dispatch.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + dispatchInboundMessage: (...args: unknown[]) => dispatchMock(...args), + dispatchInboundMessageWithDispatcher: (...args: unknown[]) => dispatchMock(...args), + dispatchInboundMessageWithBufferedDispatcher: (...args: unknown[]) => dispatchMock(...args), + }; +}); vi.mock("../pairing/pairing-store.js", () => ({ readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args), upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args), @@ -40,7 +46,7 @@ beforeEach(() => { updateLastRouteMock.mockReset(); dispatchMock.mockReset().mockImplementation(async ({ dispatcher }) => { dispatcher.sendFinalReply({ text: "hi" }); - return { queuedFinal: true, counts: { final: 1 } }; + return { queuedFinal: true, counts: { tool: 0, block: 0, final: 1 } }; }); readAllowFromStoreMock.mockReset().mockResolvedValue([]); upsertPairingRequestMock.mockReset().mockResolvedValue({ code: "PAIRCODE", created: true }); diff --git a/src/discord/monitor/message-handler.inbound-contract.test.ts b/src/discord/monitor/message-handler.inbound-contract.test.ts index 1ffecb293..708c69993 100644 --- a/src/discord/monitor/message-handler.inbound-contract.test.ts +++ b/src/discord/monitor/message-handler.inbound-contract.test.ts @@ -9,17 +9,24 @@ import { expectInboundContextContract } from "../../../test/helpers/inbound-cont let capturedCtx: MsgContext | undefined; -vi.mock("../../auto-reply/reply/dispatch-from-config.js", () => ({ - dispatchReplyFromConfig: vi.fn(async (params: { ctx: MsgContext }) => { +vi.mock("../../auto-reply/dispatch.js", async (importOriginal) => { + const actual = await importOriginal(); + const dispatchInboundMessage = vi.fn(async (params: { ctx: MsgContext }) => { capturedCtx = params.ctx; - return { queuedFinal: false, counts: { tool: 0, block: 0 } }; - }), -})); + return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }; + }); + return { + ...actual, + dispatchInboundMessage, + dispatchInboundMessageWithDispatcher: dispatchInboundMessage, + dispatchInboundMessageWithBufferedDispatcher: dispatchInboundMessage, + }; +}); import { processDiscordMessage } from "./message-handler.process.js"; describe("discord processDiscordMessage inbound contract", () => { - it("passes a finalized MsgContext to dispatchReplyFromConfig", async () => { + it("passes a finalized MsgContext to dispatchInboundMessage", async () => { capturedCtx = undefined; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-discord-")); diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 3a6b650cd..fe26c79d5 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -14,7 +14,7 @@ import { formatThreadStarterEnvelope, resolveEnvelopeFormatOptions, } from "../../auto-reply/envelope.js"; -import { dispatchReplyFromConfig } from "../../auto-reply/reply/dispatch-from-config.js"; +import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { buildPendingHistoryContextFromMap, clearHistoryEntries, @@ -358,7 +358,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) onReplyStart: () => sendTyping({ client, channelId: typingChannelId }), }); - const { queuedFinal, counts } = await dispatchReplyFromConfig({ + const { queuedFinal, counts } = await dispatchInboundMessage({ ctx: ctxPayload, cfg, dispatcher, diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 0e55b45f5..50f441779 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -2,30 +2,18 @@ import { randomUUID } from "node:crypto"; import fs from "node:fs"; import path from "node:path"; -import { resolveSessionAgentId, resolveAgentWorkspaceDir } from "../../agents/agent-scope.js"; +import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; +import { resolveSessionAgentId } from "../../agents/agent-scope.js"; +import { resolveEffectiveMessagesConfig, resolveIdentityName } from "../../agents/identity.js"; import { resolveThinkingDefault } from "../../agents/model-selection.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; -import { ensureAgentWorkspace } from "../../agents/workspace.js"; -import { isControlCommandMessage } from "../../auto-reply/command-detection.js"; -import { normalizeCommandBody } from "../../auto-reply/commands-registry.js"; -import { formatInboundEnvelope, resolveEnvelopeFormatOptions } from "../../auto-reply/envelope.js"; -import { buildCommandContext, handleCommands } from "../../auto-reply/reply/commands.js"; -import { parseInlineDirectives } from "../../auto-reply/reply/directive-handling.js"; -import { defaultGroupActivation } from "../../auto-reply/reply/groups.js"; -import { resolveContextTokens } from "../../auto-reply/reply/model-selection.js"; -import { resolveElevatedPermissions } from "../../auto-reply/reply/reply-elevated.js"; +import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; +import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; import { - normalizeElevatedLevel, - normalizeReasoningLevel, - normalizeThinkLevel, - normalizeVerboseLevel, -} from "../../auto-reply/thinking.js"; + extractShortModelName, + type ResponsePrefixContext, +} from "../../auto-reply/reply/response-prefix-template.js"; import type { MsgContext } from "../../auto-reply/templating.js"; -import { agentCommand } from "../../commands/agent.js"; -import { mergeSessionEntry, updateSessionStore } from "../../config/sessions.js"; -import { registerAgentRunContext } from "../../infra/agent-events.js"; -import { isAcpSessionKey } from "../../routing/session-key.js"; -import { defaultRuntime } from "../../runtime.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js"; import { @@ -53,7 +41,144 @@ import { } from "../session-utils.js"; import { stripEnvelopeFromMessages } from "../chat-sanitize.js"; import { formatForLog } from "../ws-log.js"; -import type { GatewayRequestHandlers } from "./types.js"; +import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js"; + +type TranscriptAppendResult = { + ok: boolean; + messageId?: string; + message?: Record; + error?: string; +}; + +function resolveTranscriptPath(params: { + sessionId: string; + storePath: string | undefined; + sessionFile?: string; +}): string | null { + const { sessionId, storePath, sessionFile } = params; + if (sessionFile) return sessionFile; + if (!storePath) return null; + return path.join(path.dirname(storePath), `${sessionId}.jsonl`); +} + +function ensureTranscriptFile(params: { transcriptPath: string; sessionId: string }): { + ok: boolean; + error?: string; +} { + if (fs.existsSync(params.transcriptPath)) return { ok: true }; + try { + fs.mkdirSync(path.dirname(params.transcriptPath), { recursive: true }); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: params.sessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }; + fs.writeFileSync(params.transcriptPath, `${JSON.stringify(header)}\n`, "utf-8"); + return { ok: true }; + } catch (err) { + return { ok: false, error: err instanceof Error ? err.message : String(err) }; + } +} + +function appendAssistantTranscriptMessage(params: { + message: string; + label?: string; + sessionId: string; + storePath: string | undefined; + sessionFile?: string; + createIfMissing?: boolean; +}): TranscriptAppendResult { + const transcriptPath = resolveTranscriptPath({ + sessionId: params.sessionId, + storePath: params.storePath, + sessionFile: params.sessionFile, + }); + if (!transcriptPath) { + return { ok: false, error: "transcript path not resolved" }; + } + + if (!fs.existsSync(transcriptPath)) { + if (!params.createIfMissing) { + return { ok: false, error: "transcript file not found" }; + } + const ensured = ensureTranscriptFile({ + transcriptPath, + sessionId: params.sessionId, + }); + if (!ensured.ok) { + return { ok: false, error: ensured.error ?? "failed to create transcript file" }; + } + } + + const now = Date.now(); + const messageId = randomUUID().slice(0, 8); + const labelPrefix = params.label ? `[${params.label}]\n\n` : ""; + const messageBody: Record = { + role: "assistant", + content: [{ type: "text", text: `${labelPrefix}${params.message}` }], + timestamp: now, + stopReason: "injected", + usage: { input: 0, output: 0, totalTokens: 0 }, + }; + const transcriptEntry = { + type: "message", + id: messageId, + timestamp: new Date(now).toISOString(), + message: messageBody, + }; + + try { + fs.appendFileSync(transcriptPath, `${JSON.stringify(transcriptEntry)}\n`, "utf-8"); + } catch (err) { + return { ok: false, error: err instanceof Error ? err.message : String(err) }; + } + + return { ok: true, messageId, message: transcriptEntry.message }; +} + +function nextChatSeq(context: { agentRunSeq: Map }, runId: string) { + const next = (context.agentRunSeq.get(runId) ?? 0) + 1; + context.agentRunSeq.set(runId, next); + return next; +} + +function broadcastChatFinal(params: { + context: Pick; + runId: string; + sessionKey: string; + message?: Record; +}) { + const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.runId); + const payload = { + runId: params.runId, + sessionKey: params.sessionKey, + seq, + state: "final" as const, + message: params.message, + }; + params.context.broadcast("chat", payload); + params.context.nodeSendToSession(params.sessionKey, "chat", payload); +} + +function broadcastChatError(params: { + context: Pick; + runId: string; + sessionKey: string; + errorMessage?: string; +}) { + const seq = nextChatSeq({ agentRunSeq: params.context.agentRunSeq }, params.runId); + const payload = { + runId: params.runId, + sessionKey: params.sessionKey, + seq, + state: "error" as const, + errorMessage: params.errorMessage, + }; + params.context.broadcast("chat", payload); + params.context.nodeSendToSession(params.sessionKey, "chat", payload); +} export const chatHandlers: GatewayRequestHandlers = { "chat.history": async ({ params, respond, context }) => { @@ -168,7 +293,7 @@ export const chatHandlers: GatewayRequestHandlers = { runIds: res.aborted ? [runId] : [], }); }, - "chat.send": async ({ params, respond, context }) => { + "chat.send": async ({ params, respond, context, client }) => { if (!validateChatSendParams(params)) { respond( false, @@ -228,20 +353,13 @@ export const chatHandlers: GatewayRequestHandlers = { return; } } - const { cfg, storePath, entry, canonicalKey, store } = loadSessionEntry(p.sessionKey); + const { cfg, entry } = loadSessionEntry(p.sessionKey); const timeoutMs = resolveAgentTimeoutMs({ cfg, overrideMs: p.timeoutMs, }); const now = Date.now(); - const sessionId = entry?.sessionId ?? randomUUID(); - const sessionEntry = mergeSessionEntry(entry, { - sessionId, - updatedAt: now, - }); - store[canonicalKey] = sessionEntry; const clientRunId = p.idempotencyKey; - registerAgentRunContext(clientRunId, { sessionKey: p.sessionKey }); const sendPolicy = resolveSendPolicy({ cfg, @@ -298,21 +416,11 @@ export const chatHandlers: GatewayRequestHandlers = { const abortController = new AbortController(); context.chatAbortControllers.set(clientRunId, { controller: abortController, - sessionId, + sessionId: entry?.sessionId ?? clientRunId, sessionKey: p.sessionKey, startedAtMs: now, expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }), }); - context.addChatRun(clientRunId, { - sessionKey: p.sessionKey, - clientRunId, - }); - - if (storePath) { - await updateSessionStore(storePath, (store) => { - store[canonicalKey] = sessionEntry; - }); - } const ackPayload = { runId: clientRunId, @@ -320,170 +428,116 @@ export const chatHandlers: GatewayRequestHandlers = { }; respond(true, ackPayload, undefined, { runId: clientRunId }); - if (isControlCommandMessage(parsedMessage, cfg)) { - try { - const isFastTestEnv = process.env.CLAWDBOT_TEST_FAST === "1"; - const agentId = resolveSessionAgentId({ sessionKey: p.sessionKey, config: cfg }); - const agentCfg = cfg.agents?.defaults; - const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId); - const workspace = await ensureAgentWorkspace({ - dir: workspaceDir, - ensureBootstrapFiles: !agentCfg?.skipBootstrap && !isFastTestEnv, - }); - const ctx: MsgContext = { - Body: parsedMessage, - CommandBody: parsedMessage, - BodyForCommands: parsedMessage, - CommandSource: "text", - CommandAuthorized: true, - Provider: INTERNAL_MESSAGE_CHANNEL, - Surface: "tui", - From: p.sessionKey, - To: INTERNAL_MESSAGE_CHANNEL, - SessionKey: p.sessionKey, - ChatType: "direct", - }; - const command = buildCommandContext({ - ctx, - cfg, - agentId, - sessionKey: p.sessionKey, - isGroup: false, - triggerBodyNormalized: normalizeCommandBody(parsedMessage), - commandAuthorized: true, - }); - const directives = parseInlineDirectives(parsedMessage); - const { provider, model } = resolveSessionModelRef(cfg, sessionEntry); - const contextTokens = resolveContextTokens({ agentCfg, model }); - const resolveDefaultThinkingLevel = async () => { - const configured = agentCfg?.thinkingDefault; - if (configured) return configured; - const catalog = await context.loadGatewayModelCatalog(); - return resolveThinkingDefault({ cfg, provider, model, catalog }); - }; - const resolvedThinkLevel = - normalizeThinkLevel(sessionEntry?.thinkingLevel ?? agentCfg?.thinkingDefault) ?? - (await resolveDefaultThinkingLevel()); - const resolvedVerboseLevel = - normalizeVerboseLevel(sessionEntry?.verboseLevel ?? agentCfg?.verboseDefault) ?? "off"; - const resolvedReasoningLevel = - normalizeReasoningLevel(sessionEntry?.reasoningLevel) ?? "off"; - const resolvedElevatedLevel = normalizeElevatedLevel( - sessionEntry?.elevatedLevel ?? agentCfg?.elevatedDefault, - ); - const elevated = resolveElevatedPermissions({ - cfg, - agentId, - ctx, - provider: INTERNAL_MESSAGE_CHANNEL, - }); - const commandResult = await handleCommands({ - ctx, - cfg, - command, - agentId, - directives, - elevated, - sessionEntry, - previousSessionEntry: entry, - sessionStore: store, - sessionKey: p.sessionKey, - storePath, - sessionScope: (cfg.session?.scope ?? "per-sender") as "per-sender" | "global", - workspaceDir: workspace.dir, - defaultGroupActivation: () => defaultGroupActivation(true), - resolvedThinkLevel, - resolvedVerboseLevel, - resolvedReasoningLevel, - resolvedElevatedLevel, - resolveDefaultThinkingLevel, - provider, - model, - contextTokens, - isGroup: false, - }); - if (!commandResult.shouldContinue) { - const text = commandResult.reply?.text ?? ""; - const message = { - role: "assistant", - content: text.trim() ? [{ type: "text", text }] : [], - timestamp: Date.now(), - command: true, - }; - const payload = { + const trimmedMessage = parsedMessage.trim(); + const injectThinking = Boolean( + p.thinking && trimmedMessage && !trimmedMessage.startsWith("/"), + ); + const commandBody = injectThinking ? `/think ${p.thinking} ${parsedMessage}` : parsedMessage; + const clientInfo = client?.connect?.client; + const ctx: MsgContext = { + Body: parsedMessage, + BodyForAgent: parsedMessage, + BodyForCommands: commandBody, + RawBody: parsedMessage, + CommandBody: commandBody, + SessionKey: p.sessionKey, + Provider: INTERNAL_MESSAGE_CHANNEL, + Surface: INTERNAL_MESSAGE_CHANNEL, + OriginatingChannel: INTERNAL_MESSAGE_CHANNEL, + ChatType: "direct", + CommandAuthorized: true, + MessageSid: clientRunId, + SenderId: clientInfo?.id, + SenderName: clientInfo?.displayName, + SenderUsername: clientInfo?.displayName, + }; + + const agentId = resolveSessionAgentId({ + sessionKey: p.sessionKey, + config: cfg, + }); + let prefixContext: ResponsePrefixContext = { + identityName: resolveIdentityName(cfg, agentId), + }; + const finalReplyParts: string[] = []; + const dispatcher = createReplyDispatcher({ + responsePrefix: resolveEffectiveMessagesConfig(cfg, agentId).responsePrefix, + responsePrefixContextProvider: () => prefixContext, + onError: (err) => { + context.logGateway.warn(`webchat dispatch failed: ${formatForLog(err)}`); + }, + deliver: async (payload, info) => { + if (info.kind !== "final") return; + const text = payload.text?.trim() ?? ""; + if (!text) return; + finalReplyParts.push(text); + }, + }); + + let agentRunStarted = false; + void dispatchInboundMessage({ + ctx, + cfg, + dispatcher, + replyOptions: { + runId: clientRunId, + abortSignal: abortController.signal, + images: parsedImages.length > 0 ? parsedImages : undefined, + disableBlockStreaming: true, + onAgentRunStart: () => { + agentRunStarted = true; + }, + onModelSelected: (ctx) => { + prefixContext.provider = ctx.provider; + prefixContext.model = extractShortModelName(ctx.model); + prefixContext.modelFull = `${ctx.provider}/${ctx.model}`; + prefixContext.thinkingLevel = ctx.thinkLevel ?? "off"; + }, + }, + }) + .then(() => { + if (!agentRunStarted) { + const combinedReply = finalReplyParts + .map((part) => part.trim()) + .filter(Boolean) + .join("\n\n") + .trim(); + let message: Record | undefined; + if (combinedReply) { + const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry( + p.sessionKey, + ); + const sessionId = latestEntry?.sessionId ?? entry?.sessionId ?? clientRunId; + const appended = appendAssistantTranscriptMessage({ + message: combinedReply, + sessionId, + storePath: latestStorePath, + sessionFile: latestEntry?.sessionFile, + createIfMissing: true, + }); + if (appended.ok) { + message = appended.message; + } else { + context.logGateway.warn( + `webchat transcript append failed: ${appended.error ?? "unknown error"}`, + ); + const now = Date.now(); + message = { + role: "assistant", + content: [{ type: "text", text: combinedReply }], + timestamp: now, + stopReason: "injected", + usage: { input: 0, output: 0, totalTokens: 0 }, + }; + } + } + broadcastChatFinal({ + context, runId: clientRunId, sessionKey: p.sessionKey, - seq: 0, - state: "final" as const, message, - }; - context.broadcast("chat", payload); - context.nodeSendToSession(p.sessionKey, "chat", payload); - context.dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: true, - payload: { runId: clientRunId, status: "ok" as const }, }); - context.chatAbortControllers.delete(clientRunId); - context.removeChatRun(clientRunId, clientRunId, p.sessionKey); - return; } - } catch (err) { - const payload = { - runId: clientRunId, - sessionKey: p.sessionKey, - seq: 0, - state: "error" as const, - errorMessage: formatForLog(err), - }; - const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); - context.broadcast("chat", payload); - context.nodeSendToSession(p.sessionKey, "chat", payload); - context.dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: false, - payload: { - runId: clientRunId, - status: "error" as const, - summary: String(err), - }, - error, - }); - context.chatAbortControllers.delete(clientRunId); - context.removeChatRun(clientRunId, clientRunId, p.sessionKey); - return; - } - } - - const envelopeOptions = resolveEnvelopeFormatOptions(cfg); - const envelopedMessage = formatInboundEnvelope({ - channel: "WebChat", - from: p.sessionKey, - timestamp: now, - body: parsedMessage, - chatType: "direct", - previousTimestamp: entry?.updatedAt, - envelope: envelopeOptions, - }); - const lane = isAcpSessionKey(p.sessionKey) ? p.sessionKey : undefined; - void agentCommand( - { - message: envelopedMessage, - images: parsedImages.length > 0 ? parsedImages : undefined, - sessionId, - sessionKey: p.sessionKey, - runId: clientRunId, - thinking: p.thinking, - deliver: p.deliver, - timeout: Math.ceil(timeoutMs / 1000).toString(), - messageChannel: INTERNAL_MESSAGE_CHANNEL, - abortSignal: abortController.signal, - lane, - }, - defaultRuntime, - context.deps, - ) - .then(() => { context.dedupe.set(`chat:${clientRunId}`, { ts: Date.now(), ok: true, @@ -502,6 +556,12 @@ export const chatHandlers: GatewayRequestHandlers = { }, error, }); + broadcastChatError({ + context, + runId: clientRunId, + sessionKey: p.sessionKey, + errorMessage: String(err), + }); }) .finally(() => { context.chatAbortControllers.delete(clientRunId); diff --git a/src/gateway/server.chat.gateway-server-chat-b.e2e.test.ts b/src/gateway/server.chat.gateway-server-chat-b.e2e.test.ts index 2b55b0c2e..e5c6c37aa 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.e2e.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.e2e.test.ts @@ -4,8 +4,8 @@ import path from "node:path"; import { describe, expect, test, vi } from "vitest"; import { emitAgentEvent } from "../infra/agent-events.js"; import { - agentCommand, connectOk, + getReplyFromConfig, installGatewayTestHooks, onceMessage, rpcReq, @@ -47,7 +47,7 @@ describe("gateway server chat", () => { async () => { const tempDirs: string[] = []; const { server, ws } = await startServerWithClient(); - const spy = vi.mocked(agentCommand); + const spy = vi.mocked(getReplyFromConfig); const resetSpy = () => { spy.mockReset(); spy.mockResolvedValue(undefined); @@ -122,8 +122,9 @@ describe("gateway server chat", () => { let abortInFlight: Promise | undefined; try { const callsBefore = spy.mock.calls.length; - spy.mockImplementationOnce(async (opts) => { - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + spy.mockImplementationOnce(async (_ctx, opts) => { + opts?.onAgentRunStart?.(opts.runId ?? "idem-abort-1"); + const signal = opts?.abortSignal; await new Promise((resolve) => { if (!signal) return resolve(); if (signal.aborted) return resolve(); @@ -155,7 +156,7 @@ describe("gateway server chat", () => { const tick = () => { if (spy.mock.calls.length > callsBefore) return resolve(); if (Date.now() > deadline) - return reject(new Error("timeout waiting for agentCommand")); + return reject(new Error("timeout waiting for getReplyFromConfig")); setTimeout(tick, 5); }; tick(); @@ -177,8 +178,9 @@ describe("gateway server chat", () => { sessionStoreSaveDelayMs.value = 120; resetSpy(); try { - spy.mockImplementationOnce(async (opts) => { - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + spy.mockImplementationOnce(async (_ctx, opts) => { + opts?.onAgentRunStart?.(opts.runId ?? "idem-abort-save-1"); + const signal = opts?.abortSignal; await new Promise((resolve) => { if (!signal) return resolve(); if (signal.aborted) return resolve(); @@ -215,8 +217,9 @@ describe("gateway server chat", () => { await writeStore({ main: { sessionId: "sess-main", updatedAt: Date.now() } }); resetSpy(); const callsBeforeStop = spy.mock.calls.length; - spy.mockImplementationOnce(async (opts) => { - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + spy.mockImplementationOnce(async (_ctx, opts) => { + opts?.onAgentRunStart?.(opts.runId ?? "idem-stop-1"); + const signal = opts?.abortSignal; await new Promise((resolve) => { if (!signal) return resolve(); if (signal.aborted) return resolve(); @@ -261,7 +264,8 @@ describe("gateway server chat", () => { const runDone = new Promise((resolve) => { resolveRun = resolve; }); - spy.mockImplementationOnce(async () => { + spy.mockImplementationOnce(async (_ctx, opts) => { + opts?.onAgentRunStart?.(opts.runId ?? "idem-status-1"); await runDone; }); const started = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { @@ -294,8 +298,9 @@ describe("gateway server chat", () => { } expect(completed).toBe(true); resetSpy(); - spy.mockImplementationOnce(async (opts) => { - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + spy.mockImplementationOnce(async (_ctx, opts) => { + opts?.onAgentRunStart?.(opts.runId ?? "idem-abort-all-1"); + const signal = opts?.abortSignal; await new Promise((resolve) => { if (!signal) return resolve(); if (signal.aborted) return resolve(); @@ -359,9 +364,9 @@ describe("gateway server chat", () => { const agentStartedP = new Promise((resolve) => { agentStartedResolve = resolve; }); - spy.mockImplementationOnce(async (opts) => { + spy.mockImplementationOnce(async (_ctx, opts) => { agentStartedResolve?.(); - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + const signal = opts?.abortSignal; await new Promise((resolve) => { if (!signal) return resolve(); if (signal.aborted) return resolve(); diff --git a/src/gateway/server.chat.gateway-server-chat.e2e.test.ts b/src/gateway/server.chat.gateway-server-chat.e2e.test.ts index d4035037b..54f772580 100644 --- a/src/gateway/server.chat.gateway-server-chat.e2e.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.e2e.test.ts @@ -6,8 +6,8 @@ import { WebSocket } from "ws"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js"; import { - agentCommand, connectOk, + getReplyFromConfig, installGatewayTestHooks, onceMessage, rpcReq, @@ -71,7 +71,7 @@ describe("gateway server chat", () => { webchatWs.close(); webchatWs = undefined; - const spy = vi.mocked(agentCommand); + const spy = vi.mocked(getReplyFromConfig); spy.mockClear(); testState.agentConfig = { timeoutSeconds: 123 }; const callsBeforeTimeout = spy.mock.calls.length; @@ -83,8 +83,8 @@ describe("gateway server chat", () => { expect(timeoutRes.ok).toBe(true); await waitFor(() => spy.mock.calls.length > callsBeforeTimeout); - const timeoutCall = spy.mock.calls.at(-1)?.[0] as { timeout?: string } | undefined; - expect(timeoutCall?.timeout).toBe("123"); + const timeoutCall = spy.mock.calls.at(-1)?.[1] as { runId?: string } | undefined; + expect(timeoutCall?.runId).toBe("idem-timeout-1"); testState.agentConfig = undefined; spy.mockClear(); @@ -97,8 +97,8 @@ describe("gateway server chat", () => { expect(sessionRes.ok).toBe(true); await waitFor(() => spy.mock.calls.length > callsBeforeSession); - const sessionCall = spy.mock.calls.at(-1)?.[0] as { sessionKey?: string } | undefined; - expect(sessionCall?.sessionKey).toBe("agent:main:subagent:abc"); + const sessionCall = spy.mock.calls.at(-1)?.[0] as { SessionKey?: string } | undefined; + expect(sessionCall?.SessionKey).toBe("agent:main:subagent:abc"); const sendPolicyDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-")); tempDirs.push(sendPolicyDir); @@ -203,10 +203,10 @@ describe("gateway server chat", () => { expect(imgRes.payload?.runId).toBeDefined(); await waitFor(() => spy.mock.calls.length > callsBeforeImage, 8000); - const imgCall = spy.mock.calls.at(-1)?.[0] as + const imgOpts = spy.mock.calls.at(-1)?.[1] as | { images?: Array<{ type: string; data: string; mimeType: string }> } | undefined; - expect(imgCall?.images).toEqual([{ type: "image", data: pngB64, mimeType: "image/png" }]); + expect(imgOpts?.images).toEqual([{ type: "image", data: pngB64, mimeType: "image/png" }]); const historyDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-")); tempDirs.push(historyDir); diff --git a/src/gateway/test-helpers.mocks.ts b/src/gateway/test-helpers.mocks.ts index 46631ba09..5268bd459 100644 --- a/src/gateway/test-helpers.mocks.ts +++ b/src/gateway/test-helpers.mocks.ts @@ -166,6 +166,7 @@ const hoisted = vi.hoisted(() => ({ waitCalls: [] as string[], waitResults: new Map(), }, + getReplyFromConfig: vi.fn().mockResolvedValue(undefined), sendWhatsAppMock: vi.fn().mockResolvedValue({ messageId: "msg-1", toJid: "jid-1" }), })); @@ -197,6 +198,7 @@ export const testTailnetIPv4 = hoisted.testTailnetIPv4; export const piSdkMock = hoisted.piSdkMock; export const cronIsolatedRun = hoisted.cronIsolatedRun; export const agentCommand = hoisted.agentCommand; +export const getReplyFromConfig = hoisted.getReplyFromConfig; export const testState = { agentConfig: undefined as Record | undefined, @@ -540,6 +542,9 @@ vi.mock("../channels/web/index.js", async () => { vi.mock("../commands/agent.js", () => ({ agentCommand, })); +vi.mock("../auto-reply/reply.js", () => ({ + getReplyFromConfig, +})); vi.mock("../cli/deps.js", async () => { const actual = await vi.importActual("../cli/deps.js"); const base = actual.createDefaultDeps(); diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index 65185087b..13e106eb9 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -20,7 +20,7 @@ import { createInboundDebouncer, resolveInboundDebounceMs, } from "../../auto-reply/inbound-debounce.js"; -import { dispatchReplyFromConfig } from "../../auto-reply/reply/dispatch-from-config.js"; +import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { buildPendingHistoryContextFromMap, @@ -565,7 +565,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }, }); - const { queuedFinal } = await dispatchReplyFromConfig({ + const { queuedFinal } = await dispatchInboundMessage({ ctx: ctxPayload, cfg, dispatcher, diff --git a/src/signal/monitor.event-handler.sender-prefix.test.ts b/src/signal/monitor.event-handler.sender-prefix.test.ts index 3c5569940..bf8ee7136 100644 --- a/src/signal/monitor.event-handler.sender-prefix.test.ts +++ b/src/signal/monitor.event-handler.sender-prefix.test.ts @@ -12,21 +12,21 @@ describe("signal event handler sender prefix", () => { beforeEach(() => { dispatchMock.mockReset().mockImplementation(async ({ dispatcher, ctx }) => { dispatcher.sendFinalReply({ text: "ok" }); - return { queuedFinal: true, counts: { final: 1 }, ctx }; + return { queuedFinal: true, counts: { tool: 0, block: 0, final: 1 }, ctx }; }); readAllowFromMock.mockReset().mockResolvedValue([]); }); it("prefixes group bodies with sender label", async () => { let capturedBody = ""; - const dispatchModule = await import("../auto-reply/reply/dispatch-from-config.js"); - vi.spyOn(dispatchModule, "dispatchReplyFromConfig").mockImplementation( + const dispatchModule = await import("../auto-reply/dispatch.js"); + vi.spyOn(dispatchModule, "dispatchInboundMessage").mockImplementation( async (...args: unknown[]) => dispatchMock(...args), ); dispatchMock.mockImplementationOnce(async ({ dispatcher, ctx }) => { capturedBody = ctx.Body ?? ""; dispatcher.sendFinalReply({ text: "ok" }); - return { queuedFinal: true, counts: { final: 1 } }; + return { queuedFinal: true, counts: { tool: 0, block: 0, final: 1 } }; }); const { createSignalEventHandler } = await import("./monitor/event-handler.js"); diff --git a/src/signal/monitor.event-handler.typing-read-receipts.test.ts b/src/signal/monitor.event-handler.typing-read-receipts.test.ts index c4ba6f9ce..7aee1e24d 100644 --- a/src/signal/monitor.event-handler.typing-read-receipts.test.ts +++ b/src/signal/monitor.event-handler.typing-read-receipts.test.ts @@ -9,14 +9,21 @@ vi.mock("./send.js", () => ({ sendReadReceiptSignal: (...args: unknown[]) => sendReadReceiptMock(...args), })); -vi.mock("../auto-reply/reply/dispatch-from-config.js", () => ({ - dispatchReplyFromConfig: vi.fn( +vi.mock("../auto-reply/dispatch.js", async (importOriginal) => { + const actual = await importOriginal(); + const dispatchInboundMessage = vi.fn( async (params: { replyOptions?: { onReplyStart?: () => void } }) => { await Promise.resolve(params.replyOptions?.onReplyStart?.()); return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }; }, - ), -})); + ); + return { + ...actual, + dispatchInboundMessage, + dispatchInboundMessageWithDispatcher: dispatchInboundMessage, + dispatchInboundMessageWithBufferedDispatcher: dispatchInboundMessage, + }; +}); vi.mock("../pairing/pairing-store.js", () => ({ readChannelAllowFromStore: vi.fn().mockResolvedValue([]), @@ -25,11 +32,13 @@ vi.mock("../pairing/pairing-store.js", () => ({ describe("signal event handler typing + read receipts", () => { beforeEach(() => { + vi.useRealTimers(); sendTypingMock.mockReset().mockResolvedValue(true); sendReadReceiptMock.mockReset().mockResolvedValue(true); }); it("sends typing + read receipt for allowed DMs", async () => { + vi.resetModules(); const { createSignalEventHandler } = await import("./monitor/event-handler.js"); const handler = createSignalEventHandler({ runtime: { log: () => {}, error: () => {} } as any, diff --git a/src/signal/monitor/event-handler.inbound-contract.test.ts b/src/signal/monitor/event-handler.inbound-contract.test.ts index 9277eb990..d073357ff 100644 --- a/src/signal/monitor/event-handler.inbound-contract.test.ts +++ b/src/signal/monitor/event-handler.inbound-contract.test.ts @@ -5,17 +5,24 @@ import { expectInboundContextContract } from "../../../test/helpers/inbound-cont let capturedCtx: MsgContext | undefined; -vi.mock("../../auto-reply/reply/dispatch-from-config.js", () => ({ - dispatchReplyFromConfig: vi.fn(async (params: { ctx: MsgContext }) => { +vi.mock("../../auto-reply/dispatch.js", async (importOriginal) => { + const actual = await importOriginal(); + const dispatchInboundMessage = vi.fn(async (params: { ctx: MsgContext }) => { capturedCtx = params.ctx; - return { queuedFinal: false, counts: { tool: 0, block: 0 } }; - }), -})); + return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }; + }); + return { + ...actual, + dispatchInboundMessage, + dispatchInboundMessageWithDispatcher: dispatchInboundMessage, + dispatchInboundMessageWithBufferedDispatcher: dispatchInboundMessage, + }; +}); import { createSignalEventHandler } from "./event-handler.js"; describe("signal createSignalEventHandler inbound contract", () => { - it("passes a finalized MsgContext to dispatchReplyFromConfig", async () => { + it("passes a finalized MsgContext to dispatchInboundMessage", async () => { capturedCtx = undefined; const handler = createSignalEventHandler({ diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index 30fabedfb..dfa3fe7ab 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -17,7 +17,7 @@ import { createInboundDebouncer, resolveInboundDebounceMs, } from "../../auto-reply/inbound-debounce.js"; -import { dispatchReplyFromConfig } from "../../auto-reply/reply/dispatch-from-config.js"; +import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { buildPendingHistoryContextFromMap, clearHistoryEntries, @@ -225,7 +225,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { onReplyStart, }); - const { queuedFinal } = await dispatchReplyFromConfig({ + const { queuedFinal } = await dispatchInboundMessage({ ctx: ctxPayload, cfg: deps.cfg, dispatcher, diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index de1b1f267..22cd9fcfe 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -7,7 +7,7 @@ import { extractShortModelName, type ResponsePrefixContext, } from "../../../auto-reply/reply/response-prefix-template.js"; -import { dispatchReplyFromConfig } from "../../../auto-reply/reply/dispatch-from-config.js"; +import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js"; import { clearHistoryEntries } from "../../../auto-reply/reply/history.js"; import { createReplyDispatcherWithTyping } from "../../../auto-reply/reply/reply-dispatcher.js"; import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js"; @@ -104,7 +104,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag onReplyStart, }); - const { queuedFinal, counts } = await dispatchReplyFromConfig({ + const { queuedFinal, counts } = await dispatchInboundMessage({ ctx: prepared.ctxPayload, cfg, dispatcher, diff --git a/src/telegram/bot.create-telegram-bot.accepts-group-messages-mentionpatterns-match-without-botusername.test.ts b/src/telegram/bot.create-telegram-bot.accepts-group-messages-mentionpatterns-match-without-botusername.test.ts index 7024a2e52..1a6afa519 100644 --- a/src/telegram/bot.create-telegram-bot.accepts-group-messages-mentionpatterns-match-without-botusername.test.ts +++ b/src/telegram/bot.create-telegram-bot.accepts-group-messages-mentionpatterns-match-without-botusername.test.ts @@ -4,6 +4,10 @@ import { escapeRegExp, formatEnvelopeTimestamp } from "../../test/helpers/envelo let createTelegramBot: typeof import("./bot.js").createTelegramBot; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-${Math.random().toString(16).slice(2)}.json`, +})); + const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -23,6 +27,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.create-telegram-bot.applies-topic-skill-filters-system-prompts.test.ts b/src/telegram/bot.create-telegram-bot.applies-topic-skill-filters-system-prompts.test.ts index 1a10ca94c..0cda853be 100644 --- a/src/telegram/bot.create-telegram-bot.applies-topic-skill-filters-system-prompts.test.ts +++ b/src/telegram/bot.create-telegram-bot.applies-topic-skill-filters-system-prompts.test.ts @@ -3,6 +3,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let createTelegramBot: typeof import("./bot.js").createTelegramBot; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-${Math.random().toString(16).slice(2)}.json`, +})); + const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -22,6 +26,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.create-telegram-bot.blocks-all-group-messages-grouppolicy-is.test.ts b/src/telegram/bot.create-telegram-bot.blocks-all-group-messages-grouppolicy-is.test.ts index 7937c1064..0aa431d1b 100644 --- a/src/telegram/bot.create-telegram-bot.blocks-all-group-messages-grouppolicy-is.test.ts +++ b/src/telegram/bot.create-telegram-bot.blocks-all-group-messages-grouppolicy-is.test.ts @@ -3,6 +3,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let createTelegramBot: typeof import("./bot.js").createTelegramBot; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-${Math.random().toString(16).slice(2)}.json`, +})); + const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -22,6 +26,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.create-telegram-bot.dedupes-duplicate-callback-query-updates-by-update.test.ts b/src/telegram/bot.create-telegram-bot.dedupes-duplicate-callback-query-updates-by-update.test.ts index 5e8a2dcfa..8ed8e189f 100644 --- a/src/telegram/bot.create-telegram-bot.dedupes-duplicate-callback-query-updates-by-update.test.ts +++ b/src/telegram/bot.create-telegram-bot.dedupes-duplicate-callback-query-updates-by-update.test.ts @@ -3,6 +3,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let createTelegramBot: typeof import("./bot.js").createTelegramBot; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-${Math.random().toString(16).slice(2)}.json`, +})); + const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -22,6 +26,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.create-telegram-bot.installs-grammy-throttler.test.ts b/src/telegram/bot.create-telegram-bot.installs-grammy-throttler.test.ts index 05aac6388..ebbd3b092 100644 --- a/src/telegram/bot.create-telegram-bot.installs-grammy-throttler.test.ts +++ b/src/telegram/bot.create-telegram-bot.installs-grammy-throttler.test.ts @@ -6,6 +6,9 @@ let createTelegramBot: typeof import("./bot.js").createTelegramBot; let getTelegramSequentialKey: typeof import("./bot.js").getTelegramSequentialKey; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-throttler-${Math.random().toString(16).slice(2)}.json`, +})); const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -25,6 +28,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.create-telegram-bot.matches-tg-prefixed-allowfrom-entries-case-insensitively.test.ts b/src/telegram/bot.create-telegram-bot.matches-tg-prefixed-allowfrom-entries-case-insensitively.test.ts index 2c4dfa472..805aa34da 100644 --- a/src/telegram/bot.create-telegram-bot.matches-tg-prefixed-allowfrom-entries-case-insensitively.test.ts +++ b/src/telegram/bot.create-telegram-bot.matches-tg-prefixed-allowfrom-entries-case-insensitively.test.ts @@ -3,6 +3,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let createTelegramBot: typeof import("./bot.js").createTelegramBot; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-${Math.random().toString(16).slice(2)}.json`, +})); + const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -22,6 +26,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.create-telegram-bot.matches-usernames-case-insensitively-grouppolicy-is.test.ts b/src/telegram/bot.create-telegram-bot.matches-usernames-case-insensitively-grouppolicy-is.test.ts index 2281fb407..ec81283bb 100644 --- a/src/telegram/bot.create-telegram-bot.matches-usernames-case-insensitively-grouppolicy-is.test.ts +++ b/src/telegram/bot.create-telegram-bot.matches-usernames-case-insensitively-grouppolicy-is.test.ts @@ -3,6 +3,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let createTelegramBot: typeof import("./bot.js").createTelegramBot; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-${Math.random().toString(16).slice(2)}.json`, +})); + const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -22,6 +26,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.create-telegram-bot.routes-dms-by-telegram-accountid-binding.test.ts b/src/telegram/bot.create-telegram-bot.routes-dms-by-telegram-accountid-binding.test.ts index 829391727..fd9401dac 100644 --- a/src/telegram/bot.create-telegram-bot.routes-dms-by-telegram-accountid-binding.test.ts +++ b/src/telegram/bot.create-telegram-bot.routes-dms-by-telegram-accountid-binding.test.ts @@ -3,6 +3,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let createTelegramBot: typeof import("./bot.js").createTelegramBot; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-${Math.random().toString(16).slice(2)}.json`, +})); + const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -22,6 +26,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.create-telegram-bot.sends-replies-without-native-reply-threading.test.ts b/src/telegram/bot.create-telegram-bot.sends-replies-without-native-reply-threading.test.ts index 164095a9c..80c880b79 100644 --- a/src/telegram/bot.create-telegram-bot.sends-replies-without-native-reply-threading.test.ts +++ b/src/telegram/bot.create-telegram-bot.sends-replies-without-native-reply-threading.test.ts @@ -6,6 +6,12 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; let createTelegramBot: typeof import("./bot.js").createTelegramBot; let resetInboundDedupe: typeof import("../auto-reply/reply/inbound-dedupe.js").resetInboundDedupe; +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-reply-threading-${Math.random() + .toString(16) + .slice(2)}.json`, +})); + const { loadWebMedia } = vi.hoisted(() => ({ loadWebMedia: vi.fn(), })); @@ -25,6 +31,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({ diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index da67c2e38..d4cdfaf4b 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -21,6 +21,10 @@ vi.mock("../auto-reply/skill-commands.js", () => ({ listSkillCommandsForAgents, })); +const { sessionStorePath } = vi.hoisted(() => ({ + sessionStorePath: `/tmp/clawdbot-telegram-bot-${Math.random().toString(16).slice(2)}.json`, +})); + function resolveSkillCommands(config: Parameters[0]) { return listSkillCommandsForAgents({ cfg: config }); } @@ -44,6 +48,14 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +vi.mock("../config/sessions.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + resolveStorePath: vi.fn((storePath) => storePath ?? sessionStorePath), + }; +}); + const { readTelegramAllowFromStore, upsertTelegramPairingRequest } = vi.hoisted(() => ({ readTelegramAllowFromStore: vi.fn(async () => [] as string[]), upsertTelegramPairingRequest: vi.fn(async () => ({