feat(gateway): add sessions list/patch RPC

This commit is contained in:
Peter Steinberger
2025-12-13 16:32:42 +00:00
parent 5f67c023a2
commit 7d89fa2591
6 changed files with 488 additions and 1 deletions

View File

@@ -478,6 +478,52 @@ public struct NodePairVerifyParams: Codable {
}
}
public struct SessionsListParams: Codable {
public let limit: Int?
public let activeminutes: Int?
public let includeglobal: Bool?
public let includeunknown: Bool?
public init(
limit: Int?,
activeminutes: Int?,
includeglobal: Bool?,
includeunknown: Bool?
) {
self.limit = limit
self.activeminutes = activeminutes
self.includeglobal = includeglobal
self.includeunknown = includeunknown
}
private enum CodingKeys: String, CodingKey {
case limit
case activeminutes = "activeMinutes"
case includeglobal = "includeGlobal"
case includeunknown = "includeUnknown"
}
}
public struct SessionsPatchParams: Codable {
public let key: String
public let thinkinglevel: AnyCodable?
public let verboselevel: AnyCodable?
public init(
key: String,
thinkinglevel: AnyCodable?,
verboselevel: AnyCodable?
) {
self.key = key
self.thinkinglevel = thinkinglevel
self.verboselevel = verboselevel
}
private enum CodingKeys: String, CodingKey {
case key
case thinkinglevel = "thinkingLevel"
case verboselevel = "verboseLevel"
}
}
public struct CronJob: Codable {
public let id: String
public let name: String?

View File

@@ -906,6 +906,61 @@
"token"
]
},
"SessionsListParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"limit": {
"minimum": 1,
"type": "integer"
},
"activeMinutes": {
"minimum": 1,
"type": "integer"
},
"includeGlobal": {
"type": "boolean"
},
"includeUnknown": {
"type": "boolean"
}
}
},
"SessionsPatchParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"key": {
"minLength": 1,
"type": "string"
},
"thinkingLevel": {
"anyOf": [
{
"minLength": 1,
"type": "string"
},
{
"type": "null"
}
]
},
"verboseLevel": {
"anyOf": [
{
"minLength": 1,
"type": "string"
},
{
"type": "null"
}
]
}
},
"required": [
"key"
]
},
"CronJob": {
"additionalProperties": false,
"type": "object",

View File

@@ -55,6 +55,10 @@ import {
type ResponseFrame,
ResponseFrameSchema,
SendParamsSchema,
type SessionsListParams,
SessionsListParamsSchema,
type SessionsPatchParams,
SessionsPatchParamsSchema,
type ShutdownEvent,
ShutdownEventSchema,
type Snapshot,
@@ -99,6 +103,12 @@ export const validateNodePairRejectParams = ajv.compile<NodePairRejectParams>(
export const validateNodePairVerifyParams = ajv.compile<NodePairVerifyParams>(
NodePairVerifyParamsSchema,
);
export const validateSessionsListParams = ajv.compile<SessionsListParams>(
SessionsListParamsSchema,
);
export const validateSessionsPatchParams = ajv.compile<SessionsPatchParams>(
SessionsPatchParamsSchema,
);
export const validateCronListParams =
ajv.compile<CronListParams>(CronListParamsSchema);
export const validateCronStatusParams = ajv.compile<CronStatusParams>(
@@ -148,6 +158,8 @@ export {
NodePairApproveParamsSchema,
NodePairRejectParamsSchema,
NodePairVerifyParamsSchema,
SessionsListParamsSchema,
SessionsPatchParamsSchema,
CronJobSchema,
CronListParamsSchema,
CronStatusParamsSchema,
@@ -187,6 +199,8 @@ export type {
NodePairApproveParams,
NodePairRejectParams,
NodePairVerifyParams,
SessionsListParams,
SessionsPatchParams,
CronJob,
CronListParams,
CronStatusParams,

View File

@@ -242,6 +242,25 @@ export const NodePairVerifyParamsSchema = Type.Object(
{ additionalProperties: false },
);
export const SessionsListParamsSchema = Type.Object(
{
limit: Type.Optional(Type.Integer({ minimum: 1 })),
activeMinutes: Type.Optional(Type.Integer({ minimum: 1 })),
includeGlobal: Type.Optional(Type.Boolean()),
includeUnknown: Type.Optional(Type.Boolean()),
},
{ additionalProperties: false },
);
export const SessionsPatchParamsSchema = Type.Object(
{
key: NonEmptyString,
thinkingLevel: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
verboseLevel: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
},
{ additionalProperties: false },
);
export const CronScheduleSchema = Type.Union([
Type.Object(
{
@@ -477,6 +496,8 @@ export const ProtocolSchemas: Record<string, TSchema> = {
NodePairApproveParams: NodePairApproveParamsSchema,
NodePairRejectParams: NodePairRejectParamsSchema,
NodePairVerifyParams: NodePairVerifyParamsSchema,
SessionsListParams: SessionsListParamsSchema,
SessionsPatchParams: SessionsPatchParamsSchema,
CronJob: CronJobSchema,
CronListParams: CronListParamsSchema,
CronStatusParams: CronStatusParamsSchema,
@@ -512,6 +533,8 @@ export type NodePairListParams = Static<typeof NodePairListParamsSchema>;
export type NodePairApproveParams = Static<typeof NodePairApproveParamsSchema>;
export type NodePairRejectParams = Static<typeof NodePairRejectParamsSchema>;
export type NodePairVerifyParams = Static<typeof NodePairVerifyParamsSchema>;
export type SessionsListParams = Static<typeof SessionsListParamsSchema>;
export type SessionsPatchParams = Static<typeof SessionsPatchParamsSchema>;
export type CronJob = Static<typeof CronJobSchema>;
export type CronListParams = Static<typeof CronListParamsSchema>;
export type CronStatusParams = Static<typeof CronStatusParamsSchema>;

View File

@@ -202,6 +202,22 @@ async function connectOk(
return res.payload as { type: "hello-ok" };
}
async function rpcReq<T = unknown>(
ws: WebSocket,
method: string,
params?: unknown,
) {
const id = randomUUID();
ws.send(JSON.stringify({ type: "req", id, method, params }));
return await onceMessage<{
type: "res";
id: string;
ok: boolean;
payload?: T;
error?: { message?: string };
}>(ws, (o) => o.type === "res" && o.id === id);
}
describe("gateway server", () => {
test("supports gateway-owned node pairing methods and events", async () => {
const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-"));
@@ -1654,6 +1670,86 @@ describe("gateway server", () => {
await server.close();
});
test("lists and patches session store via sessions.* RPC", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-sessions-"));
const storePath = path.join(dir, "sessions.json");
const now = Date.now();
testSessionStorePath = storePath;
await fs.writeFile(
storePath,
JSON.stringify(
{
main: {
sessionId: "sess-main",
updatedAt: now - 60_000,
inputTokens: 10,
outputTokens: 20,
thinkingLevel: "low",
verboseLevel: "on",
},
"group:dev": {
sessionId: "sess-group",
updatedAt: now - 120_000,
totalTokens: 50,
},
global: {
sessionId: "sess-global",
updatedAt: now - 10_000,
},
},
null,
2,
),
"utf-8",
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const list1 = await rpcReq<{
path: string;
sessions: Array<{
key: string;
totalTokens?: number;
thinkingLevel?: string;
verboseLevel?: string;
}>;
}>(ws, "sessions.list", { includeGlobal: false, includeUnknown: false });
expect(list1.ok).toBe(true);
expect(list1.payload?.path).toBe(storePath);
expect(list1.payload?.sessions.some((s) => s.key === "global")).toBe(false);
const main = list1.payload?.sessions.find((s) => s.key === "main");
expect(main?.totalTokens).toBe(30);
expect(main?.thinkingLevel).toBe("low");
expect(main?.verboseLevel).toBe("on");
const patched = await rpcReq<{ ok: true; key: string }>(
ws,
"sessions.patch",
{ key: "main", thinkingLevel: "medium", verboseLevel: null },
);
expect(patched.ok).toBe(true);
expect(patched.payload?.ok).toBe(true);
expect(patched.payload?.key).toBe("main");
const list2 = await rpcReq<{
sessions: Array<{
key: string;
thinkingLevel?: string;
verboseLevel?: string;
}>;
}>(ws, "sessions.list", {});
expect(list2.ok).toBe(true);
const main2 = list2.payload?.sessions.find((s) => s.key === "main");
expect(main2?.thinkingLevel).toBe("medium");
expect(main2?.verboseLevel).toBeUndefined();
ws.close();
await server.close();
});
test("refuses to start when port already bound", async () => {
const { server: blocker, port } = await occupyPort();
await expect(startGatewayServer(port)).rejects.toBeInstanceOf(

View File

@@ -8,6 +8,12 @@ import os from "node:os";
import path from "node:path";
import chalk from "chalk";
import { type WebSocket, WebSocketServer } from "ws";
import { lookupContextTokens } from "../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL } from "../agents/defaults.js";
import {
normalizeThinkLevel,
normalizeVerboseLevel,
} from "../auto-reply/thinking.js";
import {
startBrowserControlServerFromConfig,
stopBrowserControlServer,
@@ -16,7 +22,7 @@ import { createDefaultDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { getHealthSnapshot, type HealthSummary } from "../commands/health.js";
import { getStatusSummary } from "../commands/status.js";
import { loadConfig } from "../config/config.js";
import { type ClawdisConfig, loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveStorePath,
@@ -79,6 +85,8 @@ import {
formatValidationErrors,
PROTOCOL_VERSION,
type RequestFrame,
type SessionsListParams,
type SessionsPatchParams,
type Snapshot,
validateAgentParams,
validateChatHistoryParams,
@@ -98,6 +106,8 @@ import {
validateNodePairVerifyParams,
validateRequestFrame,
validateSendParams,
validateSessionsListParams,
validateSessionsPatchParams,
validateWakeParams,
} from "./protocol/index.js";
@@ -108,9 +118,48 @@ type Client = {
presenceKey?: string;
};
type GatewaySessionsDefaults = {
model: string | null;
contextTokens: number | null;
};
type GatewaySessionRow = {
key: string;
kind: "direct" | "group" | "global" | "unknown";
updatedAt: number | null;
sessionId?: string;
systemSent?: boolean;
abortedLastRun?: boolean;
thinkingLevel?: string;
verboseLevel?: string;
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
model?: string;
contextTokens?: number;
syncing?: boolean | string;
};
type SessionsListResult = {
ts: number;
path: string;
count: number;
defaults: GatewaySessionsDefaults;
sessions: GatewaySessionRow[];
};
type SessionsPatchResult = {
ok: true;
path: string;
key: string;
entry: SessionEntry;
};
const METHODS = [
"health",
"status",
"sessions.list",
"sessions.patch",
"last-heartbeat",
"set-heartbeats",
"wake",
@@ -279,6 +328,88 @@ function loadSessionEntry(sessionKey: string) {
return { cfg, storePath, store, entry };
}
function classifySessionKey(key: string): GatewaySessionRow["kind"] {
if (key === "global") return "global";
if (key.startsWith("group:")) return "group";
if (key === "unknown") return "unknown";
return "direct";
}
function getSessionDefaults(cfg: ClawdisConfig): GatewaySessionsDefaults {
const model = cfg.inbound?.reply?.agent?.model ?? DEFAULT_MODEL;
const contextTokens =
cfg.inbound?.reply?.agent?.contextTokens ??
lookupContextTokens(model) ??
DEFAULT_CONTEXT_TOKENS;
return { model: model ?? null, contextTokens: contextTokens ?? null };
}
function listSessionsFromStore(params: {
cfg: ClawdisConfig;
storePath: string;
store: Record<string, SessionEntry>;
opts: SessionsListParams;
}): SessionsListResult {
const { cfg, storePath, store, opts } = params;
const now = Date.now();
const includeGlobal = opts.includeGlobal === true;
const includeUnknown = opts.includeUnknown === true;
const activeMinutes =
typeof opts.activeMinutes === "number" &&
Number.isFinite(opts.activeMinutes)
? Math.max(1, Math.floor(opts.activeMinutes))
: undefined;
let sessions = Object.entries(store)
.filter(([key]) => {
if (!includeGlobal && key === "global") return false;
if (!includeUnknown && key === "unknown") return false;
return true;
})
.map(([key, entry]) => {
const updatedAt = entry?.updatedAt ?? null;
const input = entry?.inputTokens ?? 0;
const output = entry?.outputTokens ?? 0;
const total = entry?.totalTokens ?? input + output;
return {
key,
kind: classifySessionKey(key),
updatedAt,
sessionId: entry?.sessionId,
systemSent: entry?.systemSent,
abortedLastRun: entry?.abortedLastRun,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
inputTokens: entry?.inputTokens,
outputTokens: entry?.outputTokens,
totalTokens: total,
model: entry?.model,
contextTokens: entry?.contextTokens,
syncing: entry?.syncing,
} satisfies GatewaySessionRow;
})
.sort((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));
if (activeMinutes !== undefined) {
const cutoff = now - activeMinutes * 60_000;
sessions = sessions.filter((s) => (s.updatedAt ?? 0) >= cutoff);
}
if (typeof opts.limit === "number" && Number.isFinite(opts.limit)) {
const limit = Math.max(1, Math.floor(opts.limit));
sessions = sessions.slice(0, limit);
}
return {
ts: now,
path: storePath,
count: sessions.length,
defaults: getSessionDefaults(cfg),
sessions,
};
}
function logWs(
direction: "in" | "out",
kind: string,
@@ -1508,6 +1639,128 @@ export async function startGatewayServer(
respond(true, status, undefined);
break;
}
case "sessions.list": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateSessionsListParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`,
),
);
break;
}
const p = params as SessionsListParams;
const cfg = loadConfig();
const storePath = resolveStorePath(
cfg.inbound?.reply?.session?.store,
);
const store = loadSessionStore(storePath);
const result = listSessionsFromStore({
cfg,
storePath,
store,
opts: p,
});
respond(true, result, undefined);
break;
}
case "sessions.patch": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateSessionsPatchParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`,
),
);
break;
}
const p = params as SessionsPatchParams;
const key = String(p.key ?? "").trim();
if (!key) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "key required"),
);
break;
}
const cfg = loadConfig();
const storePath = resolveStorePath(
cfg.inbound?.reply?.session?.store,
);
const store = loadSessionStore(storePath);
const now = Date.now();
const existing = store[key];
const next: SessionEntry = existing
? {
...existing,
updatedAt: Math.max(existing.updatedAt ?? 0, now),
}
: { sessionId: randomUUID(), updatedAt: now };
if ("thinkingLevel" in p) {
const raw = p.thinkingLevel;
if (raw === null) {
delete next.thinkingLevel;
} else if (raw !== undefined) {
const normalized = normalizeThinkLevel(String(raw));
if (!normalized) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"invalid thinkingLevel (use off|minimal|low|medium|high)",
),
);
break;
}
if (normalized === "off") delete next.thinkingLevel;
else next.thinkingLevel = normalized;
}
}
if ("verboseLevel" in p) {
const raw = p.verboseLevel;
if (raw === null) {
delete next.verboseLevel;
} else if (raw !== undefined) {
const normalized = normalizeVerboseLevel(String(raw));
if (!normalized) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
'invalid verboseLevel (use "on"|"off")',
),
);
break;
}
if (normalized === "off") delete next.verboseLevel;
else next.verboseLevel = normalized;
}
}
store[key] = next;
await saveSessionStore(storePath, store);
const result: SessionsPatchResult = {
ok: true,
path: storePath,
key,
entry: next,
};
respond(true, result, undefined);
break;
}
case "last-heartbeat": {
respond(true, getLastHeartbeatEvent(), undefined);
break;