fix(auto-reply): RawBody commands + locked session updates (#643)

This commit is contained in:
Peter Steinberger
2026-01-10 17:32:19 +01:00
parent e2ea02160d
commit e3cd431551
17 changed files with 566 additions and 89 deletions

View File

@@ -7,6 +7,7 @@
- Agents: add human-delay pacing between block replies (modes: off/natural/custom, per-agent configurable). (#446) — thanks @tony-freedomology.
### Fixes
- Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj.
- Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”).
- Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage.
- Onboarding/Gateway: persist non-interactive gateway token auth in config; add WS wizard + gateway tool-calling regression coverage.

View File

@@ -1629,11 +1629,11 @@ public struct ChatSendParams: Codable, Sendable {
public struct ChatAbortParams: Codable, Sendable {
public let sessionkey: String
public let runid: String
public let runid: String?
public init(
sessionkey: String,
runid: String
runid: String?
) {
self.sessionkey = sessionkey
self.runid = runid

View File

@@ -2098,6 +2098,7 @@ Template placeholders are expanded in `audio.transcription.command` (and any fut
| Variable | Description |
|----------|-------------|
| `{{Body}}` | Full inbound message body |
| `{{RawBody}}` | Raw inbound message body (no history/sender wrappers; best for command parsing) |
| `{{BodyStripped}}` | Body with group mentions stripped (best default for agents) |
| `{{From}}` | Sender identifier (E.164 for WhatsApp; may differ per provider) |
| `{{To}}` | Destination identifier |

View File

@@ -0,0 +1,154 @@
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { getReplyFromConfig } from "./reply.js";
vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: vi.fn(),
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
resolveEmbeddedSessionLane: (key: string) =>
`session:${key.trim() || "main"}`,
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
}));
vi.mock("../agents/model-catalog.js", () => ({
loadModelCatalog: vi.fn(),
}));
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
return withTempHomeBase(
async (home) => {
return await fn(home);
},
{
env: {
CLAWDBOT_AGENT_DIR: (home) => path.join(home, ".clawdbot", "agent"),
PI_CODING_AGENT_DIR: (home) => path.join(home, ".clawdbot", "agent"),
},
prefix: "clawdbot-rawbody-",
},
);
}
describe("RawBody directive parsing", () => {
beforeEach(() => {
vi.mocked(runEmbeddedPiAgent).mockReset();
vi.mocked(loadModelCatalog).mockResolvedValue([
{ id: "claude-opus-4-5", name: "Opus 4.5", provider: "anthropic" },
]);
});
afterEach(() => {
vi.restoreAllMocks();
});
it("/model, /think, /verbose directives detected from RawBody even when Body has structural wrapper", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockReset();
const groupMessageCtx = {
Body: `[Chat messages since your last reply - for context]\\n[WhatsApp ...] Someone: hello\\n\\n[Current message - respond to this]\\n[WhatsApp ...] Jake: /think:high\\n[from: Jake McInteer (+6421807830)]`,
RawBody: "/think:high",
From: "+1222",
To: "+1222",
ChatType: "group",
};
const res = await getReplyFromConfig(
groupMessageCtx,
{},
{
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
},
whatsapp: { allowFrom: ["*"] },
session: { store: path.join(home, "sessions.json") },
},
);
const text = Array.isArray(res) ? res[0]?.text : res?.text;
expect(text).toContain("Thinking level set to high.");
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
});
});
it("/model status detected from RawBody", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockReset();
const groupMessageCtx = {
Body: `[Context]\nJake: /model status\n[from: Jake]`,
RawBody: "/model status",
From: "+1222",
To: "+1222",
ChatType: "group",
};
const res = await getReplyFromConfig(
groupMessageCtx,
{},
{
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
models: {
"anthropic/claude-opus-4-5": {},
},
},
},
whatsapp: { allowFrom: ["*"] },
session: { store: path.join(home, "sessions.json") },
},
);
const text = Array.isArray(res) ? res[0]?.text : res?.text;
expect(text).toContain("anthropic/claude-opus-4-5");
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
});
});
it("Integration: WhatsApp group message with structural wrapper and RawBody command", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockReset();
const groupMessageCtx = {
Body: `[Chat messages since your last reply - for context]\\n[WhatsApp ...] Someone: hello\\n\\n[Current message - respond to this]\\n[WhatsApp ...] Jake: /status\\n[from: Jake McInteer (+6421807830)]`,
RawBody: "/status",
ChatType: "group",
From: "+1222",
To: "+1222",
SessionKey: "agent:main:whatsapp:group:G1",
Provider: "whatsapp",
Surface: "whatsapp",
SenderE164: "+1222",
};
const res = await getReplyFromConfig(
groupMessageCtx,
{},
{
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
},
whatsapp: { allowFrom: ["+1222"] },
session: { store: path.join(home, "sessions.json") },
},
);
const text = Array.isArray(res) ? res[0]?.text : res?.text;
expect(text).toContain("Session: agent:main:whatsapp:group:G1");
expect(text).toContain("anthropic/claude-opus-4-5");
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
});
});
});

View File

@@ -60,7 +60,11 @@ import {
defaultGroupActivation,
resolveGroupRequireMention,
} from "./reply/groups.js";
import { stripMentions, stripStructuralPrefixes } from "./reply/mentions.js";
import {
CURRENT_MESSAGE_MARKER,
stripMentions,
stripStructuralPrefixes,
} from "./reply/mentions.js";
import {
createModelSelectionState,
resolveContextTokens,
@@ -336,7 +340,10 @@ export async function getReplyFromConfig(
triggerBodyNormalized,
} = sessionState;
const rawBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? "";
// Prefer RawBody (clean message without structural context) for directive parsing.
// Keep `Body`/`BodyStripped` as the best-available prompt text (may include context).
const rawBody =
sessionCtx.RawBody ?? sessionCtx.BodyStripped ?? sessionCtx.Body ?? "";
const clearInlineDirectives = (cleaned: string): InlineDirectives => ({
cleaned,
hasThinkDirective: false,
@@ -426,8 +433,37 @@ export async function getReplyFromConfig(
hasQueueDirective: false,
queueReset: false,
};
sessionCtx.Body = parsedDirectives.cleaned;
sessionCtx.BodyStripped = parsedDirectives.cleaned;
const existingBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? "";
const cleanedBody = (() => {
if (!existingBody) return parsedDirectives.cleaned;
if (!sessionCtx.RawBody) {
return parseInlineDirectives(existingBody, {
modelAliases: configuredAliases,
}).cleaned;
}
const markerIndex = existingBody.indexOf(CURRENT_MESSAGE_MARKER);
if (markerIndex < 0) {
return parseInlineDirectives(existingBody, {
modelAliases: configuredAliases,
}).cleaned;
}
const head = existingBody.slice(
0,
markerIndex + CURRENT_MESSAGE_MARKER.length,
);
const tail = existingBody.slice(
markerIndex + CURRENT_MESSAGE_MARKER.length,
);
const cleanedTail = parseInlineDirectives(tail, {
modelAliases: configuredAliases,
}).cleaned;
return `${head}${cleanedTail}`;
})();
sessionCtx.Body = cleanedBody;
sessionCtx.BodyStripped = cleanedBody;
const messageProviderKey =
sessionCtx.Provider?.trim().toLowerCase() ??
@@ -750,7 +786,8 @@ export async function getReplyFromConfig(
.filter(Boolean)
.join("\n\n");
const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? "";
const rawBodyTrimmed = (ctx.Body ?? "").trim();
// Use RawBody for bare reset detection (clean message without structural context).
const rawBodyTrimmed = (ctx.RawBody ?? ctx.Body ?? "").trim();
const baseBodyTrimmedRaw = baseBody.trim();
if (
allowTextCommands &&

View File

@@ -0,0 +1,42 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import type { ClawdbotConfig } from "../../config/config.js";
import { isAbortTrigger } from "./abort.js";
import { initSessionState } from "./session.js";
describe("abort detection", () => {
it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-"));
const storePath = path.join(root, "sessions.json");
const cfg = { session: { store: storePath } } as ClawdbotConfig;
const groupMessageCtx = {
Body: `[Context]\nJake: /stop\n[from: Jake]`,
RawBody: "/stop",
ChatType: "group",
SessionKey: "agent:main:whatsapp:group:G1",
};
const result = await initSessionState({
ctx: groupMessageCtx,
cfg,
commandAuthorized: true,
});
// /stop is detected via exact match in handleAbort, not isAbortTrigger
expect(result.triggerBodyNormalized).toBe("/stop");
});
it("isAbortTrigger matches bare word triggers (without slash)", () => {
expect(isAbortTrigger("stop")).toBe(true);
expect(isAbortTrigger("esc")).toBe(true);
expect(isAbortTrigger("abort")).toBe(true);
expect(isAbortTrigger("wait")).toBe(true);
expect(isAbortTrigger("exit")).toBe(true);
expect(isAbortTrigger("hello")).toBe(false);
// /stop is NOT matched by isAbortTrigger - it's handled separately
expect(isAbortTrigger("/stop")).toBe(false);
});
});

View File

@@ -81,7 +81,8 @@ export async function tryFastAbortFromMessage(params: {
sessionKey: targetKey ?? ctx.SessionKey ?? "",
config: cfg,
});
const raw = stripStructuralPrefixes(ctx.Body ?? "");
// Use RawBody for abort detection (clean message without structural context).
const raw = stripStructuralPrefixes(ctx.RawBody ?? ctx.Body ?? "");
const isGroup = ctx.ChatType?.trim().toLowerCase() === "group";
const stripped = isGroup ? stripMentions(raw, ctx, cfg, agentId) : raw;
const normalized = normalizeCommandBody(stripped);

View File

@@ -15,6 +15,7 @@ import {
resolveSessionTranscriptPath,
type SessionEntry,
saveSessionStore,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
@@ -824,46 +825,48 @@ export async function runReplyAgent(params: {
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
if (sessionStore && sessionKey) {
if (storePath && sessionKey) {
if (hasNonzeroUsage(usage)) {
const entry = sessionEntry ?? sessionStore[sessionKey];
if (entry) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
const nextEntry = {
...entry,
inputTokens: input,
outputTokens: output,
totalTokens:
promptTokens > 0 ? promptTokens : (usage.total ?? input),
modelProvider: providerUsed,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
};
if (cliSessionId) {
nextEntry.claudeCliSessionId = cliSessionId;
}
sessionStore[sessionKey] = nextEntry;
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
return {
inputTokens: input,
outputTokens: output,
totalTokens:
promptTokens > 0 ? promptTokens : (usage.total ?? input),
modelProvider: providerUsed,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
claudeCliSessionId: cliSessionId ?? entry.claudeCliSessionId,
};
},
});
} catch (err) {
logVerbose(`failed to persist usage update: ${String(err)}`);
}
} else if (modelUsed || contextTokensUsed) {
const entry = sessionEntry ?? sessionStore[sessionKey];
if (entry) {
sessionStore[sessionKey] = {
...entry,
modelProvider: providerUsed ?? entry.modelProvider,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
claudeCliSessionId: cliSessionId ?? entry.claudeCliSessionId,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => ({
modelProvider: providerUsed ?? entry.modelProvider,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
claudeCliSessionId: cliSessionId ?? entry.claudeCliSessionId,
updatedAt: Date.now(),
}),
});
} catch (err) {
logVerbose(`failed to persist model/context update: ${String(err)}`);
}
}
}

View File

@@ -4,7 +4,10 @@ import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { runWithModelFallback } from "../../agents/model-fallback.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import {
type SessionEntry,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
@@ -232,7 +235,7 @@ export function createFollowupRunner(params: {
}
}
if (sessionStore && sessionKey) {
if (storePath && sessionKey) {
const usage = runResult.meta.agentMeta?.usage;
const modelUsed =
runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
@@ -243,39 +246,48 @@ export function createFollowupRunner(params: {
DEFAULT_CONTEXT_TOKENS;
if (hasNonzeroUsage(usage)) {
const entry = sessionStore[sessionKey];
if (entry) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
sessionStore[sessionKey] = {
...entry,
inputTokens: input,
outputTokens: output,
totalTokens:
promptTokens > 0 ? promptTokens : (usage.total ?? input),
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
return {
inputTokens: input,
outputTokens: output,
totalTokens:
promptTokens > 0 ? promptTokens : (usage.total ?? input),
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
};
},
});
} catch (err) {
logVerbose(
`failed to persist followup usage update: ${String(err)}`,
);
}
} else if (modelUsed || contextTokensUsed) {
const entry = sessionStore[sessionKey];
if (entry) {
sessionStore[sessionKey] = {
...entry,
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async (entry) => ({
modelProvider: fallbackProvider ?? entry.modelProvider,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
}),
});
} catch (err) {
logVerbose(
`failed to persist followup model/context update: ${String(err)}`,
);
}
}
}

View File

@@ -23,6 +23,8 @@ function deriveMentionPatterns(identity?: { name?: string; emoji?: string }) {
const BACKSPACE_CHAR = "\u0008";
export const CURRENT_MESSAGE_MARKER = "[Current message - respond to this]";
function normalizeMentionPattern(pattern: string): string {
if (!pattern.includes(BACKSPACE_CHAR)) return pattern;
return pattern.split(BACKSPACE_CHAR).join("\\b");
@@ -87,13 +89,18 @@ export function matchesMentionPatterns(
export function stripStructuralPrefixes(text: string): string {
// Ignore wrapper labels, timestamps, and sender prefixes so directive-only
// detection still works in group batches that include history/context.
const marker = "[Current message - respond to this]";
const afterMarker = text.includes(marker)
? text.slice(text.indexOf(marker) + marker.length)
const afterMarker = text.includes(CURRENT_MESSAGE_MARKER)
? text
.slice(
text.indexOf(CURRENT_MESSAGE_MARKER) + CURRENT_MESSAGE_MARKER.length,
)
.trimStart()
: text;
return afterMarker
.replace(/\[[^\]]+\]\s*/g, "")
.replace(/^[ \t]*[A-Za-z0-9+()\-_. ]+:\s*/gm, "")
.replace(/\\n/g, " ")
.replace(/\s+/g, " ")
.trim();
}
@@ -105,9 +112,9 @@ export function stripMentions(
agentId?: string,
): string {
let result = text;
const patterns = normalizeMentionPatterns(
resolveMentionPatterns(cfg, agentId),
);
const rawPatterns = resolveMentionPatterns(cfg, agentId);
const patterns = normalizeMentionPatterns(rawPatterns);
for (const p of patterns) {
try {
const re = new RegExp(p, "gi");

View File

@@ -110,3 +110,71 @@ describe("initSessionState thread forking", () => {
);
});
});
describe("initSessionState RawBody", () => {
it("triggerBodyNormalized correctly extracts commands when Body contains context but RawBody is clean", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-rawbody-"));
const storePath = path.join(root, "sessions.json");
const cfg = { session: { store: storePath } } as ClawdbotConfig;
const groupMessageCtx = {
Body: `[Chat messages since your last reply - for context]\n[WhatsApp ...] Someone: hello\n\n[Current message - respond to this]\n[WhatsApp ...] Jake: /status\n[from: Jake McInteer (+6421807830)]`,
RawBody: "/status",
ChatType: "group",
SessionKey: "agent:main:whatsapp:group:G1",
};
const result = await initSessionState({
ctx: groupMessageCtx,
cfg,
commandAuthorized: true,
});
expect(result.triggerBodyNormalized).toBe("/status");
});
it("Reset triggers (/new, /reset) work with RawBody", async () => {
const root = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdbot-rawbody-reset-"),
);
const storePath = path.join(root, "sessions.json");
const cfg = { session: { store: storePath } } as ClawdbotConfig;
const groupMessageCtx = {
Body: `[Context]\nJake: /new\n[from: Jake]`,
RawBody: "/new",
ChatType: "group",
SessionKey: "agent:main:whatsapp:group:G1",
};
const result = await initSessionState({
ctx: groupMessageCtx,
cfg,
commandAuthorized: true,
});
expect(result.isNewSession).toBe(true);
expect(result.bodyStripped).toBe("");
});
it("falls back to Body when RawBody is undefined", async () => {
const root = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdbot-rawbody-fallback-"),
);
const storePath = path.join(root, "sessions.json");
const cfg = { session: { store: storePath } } as ClawdbotConfig;
const ctx = {
Body: "/status",
SessionKey: "agent:main:whatsapp:dm:S1",
};
const result = await initSessionState({
ctx,
cfg,
commandAuthorized: true,
});
expect(result.triggerBodyNormalized).toBe("/status");
});
});

View File

@@ -136,11 +136,15 @@ export async function initSessionState(params: {
resolveGroupSessionKey(sessionCtxForState) ?? undefined;
const isGroup =
ctx.ChatType?.trim().toLowerCase() === "group" || Boolean(groupResolution);
const triggerBodyNormalized = stripStructuralPrefixes(ctx.Body ?? "")
// Prefer RawBody (clean message) for command detection; fall back to Body
// which may contain structural context (history, sender labels).
const commandSource = ctx.RawBody ?? ctx.Body ?? "";
const triggerBodyNormalized = stripStructuralPrefixes(commandSource)
.trim()
.toLowerCase();
const rawBody = ctx.Body ?? "";
// Use RawBody for reset trigger matching (clean message without structural context).
const rawBody = ctx.RawBody ?? ctx.Body ?? "";
const trimmedBody = rawBody.trim();
const resetAuthorized = resolveCommandAuthorization({
ctx,
@@ -284,7 +288,9 @@ export async function initSessionState(params: {
const sessionCtx: TemplateContext = {
...ctx,
BodyStripped: bodyStripped ?? ctx.Body,
// Keep BodyStripped aligned with Body (best default for agent prompts).
// RawBody is reserved for command/directive parsing and may omit context.
BodyStripped: bodyStripped ?? ctx.Body ?? ctx.RawBody,
SessionId: sessionId,
IsNewSession: isNewSession ? "true" : "false",
};

View File

@@ -11,6 +11,11 @@ export type OriginatingChannelType =
export type MsgContext = {
Body?: string;
/**
* Raw message body without structural context (history, sender labels).
* Used for command detection. Falls back to Body if not set.
*/
RawBody?: string;
From?: string;
To?: string;
SessionKey?: string;

View File

@@ -11,6 +11,7 @@ import {
resolveSessionTranscriptPath,
resolveSessionTranscriptsDir,
updateLastRoute,
updateSessionStoreEntry,
} from "./sessions.js";
describe("sessions", () => {
@@ -187,4 +188,52 @@ describe("sessions", () => {
}
}
});
it("updateSessionStoreEntry merges concurrent patches", async () => {
const mainSessionKey = "agent:main:main";
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(
storePath,
JSON.stringify(
{
[mainSessionKey]: {
sessionId: "sess-1",
updatedAt: 123,
thinkingLevel: "low",
},
},
null,
2,
),
"utf-8",
);
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
await Promise.all([
updateSessionStoreEntry({
storePath,
sessionKey: mainSessionKey,
update: async () => {
await sleep(50);
return { modelOverride: "anthropic/claude-opus-4-5" };
},
}),
updateSessionStoreEntry({
storePath,
sessionKey: mainSessionKey,
update: async () => {
await sleep(10);
return { thinkingLevel: "high" };
},
}),
]);
const store = loadSessionStore(storePath);
expect(store[mainSessionKey]?.modelOverride).toBe(
"anthropic/claude-opus-4-5",
);
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
await expect(fs.stat(`${storePath}.lock`)).rejects.toThrow();
});
});

View File

@@ -139,11 +139,14 @@ export function mergeSessionEntry(
): SessionEntry {
const sessionId =
patch.sessionId ?? existing?.sessionId ?? crypto.randomUUID();
const updatedAt = patch.updatedAt ?? existing?.updatedAt ?? Date.now();
const updatedAt = Math.max(
existing?.updatedAt ?? 0,
patch.updatedAt ?? 0,
Date.now(),
);
if (!existing) return { ...patch, sessionId, updatedAt };
return { ...existing, ...patch, sessionId, updatedAt };
}
export type GroupKeyResolution = {
key: string;
legacyKey?: string;
@@ -487,6 +490,92 @@ export async function saveSessionStore(
}
}
type SessionStoreLockOptions = {
timeoutMs?: number;
pollIntervalMs?: number;
staleMs?: number;
};
async function withSessionStoreLock<T>(
storePath: string,
fn: () => Promise<T>,
opts: SessionStoreLockOptions = {},
): Promise<T> {
const timeoutMs = opts.timeoutMs ?? 10_000;
const pollIntervalMs = opts.pollIntervalMs ?? 25;
const staleMs = opts.staleMs ?? 30_000;
const lockPath = `${storePath}.lock`;
const startedAt = Date.now();
await fs.promises.mkdir(path.dirname(storePath), { recursive: true });
while (true) {
try {
const handle = await fs.promises.open(lockPath, "wx");
try {
await handle.writeFile(
JSON.stringify({ pid: process.pid, startedAt: Date.now() }),
"utf-8",
);
} catch {
// best-effort
}
await handle.close();
break;
} catch (err) {
const code =
err && typeof err === "object" && "code" in err
? String((err as { code?: unknown }).code)
: null;
if (code !== "EEXIST") throw err;
const now = Date.now();
if (now - startedAt > timeoutMs) {
throw new Error(`timeout acquiring session store lock: ${lockPath}`);
}
// Best-effort stale lock eviction (e.g. crashed process).
try {
const st = await fs.promises.stat(lockPath);
const ageMs = now - st.mtimeMs;
if (ageMs > staleMs) {
await fs.promises.unlink(lockPath);
continue;
}
} catch {
// ignore
}
await new Promise((r) => setTimeout(r, pollIntervalMs));
}
}
try {
return await fn();
} finally {
await fs.promises.unlink(lockPath).catch(() => undefined);
}
}
export async function updateSessionStoreEntry(params: {
storePath: string;
sessionKey: string;
update: (entry: SessionEntry) => Promise<Partial<SessionEntry> | null>;
}): Promise<SessionEntry | null> {
const { storePath, sessionKey, update } = params;
return await withSessionStoreLock(storePath, async () => {
const store = loadSessionStore(storePath);
const existing = store[sessionKey];
if (!existing) return null;
const patch = await update(existing);
if (!patch) return existing;
const next = mergeSessionEntry(existing, patch);
store[sessionKey] = next;
await saveSessionStore(storePath, store);
return next;
});
}
export async function updateLastRoute(params: {
storePath: string;
sessionKey: string;

View File

@@ -1089,6 +1089,7 @@ export function createDiscordMessageHandler(params: {
});
const ctxPayload = {
Body: combinedBody,
RawBody: baseText,
From: isDirectMessage
? `discord:${author.id}`
: `group:${message.channelId}`,

View File

@@ -1221,6 +1221,7 @@ export async function monitorWebProvider(
const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
ctx: {
Body: combinedBody,
RawBody: msg.body,
From: msg.from,
To: msg.to,
SessionKey: route.sessionKey,