fix: lock main session deletion
This commit is contained in:
@@ -98,6 +98,11 @@ type EmbeddedPiQueueHandle = {
|
||||
const log = createSubsystemLogger("agent/embedded");
|
||||
|
||||
const ACTIVE_EMBEDDED_RUNS = new Map<string, EmbeddedPiQueueHandle>();
|
||||
type EmbeddedRunWaiter = {
|
||||
resolve: (ended: boolean) => void;
|
||||
timer: NodeJS.Timeout;
|
||||
};
|
||||
const EMBEDDED_RUN_WAITERS = new Map<string, Set<EmbeddedRunWaiter>>();
|
||||
|
||||
const OAUTH_FILENAME = "oauth.json";
|
||||
const DEFAULT_OAUTH_DIR = path.join(CONFIG_DIR, "credentials");
|
||||
@@ -247,6 +252,43 @@ export function isEmbeddedPiRunStreaming(sessionId: string): boolean {
|
||||
return handle.isStreaming();
|
||||
}
|
||||
|
||||
export function waitForEmbeddedPiRunEnd(
|
||||
sessionId: string,
|
||||
timeoutMs = 15_000,
|
||||
): Promise<boolean> {
|
||||
if (!sessionId || !ACTIVE_EMBEDDED_RUNS.has(sessionId))
|
||||
return Promise.resolve(true);
|
||||
return new Promise((resolve) => {
|
||||
const waiters = EMBEDDED_RUN_WAITERS.get(sessionId) ?? new Set();
|
||||
const waiter: EmbeddedRunWaiter = {
|
||||
resolve,
|
||||
timer: setTimeout(() => {
|
||||
waiters.delete(waiter);
|
||||
if (waiters.size === 0) EMBEDDED_RUN_WAITERS.delete(sessionId);
|
||||
resolve(false);
|
||||
}, Math.max(100, timeoutMs)),
|
||||
};
|
||||
waiters.add(waiter);
|
||||
EMBEDDED_RUN_WAITERS.set(sessionId, waiters);
|
||||
if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
|
||||
waiters.delete(waiter);
|
||||
if (waiters.size === 0) EMBEDDED_RUN_WAITERS.delete(sessionId);
|
||||
clearTimeout(waiter.timer);
|
||||
resolve(true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function notifyEmbeddedRunEnded(sessionId: string) {
|
||||
const waiters = EMBEDDED_RUN_WAITERS.get(sessionId);
|
||||
if (!waiters || waiters.size === 0) return;
|
||||
EMBEDDED_RUN_WAITERS.delete(sessionId);
|
||||
for (const waiter of waiters) {
|
||||
clearTimeout(waiter.timer);
|
||||
waiter.resolve(true);
|
||||
}
|
||||
}
|
||||
|
||||
export function resolveEmbeddedSessionLane(key: string) {
|
||||
return resolveSessionLane(key);
|
||||
}
|
||||
@@ -602,6 +644,7 @@ export async function runEmbeddedPiAgent(params: {
|
||||
unsubscribe();
|
||||
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
|
||||
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
|
||||
notifyEmbeddedRunEnded(params.sessionId);
|
||||
}
|
||||
session.dispose();
|
||||
params.abortSignal?.removeEventListener?.("abort", onAbort);
|
||||
|
||||
@@ -8,6 +8,7 @@ export {
|
||||
isEmbeddedPiRunActive,
|
||||
isEmbeddedPiRunStreaming,
|
||||
queueEmbeddedPiMessage,
|
||||
waitForEmbeddedPiRunEnd,
|
||||
resolveEmbeddedSessionLane,
|
||||
runEmbeddedPiAgent,
|
||||
} from "./pi-embedded-runner.js";
|
||||
|
||||
@@ -14,7 +14,7 @@ export function registerTuiCli(program: Command) {
|
||||
.option("--password <password>", "Gateway password (if required)")
|
||||
.option(
|
||||
"--session <key>",
|
||||
"Session key (default: session.mainKey from config)",
|
||||
'Session key (default: "main", or "global" when scope is global)',
|
||||
)
|
||||
.option("--deliver", "Deliver assistant replies", false)
|
||||
.option("--thinking <level>", "Thinking level override")
|
||||
|
||||
@@ -15,6 +15,7 @@ import { parseDurationMs } from "../cli/parse-duration.js";
|
||||
* - Config is managed externally (read-only from Nix perspective)
|
||||
*/
|
||||
export const isNixMode = process.env.CLAWDIS_NIX_MODE === "1";
|
||||
let warnedMainKeyOverride = false;
|
||||
|
||||
export type ReplyMode = "text" | "command";
|
||||
export type SessionScope = "per-sender" | "global";
|
||||
@@ -1771,6 +1772,22 @@ function applyIdentityDefaults(cfg: ClawdisConfig): ClawdisConfig {
|
||||
return mutated ? next : cfg;
|
||||
}
|
||||
|
||||
function applySessionDefaults(cfg: ClawdisConfig): ClawdisConfig {
|
||||
const session = cfg.session;
|
||||
if (!session || session.mainKey === undefined) return cfg;
|
||||
|
||||
const trimmed = session.mainKey.trim();
|
||||
const next: ClawdisConfig = { ...cfg, session: { ...session } };
|
||||
next.session.mainKey = "main";
|
||||
|
||||
if (trimmed && trimmed !== "main" && !warnedMainKeyOverride) {
|
||||
warnedMainKeyOverride = true;
|
||||
console.warn('session.mainKey is ignored; main session is always "main".');
|
||||
}
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
export function loadConfig(): ClawdisConfig {
|
||||
// Read config file (JSON5) if present.
|
||||
const configPath = CONFIG_PATH_CLAWDIS;
|
||||
@@ -1787,7 +1804,9 @@ export function loadConfig(): ClawdisConfig {
|
||||
}
|
||||
return {};
|
||||
}
|
||||
return applyIdentityDefaults(validated.data as ClawdisConfig);
|
||||
return applySessionDefaults(
|
||||
applyIdentityDefaults(validated.data as ClawdisConfig),
|
||||
);
|
||||
} catch (err) {
|
||||
console.error(`Failed to read config at ${configPath}`, err);
|
||||
return {};
|
||||
@@ -1821,7 +1840,9 @@ export function validateConfigObject(
|
||||
}
|
||||
return {
|
||||
ok: true,
|
||||
config: applyIdentityDefaults(validated.data as ClawdisConfig),
|
||||
config: applySessionDefaults(
|
||||
applyIdentityDefaults(validated.data as ClawdisConfig),
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1880,7 +1901,7 @@ export async function readConfigFileSnapshot(): Promise<ConfigFileSnapshot> {
|
||||
const configPath = CONFIG_PATH_CLAWDIS;
|
||||
const exists = fs.existsSync(configPath);
|
||||
if (!exists) {
|
||||
const config = applyTalkApiKey({});
|
||||
const config = applyTalkApiKey(applySessionDefaults({}));
|
||||
const legacyIssues: LegacyConfigIssue[] = [];
|
||||
return {
|
||||
path: configPath,
|
||||
@@ -1934,7 +1955,7 @@ export async function readConfigFileSnapshot(): Promise<ConfigFileSnapshot> {
|
||||
raw,
|
||||
parsed: parsedRes.parsed,
|
||||
valid: true,
|
||||
config: applyTalkApiKey(validated.config),
|
||||
config: applyTalkApiKey(applySessionDefaults(validated.config)),
|
||||
issues: [],
|
||||
legacyIssues,
|
||||
};
|
||||
|
||||
@@ -103,6 +103,13 @@ export function resolveStorePath(store?: string) {
|
||||
return path.resolve(store);
|
||||
}
|
||||
|
||||
export function resolveMainSessionKey(
|
||||
cfg?: { session?: { scope?: SessionScope; mainKey?: string } },
|
||||
): string {
|
||||
if (cfg?.session?.scope === "global") return "global";
|
||||
return "main";
|
||||
}
|
||||
|
||||
function normalizeGroupLabel(raw?: string) {
|
||||
const trimmed = raw?.trim().toLowerCase() ?? "";
|
||||
if (!trimmed) return "";
|
||||
|
||||
@@ -2,6 +2,12 @@ import { randomUUID } from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js";
|
||||
import type { ModelCatalogEntry } from "../agents/model-catalog.js";
|
||||
import {
|
||||
abortEmbeddedPiRun,
|
||||
isEmbeddedPiRunActive,
|
||||
resolveEmbeddedSessionLane,
|
||||
waitForEmbeddedPiRunEnd,
|
||||
} from "../agents/pi-embedded.js";
|
||||
import {
|
||||
buildAllowedModelSet,
|
||||
buildModelAliasIndex,
|
||||
@@ -29,6 +35,7 @@ import {
|
||||
import { buildConfigSchema } from "../config/schema.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveMainSessionKey,
|
||||
resolveStorePath,
|
||||
type SessionEntry,
|
||||
saveSessionStore,
|
||||
@@ -38,6 +45,7 @@ import {
|
||||
setVoiceWakeTriggers,
|
||||
} from "../infra/voicewake.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { clearCommandLane } from "../process/command-queue.js";
|
||||
import { normalizeSendPolicy } from "../sessions/send-policy.js";
|
||||
import { buildMessageWithAttachments } from "./chat-attachments.js";
|
||||
import {
|
||||
@@ -569,12 +577,37 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
|
||||
};
|
||||
}
|
||||
|
||||
const mainKey = resolveMainSessionKey(loadConfig());
|
||||
if (key === mainKey) {
|
||||
return {
|
||||
ok: false,
|
||||
error: {
|
||||
code: ErrorCodes.INVALID_REQUEST,
|
||||
message: `Cannot delete the main session (${mainKey}).`,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const deleteTranscript =
|
||||
typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true;
|
||||
|
||||
const { storePath, store, entry } = loadSessionEntry(key);
|
||||
const sessionId = entry?.sessionId;
|
||||
const existed = Boolean(store[key]);
|
||||
clearCommandLane(resolveEmbeddedSessionLane(key));
|
||||
if (sessionId && isEmbeddedPiRunActive(sessionId)) {
|
||||
abortEmbeddedPiRun(sessionId);
|
||||
const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000);
|
||||
if (!ended) {
|
||||
return {
|
||||
ok: false,
|
||||
error: {
|
||||
code: ErrorCodes.UNAVAILABLE,
|
||||
message: `Session ${key} is still active; try again in a moment.`,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
if (existed) delete store[key];
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import { describe, expect, test } from "vitest";
|
||||
import {
|
||||
connectOk,
|
||||
installGatewayTestHooks,
|
||||
embeddedRunMock,
|
||||
piSdkMock,
|
||||
rpcReq,
|
||||
startServerWithClient,
|
||||
@@ -217,4 +218,59 @@ describe("gateway server sessions", () => {
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("sessions.delete rejects main and aborts active runs", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-sessions-"));
|
||||
const storePath = path.join(dir, "sessions.json");
|
||||
testState.sessionStorePath = storePath;
|
||||
|
||||
await fs.writeFile(
|
||||
path.join(dir, "sess-main.jsonl"),
|
||||
`${JSON.stringify({ role: "user", content: "hello" })}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
await fs.writeFile(
|
||||
path.join(dir, "sess-active.jsonl"),
|
||||
`${JSON.stringify({ role: "user", content: "active" })}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
main: { sessionId: "sess-main", updatedAt: Date.now() },
|
||||
"discord:group:dev": {
|
||||
sessionId: "sess-active",
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
embeddedRunMock.activeIds.add("sess-active");
|
||||
embeddedRunMock.waitResults.set("sess-active", true);
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const mainDelete = await rpcReq(ws, "sessions.delete", { key: "main" });
|
||||
expect(mainDelete.ok).toBe(false);
|
||||
|
||||
const deleted = await rpcReq<{ ok: true; deleted: boolean }>(
|
||||
ws,
|
||||
"sessions.delete",
|
||||
{ key: "discord:group:dev" },
|
||||
);
|
||||
expect(deleted.ok).toBe(true);
|
||||
expect(deleted.payload?.deleted).toBe(true);
|
||||
expect(embeddedRunMock.abortCalls).toEqual(["sess-active"]);
|
||||
expect(embeddedRunMock.waitCalls).toEqual(["sess-active"]);
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -39,7 +39,7 @@ export type BridgeStartOpts = {
|
||||
>;
|
||||
};
|
||||
|
||||
const hoisted = vi.hoisted(() => ({
|
||||
const hoisted = vi.hoisted(() => ({
|
||||
bridgeStartCalls: [] as BridgeStartOpts[],
|
||||
bridgeInvoke: vi.fn(async () => ({
|
||||
type: "invoke-res",
|
||||
@@ -66,6 +66,12 @@ const hoisted = vi.hoisted(() => ({
|
||||
agentCommand: vi.fn().mockResolvedValue(undefined),
|
||||
testIsNixMode: { value: false },
|
||||
sessionStoreSaveDelayMs: { value: 0 },
|
||||
embeddedRunMock: {
|
||||
activeIds: new Set<string>(),
|
||||
abortCalls: [] as string[],
|
||||
waitCalls: [] as string[],
|
||||
waitResults: new Map<string, boolean>(),
|
||||
},
|
||||
}));
|
||||
|
||||
export const bridgeStartCalls = hoisted.bridgeStartCalls;
|
||||
@@ -95,6 +101,7 @@ export const testState = {
|
||||
|
||||
export const testIsNixMode = hoisted.testIsNixMode;
|
||||
export const sessionStoreSaveDelayMs = hoisted.sessionStoreSaveDelayMs;
|
||||
export const embeddedRunMock = hoisted.embeddedRunMock;
|
||||
|
||||
vi.mock("@mariozechner/pi-coding-agent", async () => {
|
||||
const actual = await vi.importActual<
|
||||
@@ -284,6 +291,25 @@ vi.mock("../config/config.js", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../agents/pi-embedded.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../agents/pi-embedded.js")>(
|
||||
"../agents/pi-embedded.js",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
isEmbeddedPiRunActive: (sessionId: string) =>
|
||||
embeddedRunMock.activeIds.has(sessionId),
|
||||
abortEmbeddedPiRun: (sessionId: string) => {
|
||||
embeddedRunMock.abortCalls.push(sessionId);
|
||||
return embeddedRunMock.activeIds.has(sessionId);
|
||||
},
|
||||
waitForEmbeddedPiRunEnd: async (sessionId: string) => {
|
||||
embeddedRunMock.waitCalls.push(sessionId);
|
||||
return embeddedRunMock.waitResults.get(sessionId) ?? true;
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../commands/health.js", () => ({
|
||||
getHealthSnapshot: vi.fn().mockResolvedValue({ ok: true, stub: true }),
|
||||
}));
|
||||
@@ -329,6 +355,10 @@ export function installGatewayTestHooks() {
|
||||
testIsNixMode.value = false;
|
||||
cronIsolatedRun.mockClear();
|
||||
agentCommand.mockClear();
|
||||
embeddedRunMock.activeIds.clear();
|
||||
embeddedRunMock.abortCalls = [];
|
||||
embeddedRunMock.waitCalls = [];
|
||||
embeddedRunMock.waitResults.clear();
|
||||
drainSystemEvents();
|
||||
resetAgentRunContextForTest();
|
||||
const mod = await import("./server.js");
|
||||
|
||||
Reference in New Issue
Block a user