From 7d89fa2591067ec1cb912a12b1f918a326e270f1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 13 Dec 2025 16:32:42 +0000 Subject: [PATCH] feat(gateway): add sessions list/patch RPC --- .../ClawdisProtocol/GatewayModels.swift | 46 ++++ dist/protocol.schema.json | 55 ++++ src/gateway/protocol/index.ts | 14 + src/gateway/protocol/schema.ts | 23 ++ src/gateway/server.test.ts | 96 +++++++ src/gateway/server.ts | 255 +++++++++++++++++- 6 files changed, 488 insertions(+), 1 deletion(-) diff --git a/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift b/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift index a066e032c..9d66643c5 100644 --- a/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift +++ b/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift @@ -478,6 +478,52 @@ public struct NodePairVerifyParams: Codable { } } +public struct SessionsListParams: Codable { + public let limit: Int? + public let activeminutes: Int? + public let includeglobal: Bool? + public let includeunknown: Bool? + + public init( + limit: Int?, + activeminutes: Int?, + includeglobal: Bool?, + includeunknown: Bool? + ) { + self.limit = limit + self.activeminutes = activeminutes + self.includeglobal = includeglobal + self.includeunknown = includeunknown + } + private enum CodingKeys: String, CodingKey { + case limit + case activeminutes = "activeMinutes" + case includeglobal = "includeGlobal" + case includeunknown = "includeUnknown" + } +} + +public struct SessionsPatchParams: Codable { + public let key: String + public let thinkinglevel: AnyCodable? + public let verboselevel: AnyCodable? + + public init( + key: String, + thinkinglevel: AnyCodable?, + verboselevel: AnyCodable? + ) { + self.key = key + self.thinkinglevel = thinkinglevel + self.verboselevel = verboselevel + } + private enum CodingKeys: String, CodingKey { + case key + case thinkinglevel = "thinkingLevel" + case verboselevel = "verboseLevel" + } +} + public struct CronJob: Codable { public let id: String public let name: String? diff --git a/dist/protocol.schema.json b/dist/protocol.schema.json index 614012275..9a2eb0361 100644 --- a/dist/protocol.schema.json +++ b/dist/protocol.schema.json @@ -906,6 +906,61 @@ "token" ] }, + "SessionsListParams": { + "additionalProperties": false, + "type": "object", + "properties": { + "limit": { + "minimum": 1, + "type": "integer" + }, + "activeMinutes": { + "minimum": 1, + "type": "integer" + }, + "includeGlobal": { + "type": "boolean" + }, + "includeUnknown": { + "type": "boolean" + } + } + }, + "SessionsPatchParams": { + "additionalProperties": false, + "type": "object", + "properties": { + "key": { + "minLength": 1, + "type": "string" + }, + "thinkingLevel": { + "anyOf": [ + { + "minLength": 1, + "type": "string" + }, + { + "type": "null" + } + ] + }, + "verboseLevel": { + "anyOf": [ + { + "minLength": 1, + "type": "string" + }, + { + "type": "null" + } + ] + } + }, + "required": [ + "key" + ] + }, "CronJob": { "additionalProperties": false, "type": "object", diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index edb541278..45aa74439 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -55,6 +55,10 @@ import { type ResponseFrame, ResponseFrameSchema, SendParamsSchema, + type SessionsListParams, + SessionsListParamsSchema, + type SessionsPatchParams, + SessionsPatchParamsSchema, type ShutdownEvent, ShutdownEventSchema, type Snapshot, @@ -99,6 +103,12 @@ export const validateNodePairRejectParams = ajv.compile( export const validateNodePairVerifyParams = ajv.compile( NodePairVerifyParamsSchema, ); +export const validateSessionsListParams = ajv.compile( + SessionsListParamsSchema, +); +export const validateSessionsPatchParams = ajv.compile( + SessionsPatchParamsSchema, +); export const validateCronListParams = ajv.compile(CronListParamsSchema); export const validateCronStatusParams = ajv.compile( @@ -148,6 +158,8 @@ export { NodePairApproveParamsSchema, NodePairRejectParamsSchema, NodePairVerifyParamsSchema, + SessionsListParamsSchema, + SessionsPatchParamsSchema, CronJobSchema, CronListParamsSchema, CronStatusParamsSchema, @@ -187,6 +199,8 @@ export type { NodePairApproveParams, NodePairRejectParams, NodePairVerifyParams, + SessionsListParams, + SessionsPatchParams, CronJob, CronListParams, CronStatusParams, diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index 0de15e99f..4590223cb 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -242,6 +242,25 @@ export const NodePairVerifyParamsSchema = Type.Object( { additionalProperties: false }, ); +export const SessionsListParamsSchema = Type.Object( + { + limit: Type.Optional(Type.Integer({ minimum: 1 })), + activeMinutes: Type.Optional(Type.Integer({ minimum: 1 })), + includeGlobal: Type.Optional(Type.Boolean()), + includeUnknown: Type.Optional(Type.Boolean()), + }, + { additionalProperties: false }, +); + +export const SessionsPatchParamsSchema = Type.Object( + { + key: NonEmptyString, + thinkingLevel: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), + verboseLevel: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), + }, + { additionalProperties: false }, +); + export const CronScheduleSchema = Type.Union([ Type.Object( { @@ -477,6 +496,8 @@ export const ProtocolSchemas: Record = { NodePairApproveParams: NodePairApproveParamsSchema, NodePairRejectParams: NodePairRejectParamsSchema, NodePairVerifyParams: NodePairVerifyParamsSchema, + SessionsListParams: SessionsListParamsSchema, + SessionsPatchParams: SessionsPatchParamsSchema, CronJob: CronJobSchema, CronListParams: CronListParamsSchema, CronStatusParams: CronStatusParamsSchema, @@ -512,6 +533,8 @@ export type NodePairListParams = Static; export type NodePairApproveParams = Static; export type NodePairRejectParams = Static; export type NodePairVerifyParams = Static; +export type SessionsListParams = Static; +export type SessionsPatchParams = Static; export type CronJob = Static; export type CronListParams = Static; export type CronStatusParams = Static; diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 4e4ca8161..181e6e7b2 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -202,6 +202,22 @@ async function connectOk( return res.payload as { type: "hello-ok" }; } +async function rpcReq( + ws: WebSocket, + method: string, + params?: unknown, +) { + const id = randomUUID(); + ws.send(JSON.stringify({ type: "req", id, method, params })); + return await onceMessage<{ + type: "res"; + id: string; + ok: boolean; + payload?: T; + error?: { message?: string }; + }>(ws, (o) => o.type === "res" && o.id === id); +} + describe("gateway server", () => { test("supports gateway-owned node pairing methods and events", async () => { const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-")); @@ -1654,6 +1670,86 @@ describe("gateway server", () => { await server.close(); }); + test("lists and patches session store via sessions.* RPC", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-sessions-")); + const storePath = path.join(dir, "sessions.json"); + const now = Date.now(); + testSessionStorePath = storePath; + + await fs.writeFile( + storePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: now - 60_000, + inputTokens: 10, + outputTokens: 20, + thinkingLevel: "low", + verboseLevel: "on", + }, + "group:dev": { + sessionId: "sess-group", + updatedAt: now - 120_000, + totalTokens: 50, + }, + global: { + sessionId: "sess-global", + updatedAt: now - 10_000, + }, + }, + null, + 2, + ), + "utf-8", + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const list1 = await rpcReq<{ + path: string; + sessions: Array<{ + key: string; + totalTokens?: number; + thinkingLevel?: string; + verboseLevel?: string; + }>; + }>(ws, "sessions.list", { includeGlobal: false, includeUnknown: false }); + + expect(list1.ok).toBe(true); + expect(list1.payload?.path).toBe(storePath); + expect(list1.payload?.sessions.some((s) => s.key === "global")).toBe(false); + const main = list1.payload?.sessions.find((s) => s.key === "main"); + expect(main?.totalTokens).toBe(30); + expect(main?.thinkingLevel).toBe("low"); + expect(main?.verboseLevel).toBe("on"); + + const patched = await rpcReq<{ ok: true; key: string }>( + ws, + "sessions.patch", + { key: "main", thinkingLevel: "medium", verboseLevel: null }, + ); + expect(patched.ok).toBe(true); + expect(patched.payload?.ok).toBe(true); + expect(patched.payload?.key).toBe("main"); + + const list2 = await rpcReq<{ + sessions: Array<{ + key: string; + thinkingLevel?: string; + verboseLevel?: string; + }>; + }>(ws, "sessions.list", {}); + expect(list2.ok).toBe(true); + const main2 = list2.payload?.sessions.find((s) => s.key === "main"); + expect(main2?.thinkingLevel).toBe("medium"); + expect(main2?.verboseLevel).toBeUndefined(); + + ws.close(); + await server.close(); + }); + test("refuses to start when port already bound", async () => { const { server: blocker, port } = await occupyPort(); await expect(startGatewayServer(port)).rejects.toBeInstanceOf( diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 12cb8fbf7..d1d8cca9d 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -8,6 +8,12 @@ import os from "node:os"; import path from "node:path"; import chalk from "chalk"; import { type WebSocket, WebSocketServer } from "ws"; +import { lookupContextTokens } from "../agents/context.js"; +import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL } from "../agents/defaults.js"; +import { + normalizeThinkLevel, + normalizeVerboseLevel, +} from "../auto-reply/thinking.js"; import { startBrowserControlServerFromConfig, stopBrowserControlServer, @@ -16,7 +22,7 @@ import { createDefaultDeps } from "../cli/deps.js"; import { agentCommand } from "../commands/agent.js"; import { getHealthSnapshot, type HealthSummary } from "../commands/health.js"; import { getStatusSummary } from "../commands/status.js"; -import { loadConfig } from "../config/config.js"; +import { type ClawdisConfig, loadConfig } from "../config/config.js"; import { loadSessionStore, resolveStorePath, @@ -79,6 +85,8 @@ import { formatValidationErrors, PROTOCOL_VERSION, type RequestFrame, + type SessionsListParams, + type SessionsPatchParams, type Snapshot, validateAgentParams, validateChatHistoryParams, @@ -98,6 +106,8 @@ import { validateNodePairVerifyParams, validateRequestFrame, validateSendParams, + validateSessionsListParams, + validateSessionsPatchParams, validateWakeParams, } from "./protocol/index.js"; @@ -108,9 +118,48 @@ type Client = { presenceKey?: string; }; +type GatewaySessionsDefaults = { + model: string | null; + contextTokens: number | null; +}; + +type GatewaySessionRow = { + key: string; + kind: "direct" | "group" | "global" | "unknown"; + updatedAt: number | null; + sessionId?: string; + systemSent?: boolean; + abortedLastRun?: boolean; + thinkingLevel?: string; + verboseLevel?: string; + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + model?: string; + contextTokens?: number; + syncing?: boolean | string; +}; + +type SessionsListResult = { + ts: number; + path: string; + count: number; + defaults: GatewaySessionsDefaults; + sessions: GatewaySessionRow[]; +}; + +type SessionsPatchResult = { + ok: true; + path: string; + key: string; + entry: SessionEntry; +}; + const METHODS = [ "health", "status", + "sessions.list", + "sessions.patch", "last-heartbeat", "set-heartbeats", "wake", @@ -279,6 +328,88 @@ function loadSessionEntry(sessionKey: string) { return { cfg, storePath, store, entry }; } +function classifySessionKey(key: string): GatewaySessionRow["kind"] { + if (key === "global") return "global"; + if (key.startsWith("group:")) return "group"; + if (key === "unknown") return "unknown"; + return "direct"; +} + +function getSessionDefaults(cfg: ClawdisConfig): GatewaySessionsDefaults { + const model = cfg.inbound?.reply?.agent?.model ?? DEFAULT_MODEL; + const contextTokens = + cfg.inbound?.reply?.agent?.contextTokens ?? + lookupContextTokens(model) ?? + DEFAULT_CONTEXT_TOKENS; + return { model: model ?? null, contextTokens: contextTokens ?? null }; +} + +function listSessionsFromStore(params: { + cfg: ClawdisConfig; + storePath: string; + store: Record; + opts: SessionsListParams; +}): SessionsListResult { + const { cfg, storePath, store, opts } = params; + const now = Date.now(); + + const includeGlobal = opts.includeGlobal === true; + const includeUnknown = opts.includeUnknown === true; + const activeMinutes = + typeof opts.activeMinutes === "number" && + Number.isFinite(opts.activeMinutes) + ? Math.max(1, Math.floor(opts.activeMinutes)) + : undefined; + + let sessions = Object.entries(store) + .filter(([key]) => { + if (!includeGlobal && key === "global") return false; + if (!includeUnknown && key === "unknown") return false; + return true; + }) + .map(([key, entry]) => { + const updatedAt = entry?.updatedAt ?? null; + const input = entry?.inputTokens ?? 0; + const output = entry?.outputTokens ?? 0; + const total = entry?.totalTokens ?? input + output; + return { + key, + kind: classifySessionKey(key), + updatedAt, + sessionId: entry?.sessionId, + systemSent: entry?.systemSent, + abortedLastRun: entry?.abortedLastRun, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + inputTokens: entry?.inputTokens, + outputTokens: entry?.outputTokens, + totalTokens: total, + model: entry?.model, + contextTokens: entry?.contextTokens, + syncing: entry?.syncing, + } satisfies GatewaySessionRow; + }) + .sort((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0)); + + if (activeMinutes !== undefined) { + const cutoff = now - activeMinutes * 60_000; + sessions = sessions.filter((s) => (s.updatedAt ?? 0) >= cutoff); + } + + if (typeof opts.limit === "number" && Number.isFinite(opts.limit)) { + const limit = Math.max(1, Math.floor(opts.limit)); + sessions = sessions.slice(0, limit); + } + + return { + ts: now, + path: storePath, + count: sessions.length, + defaults: getSessionDefaults(cfg), + sessions, + }; +} + function logWs( direction: "in" | "out", kind: string, @@ -1508,6 +1639,128 @@ export async function startGatewayServer( respond(true, status, undefined); break; } + case "sessions.list": { + const params = (req.params ?? {}) as Record; + if (!validateSessionsListParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`, + ), + ); + break; + } + const p = params as SessionsListParams; + const cfg = loadConfig(); + const storePath = resolveStorePath( + cfg.inbound?.reply?.session?.store, + ); + const store = loadSessionStore(storePath); + const result = listSessionsFromStore({ + cfg, + storePath, + store, + opts: p, + }); + respond(true, result, undefined); + break; + } + case "sessions.patch": { + const params = (req.params ?? {}) as Record; + if (!validateSessionsPatchParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`, + ), + ); + break; + } + const p = params as SessionsPatchParams; + const key = String(p.key ?? "").trim(); + if (!key) { + respond( + false, + undefined, + errorShape(ErrorCodes.INVALID_REQUEST, "key required"), + ); + break; + } + + const cfg = loadConfig(); + const storePath = resolveStorePath( + cfg.inbound?.reply?.session?.store, + ); + const store = loadSessionStore(storePath); + const now = Date.now(); + + const existing = store[key]; + const next: SessionEntry = existing + ? { + ...existing, + updatedAt: Math.max(existing.updatedAt ?? 0, now), + } + : { sessionId: randomUUID(), updatedAt: now }; + + if ("thinkingLevel" in p) { + const raw = p.thinkingLevel; + if (raw === null) { + delete next.thinkingLevel; + } else if (raw !== undefined) { + const normalized = normalizeThinkLevel(String(raw)); + if (!normalized) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "invalid thinkingLevel (use off|minimal|low|medium|high)", + ), + ); + break; + } + if (normalized === "off") delete next.thinkingLevel; + else next.thinkingLevel = normalized; + } + } + + if ("verboseLevel" in p) { + const raw = p.verboseLevel; + if (raw === null) { + delete next.verboseLevel; + } else if (raw !== undefined) { + const normalized = normalizeVerboseLevel(String(raw)); + if (!normalized) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + 'invalid verboseLevel (use "on"|"off")', + ), + ); + break; + } + if (normalized === "off") delete next.verboseLevel; + else next.verboseLevel = normalized; + } + } + + store[key] = next; + await saveSessionStore(storePath, store); + const result: SessionsPatchResult = { + ok: true, + path: storePath, + key, + entry: next, + }; + respond(true, result, undefined); + break; + } case "last-heartbeat": { respond(true, getLastHeartbeatEvent(), undefined); break;