diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ff1f21e8..5e9a2a1dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Agents: add human-delay pacing between block replies (modes: off/natural/custom, per-agent configurable). (#446) — thanks @tony-freedomology. ### Fixes +- Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj. - Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”). - Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage. - Onboarding/Gateway: persist non-interactive gateway token auth in config; add WS wizard + gateway tool-calling regression coverage. diff --git a/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift b/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift index 031465d6d..898833d02 100644 --- a/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift +++ b/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift @@ -1629,11 +1629,11 @@ public struct ChatSendParams: Codable, Sendable { public struct ChatAbortParams: Codable, Sendable { public let sessionkey: String - public let runid: String + public let runid: String? public init( sessionkey: String, - runid: String + runid: String? ) { self.sessionkey = sessionkey self.runid = runid diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index 274b3fbb6..80efd2c70 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -2098,6 +2098,7 @@ Template placeholders are expanded in `audio.transcription.command` (and any fut | Variable | Description | |----------|-------------| | `{{Body}}` | Full inbound message body | +| `{{RawBody}}` | Raw inbound message body (no history/sender wrappers; best for command parsing) | | `{{BodyStripped}}` | Body with group mentions stripped (best default for agents) | | `{{From}}` | Sender identifier (E.164 for WhatsApp; may differ per provider) | | `{{To}}` | Destination identifier | diff --git a/src/auto-reply/reply.raw-body.test.ts b/src/auto-reply/reply.raw-body.test.ts new file mode 100644 index 000000000..640006abb --- /dev/null +++ b/src/auto-reply/reply.raw-body.test.ts @@ -0,0 +1,154 @@ +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; +import { loadModelCatalog } from "../agents/model-catalog.js"; +import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { getReplyFromConfig } from "./reply.js"; + +vi.mock("../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(false), + runEmbeddedPiAgent: vi.fn(), + queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + resolveEmbeddedSessionLane: (key: string) => + `session:${key.trim() || "main"}`, + isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), + isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), +})); +vi.mock("../agents/model-catalog.js", () => ({ + loadModelCatalog: vi.fn(), +})); + +async function withTempHome(fn: (home: string) => Promise): Promise { + return withTempHomeBase( + async (home) => { + return await fn(home); + }, + { + env: { + CLAWDBOT_AGENT_DIR: (home) => path.join(home, ".clawdbot", "agent"), + PI_CODING_AGENT_DIR: (home) => path.join(home, ".clawdbot", "agent"), + }, + prefix: "clawdbot-rawbody-", + }, + ); +} + +describe("RawBody directive parsing", () => { + beforeEach(() => { + vi.mocked(runEmbeddedPiAgent).mockReset(); + vi.mocked(loadModelCatalog).mockResolvedValue([ + { id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" }, + ]); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("/model, /think, /verbose directives detected from RawBody even when Body has structural wrapper", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockReset(); + + const groupMessageCtx = { + Body: `[Chat messages since your last reply - for context]\\n[WhatsApp ...] Someone: hello\\n\\n[Current message - respond to this]\\n[WhatsApp ...] Jake: /think:high\\n[from: Jake McInteer (+6421807830)]`, + RawBody: "/think:high", + From: "+1222", + To: "+1222", + ChatType: "group", + }; + + const res = await getReplyFromConfig( + groupMessageCtx, + {}, + { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toContain("Thinking level set to high."); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + }); + }); + + it("/model status detected from RawBody", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockReset(); + + const groupMessageCtx = { + Body: `[Context]\nJake: /model status\n[from: Jake]`, + RawBody: "/model status", + From: "+1222", + To: "+1222", + ChatType: "group", + }; + + const res = await getReplyFromConfig( + groupMessageCtx, + {}, + { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + models: { + "anthropic/claude-opus-4-5": {}, + }, + }, + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toContain("anthropic/claude-opus-4-5"); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + }); + }); + + it("Integration: WhatsApp group message with structural wrapper and RawBody command", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockReset(); + + const groupMessageCtx = { + Body: `[Chat messages since your last reply - for context]\\n[WhatsApp ...] Someone: hello\\n\\n[Current message - respond to this]\\n[WhatsApp ...] Jake: /status\\n[from: Jake McInteer (+6421807830)]`, + RawBody: "/status", + ChatType: "group", + From: "+1222", + To: "+1222", + SessionKey: "agent:main:whatsapp:group:G1", + Provider: "whatsapp", + Surface: "whatsapp", + SenderE164: "+1222", + }; + + const res = await getReplyFromConfig( + groupMessageCtx, + {}, + { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + }, + whatsapp: { allowFrom: ["+1222"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toContain("Session: agent:main:whatsapp:group:G1"); + expect(text).toContain("anthropic/claude-opus-4-5"); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index c6dbfac00..6ffe877f5 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -60,7 +60,11 @@ import { defaultGroupActivation, resolveGroupRequireMention, } from "./reply/groups.js"; -import { stripMentions, stripStructuralPrefixes } from "./reply/mentions.js"; +import { + CURRENT_MESSAGE_MARKER, + stripMentions, + stripStructuralPrefixes, +} from "./reply/mentions.js"; import { createModelSelectionState, resolveContextTokens, @@ -336,7 +340,10 @@ export async function getReplyFromConfig( triggerBodyNormalized, } = sessionState; - const rawBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; + // Prefer RawBody (clean message without structural context) for directive parsing. + // Keep `Body`/`BodyStripped` as the best-available prompt text (may include context). + const rawBody = + sessionCtx.RawBody ?? sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; const clearInlineDirectives = (cleaned: string): InlineDirectives => ({ cleaned, hasThinkDirective: false, @@ -426,8 +433,37 @@ export async function getReplyFromConfig( hasQueueDirective: false, queueReset: false, }; - sessionCtx.Body = parsedDirectives.cleaned; - sessionCtx.BodyStripped = parsedDirectives.cleaned; + const existingBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; + const cleanedBody = (() => { + if (!existingBody) return parsedDirectives.cleaned; + if (!sessionCtx.RawBody) { + return parseInlineDirectives(existingBody, { + modelAliases: configuredAliases, + }).cleaned; + } + + const markerIndex = existingBody.indexOf(CURRENT_MESSAGE_MARKER); + if (markerIndex < 0) { + return parseInlineDirectives(existingBody, { + modelAliases: configuredAliases, + }).cleaned; + } + + const head = existingBody.slice( + 0, + markerIndex + CURRENT_MESSAGE_MARKER.length, + ); + const tail = existingBody.slice( + markerIndex + CURRENT_MESSAGE_MARKER.length, + ); + const cleanedTail = parseInlineDirectives(tail, { + modelAliases: configuredAliases, + }).cleaned; + return `${head}${cleanedTail}`; + })(); + + sessionCtx.Body = cleanedBody; + sessionCtx.BodyStripped = cleanedBody; const messageProviderKey = sessionCtx.Provider?.trim().toLowerCase() ?? @@ -750,7 +786,8 @@ export async function getReplyFromConfig( .filter(Boolean) .join("\n\n"); const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; - const rawBodyTrimmed = (ctx.Body ?? "").trim(); + // Use RawBody for bare reset detection (clean message without structural context). + const rawBodyTrimmed = (ctx.RawBody ?? ctx.Body ?? "").trim(); const baseBodyTrimmedRaw = baseBody.trim(); if ( allowTextCommands && diff --git a/src/auto-reply/reply/abort.test.ts b/src/auto-reply/reply/abort.test.ts new file mode 100644 index 000000000..3d5bae140 --- /dev/null +++ b/src/auto-reply/reply/abort.test.ts @@ -0,0 +1,42 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import type { ClawdbotConfig } from "../../config/config.js"; +import { isAbortTrigger } from "./abort.js"; +import { initSessionState } from "./session.js"; + +describe("abort detection", () => { + it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-")); + const storePath = path.join(root, "sessions.json"); + const cfg = { session: { store: storePath } } as ClawdbotConfig; + + const groupMessageCtx = { + Body: `[Context]\nJake: /stop\n[from: Jake]`, + RawBody: "/stop", + ChatType: "group", + SessionKey: "agent:main:whatsapp:group:G1", + }; + + const result = await initSessionState({ + ctx: groupMessageCtx, + cfg, + commandAuthorized: true, + }); + + // /stop is detected via exact match in handleAbort, not isAbortTrigger + expect(result.triggerBodyNormalized).toBe("/stop"); + }); + + it("isAbortTrigger matches bare word triggers (without slash)", () => { + expect(isAbortTrigger("stop")).toBe(true); + expect(isAbortTrigger("esc")).toBe(true); + expect(isAbortTrigger("abort")).toBe(true); + expect(isAbortTrigger("wait")).toBe(true); + expect(isAbortTrigger("exit")).toBe(true); + expect(isAbortTrigger("hello")).toBe(false); + // /stop is NOT matched by isAbortTrigger - it's handled separately + expect(isAbortTrigger("/stop")).toBe(false); + }); +}); diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index eb31ae0f7..a33dc4b7d 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -81,7 +81,8 @@ export async function tryFastAbortFromMessage(params: { sessionKey: targetKey ?? ctx.SessionKey ?? "", config: cfg, }); - const raw = stripStructuralPrefixes(ctx.Body ?? ""); + // Use RawBody for abort detection (clean message without structural context). + const raw = stripStructuralPrefixes(ctx.RawBody ?? ctx.Body ?? ""); const isGroup = ctx.ChatType?.trim().toLowerCase() === "group"; const stripped = isGroup ? stripMentions(raw, ctx, cfg, agentId) : raw; const normalized = normalizeCommandBody(stripped); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 169d85651..09d8772c2 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -15,6 +15,7 @@ import { resolveSessionTranscriptPath, type SessionEntry, saveSessionStore, + updateSessionStoreEntry, } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; @@ -824,46 +825,48 @@ export async function runReplyAgent(params: { sessionEntry?.contextTokens ?? DEFAULT_CONTEXT_TOKENS; - if (sessionStore && sessionKey) { + if (storePath && sessionKey) { if (hasNonzeroUsage(usage)) { - const entry = sessionEntry ?? sessionStore[sessionKey]; - if (entry) { - const input = usage.input ?? 0; - const output = usage.output ?? 0; - const promptTokens = - input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); - const nextEntry = { - ...entry, - inputTokens: input, - outputTokens: output, - totalTokens: - promptTokens > 0 ? promptTokens : (usage.total ?? input), - modelProvider: providerUsed, - model: modelUsed, - contextTokens: contextTokensUsed ?? entry.contextTokens, - updatedAt: Date.now(), - }; - if (cliSessionId) { - nextEntry.claudeCliSessionId = cliSessionId; - } - sessionStore[sessionKey] = nextEntry; - if (storePath) { - await saveSessionStore(storePath, sessionStore); - } + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async (entry) => { + const input = usage.input ?? 0; + const output = usage.output ?? 0; + const promptTokens = + input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); + return { + inputTokens: input, + outputTokens: output, + totalTokens: + promptTokens > 0 ? promptTokens : (usage.total ?? input), + modelProvider: providerUsed, + model: modelUsed, + contextTokens: contextTokensUsed ?? entry.contextTokens, + updatedAt: Date.now(), + claudeCliSessionId: cliSessionId ?? entry.claudeCliSessionId, + }; + }, + }); + } catch (err) { + logVerbose(`failed to persist usage update: ${String(err)}`); } } else if (modelUsed || contextTokensUsed) { - const entry = sessionEntry ?? sessionStore[sessionKey]; - if (entry) { - sessionStore[sessionKey] = { - ...entry, - modelProvider: providerUsed ?? entry.modelProvider, - model: modelUsed ?? entry.model, - contextTokens: contextTokensUsed ?? entry.contextTokens, - claudeCliSessionId: cliSessionId ?? entry.claudeCliSessionId, - }; - if (storePath) { - await saveSessionStore(storePath, sessionStore); - } + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async (entry) => ({ + modelProvider: providerUsed ?? entry.modelProvider, + model: modelUsed ?? entry.model, + contextTokens: contextTokensUsed ?? entry.contextTokens, + claudeCliSessionId: cliSessionId ?? entry.claudeCliSessionId, + updatedAt: Date.now(), + }), + }); + } catch (err) { + logVerbose(`failed to persist model/context update: ${String(err)}`); } } } diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 324494bc9..99c3455bc 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -4,7 +4,10 @@ import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { runWithModelFallback } from "../../agents/model-fallback.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import { hasNonzeroUsage } from "../../agents/usage.js"; -import { type SessionEntry, saveSessionStore } from "../../config/sessions.js"; +import { + type SessionEntry, + updateSessionStoreEntry, +} from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { logVerbose } from "../../globals.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; @@ -232,7 +235,7 @@ export function createFollowupRunner(params: { } } - if (sessionStore && sessionKey) { + if (storePath && sessionKey) { const usage = runResult.meta.agentMeta?.usage; const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; @@ -243,39 +246,48 @@ export function createFollowupRunner(params: { DEFAULT_CONTEXT_TOKENS; if (hasNonzeroUsage(usage)) { - const entry = sessionStore[sessionKey]; - if (entry) { - const input = usage.input ?? 0; - const output = usage.output ?? 0; - const promptTokens = - input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); - sessionStore[sessionKey] = { - ...entry, - inputTokens: input, - outputTokens: output, - totalTokens: - promptTokens > 0 ? promptTokens : (usage.total ?? input), - modelProvider: fallbackProvider ?? entry.modelProvider, - model: modelUsed, - contextTokens: contextTokensUsed ?? entry.contextTokens, - updatedAt: Date.now(), - }; - if (storePath) { - await saveSessionStore(storePath, sessionStore); - } + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async (entry) => { + const input = usage.input ?? 0; + const output = usage.output ?? 0; + const promptTokens = + input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); + return { + inputTokens: input, + outputTokens: output, + totalTokens: + promptTokens > 0 ? promptTokens : (usage.total ?? input), + modelProvider: fallbackProvider ?? entry.modelProvider, + model: modelUsed, + contextTokens: contextTokensUsed ?? entry.contextTokens, + updatedAt: Date.now(), + }; + }, + }); + } catch (err) { + logVerbose( + `failed to persist followup usage update: ${String(err)}`, + ); } } else if (modelUsed || contextTokensUsed) { - const entry = sessionStore[sessionKey]; - if (entry) { - sessionStore[sessionKey] = { - ...entry, - modelProvider: fallbackProvider ?? entry.modelProvider, - model: modelUsed ?? entry.model, - contextTokens: contextTokensUsed ?? entry.contextTokens, - }; - if (storePath) { - await saveSessionStore(storePath, sessionStore); - } + try { + await updateSessionStoreEntry({ + storePath, + sessionKey, + update: async (entry) => ({ + modelProvider: fallbackProvider ?? entry.modelProvider, + model: modelUsed ?? entry.model, + contextTokens: contextTokensUsed ?? entry.contextTokens, + updatedAt: Date.now(), + }), + }); + } catch (err) { + logVerbose( + `failed to persist followup model/context update: ${String(err)}`, + ); } } } diff --git a/src/auto-reply/reply/mentions.ts b/src/auto-reply/reply/mentions.ts index 1ef890f3b..63366001e 100644 --- a/src/auto-reply/reply/mentions.ts +++ b/src/auto-reply/reply/mentions.ts @@ -23,6 +23,8 @@ function deriveMentionPatterns(identity?: { name?: string; emoji?: string }) { const BACKSPACE_CHAR = "\u0008"; +export const CURRENT_MESSAGE_MARKER = "[Current message - respond to this]"; + function normalizeMentionPattern(pattern: string): string { if (!pattern.includes(BACKSPACE_CHAR)) return pattern; return pattern.split(BACKSPACE_CHAR).join("\\b"); @@ -87,13 +89,18 @@ export function matchesMentionPatterns( export function stripStructuralPrefixes(text: string): string { // Ignore wrapper labels, timestamps, and sender prefixes so directive-only // detection still works in group batches that include history/context. - const marker = "[Current message - respond to this]"; - const afterMarker = text.includes(marker) - ? text.slice(text.indexOf(marker) + marker.length) + const afterMarker = text.includes(CURRENT_MESSAGE_MARKER) + ? text + .slice( + text.indexOf(CURRENT_MESSAGE_MARKER) + CURRENT_MESSAGE_MARKER.length, + ) + .trimStart() : text; + return afterMarker .replace(/\[[^\]]+\]\s*/g, "") .replace(/^[ \t]*[A-Za-z0-9+()\-_. ]+:\s*/gm, "") + .replace(/\\n/g, " ") .replace(/\s+/g, " ") .trim(); } @@ -105,9 +112,9 @@ export function stripMentions( agentId?: string, ): string { let result = text; - const patterns = normalizeMentionPatterns( - resolveMentionPatterns(cfg, agentId), - ); + const rawPatterns = resolveMentionPatterns(cfg, agentId); + const patterns = normalizeMentionPatterns(rawPatterns); + for (const p of patterns) { try { const re = new RegExp(p, "gi"); diff --git a/src/auto-reply/reply/session.test.ts b/src/auto-reply/reply/session.test.ts index abcfe6996..6fe748e20 100644 --- a/src/auto-reply/reply/session.test.ts +++ b/src/auto-reply/reply/session.test.ts @@ -110,3 +110,71 @@ describe("initSessionState thread forking", () => { ); }); }); + +describe("initSessionState RawBody", () => { + it("triggerBodyNormalized correctly extracts commands when Body contains context but RawBody is clean", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-rawbody-")); + const storePath = path.join(root, "sessions.json"); + const cfg = { session: { store: storePath } } as ClawdbotConfig; + + const groupMessageCtx = { + Body: `[Chat messages since your last reply - for context]\n[WhatsApp ...] Someone: hello\n\n[Current message - respond to this]\n[WhatsApp ...] Jake: /status\n[from: Jake McInteer (+6421807830)]`, + RawBody: "/status", + ChatType: "group", + SessionKey: "agent:main:whatsapp:group:G1", + }; + + const result = await initSessionState({ + ctx: groupMessageCtx, + cfg, + commandAuthorized: true, + }); + + expect(result.triggerBodyNormalized).toBe("/status"); + }); + + it("Reset triggers (/new, /reset) work with RawBody", async () => { + const root = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdbot-rawbody-reset-"), + ); + const storePath = path.join(root, "sessions.json"); + const cfg = { session: { store: storePath } } as ClawdbotConfig; + + const groupMessageCtx = { + Body: `[Context]\nJake: /new\n[from: Jake]`, + RawBody: "/new", + ChatType: "group", + SessionKey: "agent:main:whatsapp:group:G1", + }; + + const result = await initSessionState({ + ctx: groupMessageCtx, + cfg, + commandAuthorized: true, + }); + + expect(result.isNewSession).toBe(true); + expect(result.bodyStripped).toBe(""); + }); + + it("falls back to Body when RawBody is undefined", async () => { + const root = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdbot-rawbody-fallback-"), + ); + const storePath = path.join(root, "sessions.json"); + const cfg = { session: { store: storePath } } as ClawdbotConfig; + + const ctx = { + Body: "/status", + SessionKey: "agent:main:whatsapp:dm:S1", + }; + + const result = await initSessionState({ + ctx, + cfg, + commandAuthorized: true, + }); + + expect(result.triggerBodyNormalized).toBe("/status"); + }); +}); diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 8bc0e2d40..fb769a6c3 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -136,11 +136,15 @@ export async function initSessionState(params: { resolveGroupSessionKey(sessionCtxForState) ?? undefined; const isGroup = ctx.ChatType?.trim().toLowerCase() === "group" || Boolean(groupResolution); - const triggerBodyNormalized = stripStructuralPrefixes(ctx.Body ?? "") + // Prefer RawBody (clean message) for command detection; fall back to Body + // which may contain structural context (history, sender labels). + const commandSource = ctx.RawBody ?? ctx.Body ?? ""; + const triggerBodyNormalized = stripStructuralPrefixes(commandSource) .trim() .toLowerCase(); - const rawBody = ctx.Body ?? ""; + // Use RawBody for reset trigger matching (clean message without structural context). + const rawBody = ctx.RawBody ?? ctx.Body ?? ""; const trimmedBody = rawBody.trim(); const resetAuthorized = resolveCommandAuthorization({ ctx, @@ -284,7 +288,9 @@ export async function initSessionState(params: { const sessionCtx: TemplateContext = { ...ctx, - BodyStripped: bodyStripped ?? ctx.Body, + // Keep BodyStripped aligned with Body (best default for agent prompts). + // RawBody is reserved for command/directive parsing and may omit context. + BodyStripped: bodyStripped ?? ctx.Body ?? ctx.RawBody, SessionId: sessionId, IsNewSession: isNewSession ? "true" : "false", }; diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index 3e1212e0e..4c5a7cf91 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -11,6 +11,11 @@ export type OriginatingChannelType = export type MsgContext = { Body?: string; + /** + * Raw message body without structural context (history, sender labels). + * Used for command detection. Falls back to Body if not set. + */ + RawBody?: string; From?: string; To?: string; SessionKey?: string; diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index f13b3649f..4863c3dd7 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -11,6 +11,7 @@ import { resolveSessionTranscriptPath, resolveSessionTranscriptsDir, updateLastRoute, + updateSessionStoreEntry, } from "./sessions.js"; describe("sessions", () => { @@ -187,4 +188,52 @@ describe("sessions", () => { } } }); + + it("updateSessionStoreEntry merges concurrent patches", async () => { + const mainSessionKey = "agent:main:main"; + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-")); + const storePath = path.join(dir, "sessions.json"); + await fs.writeFile( + storePath, + JSON.stringify( + { + [mainSessionKey]: { + sessionId: "sess-1", + updatedAt: 123, + thinkingLevel: "low", + }, + }, + null, + 2, + ), + "utf-8", + ); + + const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + await Promise.all([ + updateSessionStoreEntry({ + storePath, + sessionKey: mainSessionKey, + update: async () => { + await sleep(50); + return { modelOverride: "anthropic/claude-opus-4-5" }; + }, + }), + updateSessionStoreEntry({ + storePath, + sessionKey: mainSessionKey, + update: async () => { + await sleep(10); + return { thinkingLevel: "high" }; + }, + }), + ]); + + const store = loadSessionStore(storePath); + expect(store[mainSessionKey]?.modelOverride).toBe( + "anthropic/claude-opus-4-5", + ); + expect(store[mainSessionKey]?.thinkingLevel).toBe("high"); + await expect(fs.stat(`${storePath}.lock`)).rejects.toThrow(); + }); }); diff --git a/src/config/sessions.ts b/src/config/sessions.ts index d94e10dd1..ae3970e88 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -139,11 +139,14 @@ export function mergeSessionEntry( ): SessionEntry { const sessionId = patch.sessionId ?? existing?.sessionId ?? crypto.randomUUID(); - const updatedAt = patch.updatedAt ?? existing?.updatedAt ?? Date.now(); + const updatedAt = Math.max( + existing?.updatedAt ?? 0, + patch.updatedAt ?? 0, + Date.now(), + ); if (!existing) return { ...patch, sessionId, updatedAt }; return { ...existing, ...patch, sessionId, updatedAt }; } - export type GroupKeyResolution = { key: string; legacyKey?: string; @@ -487,6 +490,92 @@ export async function saveSessionStore( } } +type SessionStoreLockOptions = { + timeoutMs?: number; + pollIntervalMs?: number; + staleMs?: number; +}; + +async function withSessionStoreLock( + storePath: string, + fn: () => Promise, + opts: SessionStoreLockOptions = {}, +): Promise { + const timeoutMs = opts.timeoutMs ?? 10_000; + const pollIntervalMs = opts.pollIntervalMs ?? 25; + const staleMs = opts.staleMs ?? 30_000; + const lockPath = `${storePath}.lock`; + const startedAt = Date.now(); + + await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); + + while (true) { + try { + const handle = await fs.promises.open(lockPath, "wx"); + try { + await handle.writeFile( + JSON.stringify({ pid: process.pid, startedAt: Date.now() }), + "utf-8", + ); + } catch { + // best-effort + } + await handle.close(); + break; + } catch (err) { + const code = + err && typeof err === "object" && "code" in err + ? String((err as { code?: unknown }).code) + : null; + if (code !== "EEXIST") throw err; + + const now = Date.now(); + if (now - startedAt > timeoutMs) { + throw new Error(`timeout acquiring session store lock: ${lockPath}`); + } + + // Best-effort stale lock eviction (e.g. crashed process). + try { + const st = await fs.promises.stat(lockPath); + const ageMs = now - st.mtimeMs; + if (ageMs > staleMs) { + await fs.promises.unlink(lockPath); + continue; + } + } catch { + // ignore + } + + await new Promise((r) => setTimeout(r, pollIntervalMs)); + } + } + + try { + return await fn(); + } finally { + await fs.promises.unlink(lockPath).catch(() => undefined); + } +} + +export async function updateSessionStoreEntry(params: { + storePath: string; + sessionKey: string; + update: (entry: SessionEntry) => Promise | null>; +}): Promise { + const { storePath, sessionKey, update } = params; + return await withSessionStoreLock(storePath, async () => { + const store = loadSessionStore(storePath); + const existing = store[sessionKey]; + if (!existing) return null; + const patch = await update(existing); + if (!patch) return existing; + const next = mergeSessionEntry(existing, patch); + store[sessionKey] = next; + await saveSessionStore(storePath, store); + return next; + }); +} + export async function updateLastRoute(params: { storePath: string; sessionKey: string; diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index dc418c71f..9a99dd54f 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -1089,6 +1089,7 @@ export function createDiscordMessageHandler(params: { }); const ctxPayload = { Body: combinedBody, + RawBody: baseText, From: isDirectMessage ? `discord:${author.id}` : `group:${message.channelId}`, diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 3873a067a..47c8d8e0e 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1221,6 +1221,7 @@ export async function monitorWebProvider( const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ ctx: { Body: combinedBody, + RawBody: msg.body, From: msg.from, To: msg.to, SessionKey: route.sessionKey,