refactor: split gateway server methods

This commit is contained in:
Peter Steinberger
2026-01-04 04:05:18 +01:00
parent 3ebee63cb3
commit 3c4c2aa98c
20 changed files with 3211 additions and 3211 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,135 @@
import { onAgentEvent } from "../../infra/agent-events.js";
const AGENT_JOB_CACHE_TTL_MS = 10 * 60_000;
const agentJobCache = new Map<string, AgentJobSnapshot>();
const agentRunStarts = new Map<string, number>();
let agentJobListenerStarted = false;
type AgentJobSnapshot = {
runId: string;
state: "done" | "error";
startedAt?: number;
endedAt?: number;
error?: string;
ts: number;
};
function pruneAgentJobCache(now = Date.now()) {
for (const [runId, entry] of agentJobCache) {
if (now - entry.ts > AGENT_JOB_CACHE_TTL_MS) {
agentJobCache.delete(runId);
}
}
}
function recordAgentJobSnapshot(entry: AgentJobSnapshot) {
pruneAgentJobCache(entry.ts);
agentJobCache.set(entry.runId, entry);
}
function ensureAgentJobListener() {
if (agentJobListenerStarted) return;
agentJobListenerStarted = true;
onAgentEvent((evt) => {
if (!evt) return;
if (evt.stream !== "job") return;
const state = evt.data?.state;
if (state === "started") {
const startedAt =
typeof evt.data?.startedAt === "number"
? (evt.data.startedAt as number)
: undefined;
if (startedAt !== undefined) {
agentRunStarts.set(evt.runId, startedAt);
}
return;
}
if (state !== "done" && state !== "error") return;
const startedAt =
typeof evt.data?.startedAt === "number"
? (evt.data.startedAt as number)
: agentRunStarts.get(evt.runId);
const endedAt =
typeof evt.data?.endedAt === "number"
? (evt.data.endedAt as number)
: undefined;
const error =
typeof evt.data?.error === "string" ? (evt.data.error as string) : undefined;
agentRunStarts.delete(evt.runId);
recordAgentJobSnapshot({
runId: evt.runId,
state: state === "error" ? "error" : "done",
startedAt,
endedAt,
error,
ts: Date.now(),
});
});
}
function matchesAfterMs(entry: AgentJobSnapshot, afterMs?: number) {
if (afterMs === undefined) return true;
if (typeof entry.startedAt === "number") return entry.startedAt >= afterMs;
if (typeof entry.endedAt === "number") return entry.endedAt >= afterMs;
return false;
}
function getCachedAgentJob(runId: string, afterMs?: number) {
pruneAgentJobCache();
const cached = agentJobCache.get(runId);
if (!cached) return undefined;
return matchesAfterMs(cached, afterMs) ? cached : undefined;
}
export async function waitForAgentJob(params: {
runId: string;
afterMs?: number;
timeoutMs: number;
}): Promise<AgentJobSnapshot | null> {
const { runId, afterMs, timeoutMs } = params;
ensureAgentJobListener();
const cached = getCachedAgentJob(runId, afterMs);
if (cached) return cached;
if (timeoutMs <= 0) return null;
return await new Promise((resolve) => {
let settled = false;
const finish = (entry: AgentJobSnapshot | null) => {
if (settled) return;
settled = true;
clearTimeout(timer);
unsubscribe();
resolve(entry);
};
const unsubscribe = onAgentEvent((evt) => {
if (!evt || evt.stream !== "job") return;
if (evt.runId !== runId) return;
const state = evt.data?.state;
if (state !== "done" && state !== "error") return;
const startedAt =
typeof evt.data?.startedAt === "number"
? (evt.data.startedAt as number)
: agentRunStarts.get(evt.runId);
const endedAt =
typeof evt.data?.endedAt === "number"
? (evt.data.endedAt as number)
: undefined;
const error =
typeof evt.data?.error === "string" ? (evt.data.error as string) : undefined;
const snapshot: AgentJobSnapshot = {
runId: evt.runId,
state: state === "error" ? "error" : "done",
startedAt,
endedAt,
error,
ts: Date.now(),
};
recordAgentJobSnapshot(snapshot);
if (!matchesAfterMs(snapshot, afterMs)) return;
finish(snapshot);
});
const timer = setTimeout(() => finish(null), Math.max(1, timeoutMs));
});
}
ensureAgentJobListener();

View File

@@ -0,0 +1,313 @@
import { randomUUID } from "node:crypto";
import { agentCommand } from "../../commands/agent.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { defaultRuntime } from "../../runtime.js";
import { normalizeE164 } from "../../utils.js";
import { loadConfig } from "../../config/config.js";
import { saveSessionStore, type SessionEntry } from "../../config/sessions.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateAgentParams,
validateAgentWaitParams,
type AgentWaitParams,
} from "../protocol/index.js";
import { formatForLog } from "../ws-log.js";
import { loadSessionEntry } from "../session-utils.js";
import { waitForAgentJob } from "./agent-job.js";
import type { GatewayRequestHandlers } from "./types.js";
export const agentHandlers: GatewayRequestHandlers = {
agent: async ({ params, respond, context }) => {
const p = params as Record<string, unknown>;
if (!validateAgentParams(p)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid agent params: ${formatValidationErrors(validateAgentParams.errors)}`,
),
);
return;
}
const request = p as {
message: string;
to?: string;
sessionId?: string;
sessionKey?: string;
thinking?: string;
deliver?: boolean;
channel?: string;
lane?: string;
extraSystemPrompt?: string;
idempotencyKey: string;
timeout?: number;
};
const idem = request.idempotencyKey;
const cached = context.dedupe.get(`agent:${idem}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
return;
}
const message = request.message.trim();
const requestedSessionKey =
typeof request.sessionKey === "string" && request.sessionKey.trim()
? request.sessionKey.trim()
: undefined;
let resolvedSessionId = request.sessionId?.trim() || undefined;
let sessionEntry: SessionEntry | undefined;
let bestEffortDeliver = false;
let cfgForAgent: ReturnType<typeof loadConfig> | undefined;
if (requestedSessionKey) {
const { cfg, storePath, store, entry } = loadSessionEntry(requestedSessionKey);
cfgForAgent = cfg;
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
sessionEntry = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
skillsSnapshot: entry?.skillsSnapshot,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
const sendPolicy = resolveSendPolicy({
cfg,
entry,
sessionKey: requestedSessionKey,
surface: entry?.surface,
chatType: entry?.chatType,
});
if (sendPolicy === "deny") {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"send blocked by session policy",
),
);
return;
}
if (store) {
store[requestedSessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, store);
}
}
resolvedSessionId = sessionId;
const mainKey = (cfg.session?.mainKey ?? "main").trim() || "main";
if (requestedSessionKey === mainKey) {
context.addChatRun(idem, {
sessionKey: requestedSessionKey,
clientRunId: idem,
});
bestEffortDeliver = true;
}
registerAgentRunContext(idem, { sessionKey: requestedSessionKey });
}
const runId = idem;
const requestedChannelRaw =
typeof request.channel === "string" ? request.channel.trim() : "";
const requestedChannelNormalized = requestedChannelRaw
? requestedChannelRaw.toLowerCase()
: "last";
const requestedChannel =
requestedChannelNormalized === "imsg"
? "imessage"
: requestedChannelNormalized;
const lastChannel = sessionEntry?.lastChannel;
const lastTo =
typeof sessionEntry?.lastTo === "string" ? sessionEntry.lastTo.trim() : "";
const resolvedChannel = (() => {
if (requestedChannel === "last") {
// WebChat is not a deliverable surface. Treat it as "unset" for routing,
// so VoiceWake and CLI callers don't get stuck with deliver=false.
return lastChannel && lastChannel !== "webchat" ? lastChannel : "whatsapp";
}
if (
requestedChannel === "whatsapp" ||
requestedChannel === "telegram" ||
requestedChannel === "discord" ||
requestedChannel === "signal" ||
requestedChannel === "imessage" ||
requestedChannel === "webchat"
) {
return requestedChannel;
}
return lastChannel && lastChannel !== "webchat" ? lastChannel : "whatsapp";
})();
const resolvedTo = (() => {
const explicit =
typeof request.to === "string" && request.to.trim()
? request.to.trim()
: undefined;
if (explicit) return explicit;
if (
resolvedChannel === "whatsapp" ||
resolvedChannel === "telegram" ||
resolvedChannel === "discord" ||
resolvedChannel === "signal" ||
resolvedChannel === "imessage"
) {
return lastTo || undefined;
}
return undefined;
})();
const sanitizedTo = (() => {
// If we derived a WhatsApp recipient from session "lastTo", ensure it is still valid
// for the configured allowlist. Otherwise, fall back to the first allowed number so
// voice wake doesn't silently route to stale/test recipients.
if (resolvedChannel !== "whatsapp") return resolvedTo;
const explicit =
typeof request.to === "string" && request.to.trim()
? request.to.trim()
: undefined;
if (explicit) return resolvedTo;
const cfg = cfgForAgent ?? loadConfig();
const rawAllow = cfg.whatsapp?.allowFrom ?? [];
if (rawAllow.includes("*")) return resolvedTo;
const allowFrom = rawAllow
.map((val) => normalizeE164(val))
.filter((val) => val.length > 1);
if (allowFrom.length === 0) return resolvedTo;
const normalizedLast =
typeof resolvedTo === "string" && resolvedTo.trim()
? normalizeE164(resolvedTo)
: undefined;
if (normalizedLast && allowFrom.includes(normalizedLast)) {
return normalizedLast;
}
return allowFrom[0];
})();
const deliver = request.deliver === true && resolvedChannel !== "webchat";
const accepted = {
runId,
status: "accepted" as const,
acceptedAt: Date.now(),
};
// Store an in-flight ack so retries do not spawn a second run.
context.dedupe.set(`agent:${idem}`, {
ts: Date.now(),
ok: true,
payload: accepted,
});
respond(true, accepted, undefined, { runId });
void agentCommand(
{
message,
to: sanitizedTo,
sessionId: resolvedSessionId,
thinking: request.thinking,
deliver,
provider: resolvedChannel,
timeout: request.timeout?.toString(),
bestEffortDeliver,
surface: "VoiceWake",
runId,
lane: request.lane,
extraSystemPrompt: request.extraSystemPrompt,
},
defaultRuntime,
context.deps,
)
.then(() => {
const payload = {
runId,
status: "ok" as const,
summary: "completed",
};
context.dedupe.set(`agent:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
// Send a second res frame (same id) so TS clients with expectFinal can wait.
// Swift clients will typically treat the first res as the result and ignore this.
respond(true, payload, undefined, { runId });
})
.catch((err) => {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId,
status: "error" as const,
summary: String(err),
};
context.dedupe.set(`agent:${idem}`, {
ts: Date.now(),
ok: false,
payload,
error,
});
respond(false, payload, error, {
runId,
error: formatForLog(err),
});
});
},
"agent.wait": async ({ params, respond }) => {
if (!validateAgentWaitParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid agent.wait params: ${formatValidationErrors(validateAgentWaitParams.errors)}`,
),
);
return;
}
const p = params as AgentWaitParams;
const runId = p.runId.trim();
const afterMs =
typeof p.afterMs === "number" && Number.isFinite(p.afterMs)
? Math.max(0, Math.floor(p.afterMs))
: undefined;
const timeoutMs =
typeof p.timeoutMs === "number" && Number.isFinite(p.timeoutMs)
? Math.max(0, Math.floor(p.timeoutMs))
: 30_000;
const snapshot = await waitForAgentJob({
runId,
afterMs,
timeoutMs,
});
if (!snapshot) {
respond(true, {
runId,
status: "timeout",
});
return;
}
respond(true, {
runId,
status: snapshot.state === "done" ? "ok" : "error",
startedAt: snapshot.startedAt,
endedAt: snapshot.endedAt,
error: snapshot.error,
});
},
};

View File

@@ -0,0 +1,299 @@
import { randomUUID } from "node:crypto";
import { resolveThinkingDefault } from "../../agents/model-selection.js";
import { agentCommand } from "../../commands/agent.js";
import { saveSessionStore, type SessionEntry } from "../../config/sessions.js";
import { defaultRuntime } from "../../runtime.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateChatAbortParams,
validateChatHistoryParams,
validateChatSendParams,
} from "../protocol/index.js";
import { MAX_CHAT_HISTORY_MESSAGES_BYTES } from "../server-constants.js";
import {
capArrayByJsonBytes,
loadSessionEntry,
readSessionMessages,
resolveSessionModelRef,
} from "../session-utils.js";
import { formatForLog } from "../ws-log.js";
import { buildMessageWithAttachments } from "../chat-attachments.js";
import type { GatewayRequestHandlers } from "./types.js";
export const chatHandlers: GatewayRequestHandlers = {
"chat.history": async ({ params, respond, context }) => {
if (!validateChatHistoryParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`,
),
);
return;
}
const { sessionKey, limit } = params as {
sessionKey: string;
limit?: number;
};
const { cfg, storePath, entry } = loadSessionEntry(sessionKey);
const sessionId = entry?.sessionId;
const rawMessages =
sessionId && storePath ? readSessionMessages(sessionId, storePath) : [];
const hardMax = 1000;
const defaultLimit = 200;
const requested = typeof limit === "number" ? limit : defaultLimit;
const max = Math.min(hardMax, requested);
const sliced = rawMessages.length > max ? rawMessages.slice(-max) : rawMessages;
const capped = capArrayByJsonBytes(
sliced,
MAX_CHAT_HISTORY_MESSAGES_BYTES,
).items;
let thinkingLevel = entry?.thinkingLevel;
if (!thinkingLevel) {
const configured = cfg.agent?.thinkingDefault;
if (configured) {
thinkingLevel = configured;
} else {
const { provider, model } = resolveSessionModelRef(cfg, entry);
const catalog = await context.loadGatewayModelCatalog();
thinkingLevel = resolveThinkingDefault({
cfg,
provider,
model,
catalog,
});
}
}
respond(true, {
sessionKey,
sessionId,
messages: capped,
thinkingLevel,
});
},
"chat.abort": ({ params, respond, context }) => {
if (!validateChatAbortParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.abort params: ${formatValidationErrors(validateChatAbortParams.errors)}`,
),
);
return;
}
const { sessionKey, runId } = params as {
sessionKey: string;
runId: string;
};
const active = context.chatAbortControllers.get(runId);
if (!active) {
respond(true, { ok: true, aborted: false });
return;
}
if (active.sessionKey !== sessionKey) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "runId does not match sessionKey"),
);
return;
}
active.controller.abort();
context.chatAbortControllers.delete(runId);
context.chatRunBuffers.delete(runId);
context.chatDeltaSentAt.delete(runId);
context.removeChatRun(active.sessionId, runId, sessionKey);
const payload = {
runId,
sessionKey,
seq: (context.agentRunSeq.get(active.sessionId) ?? 0) + 1,
state: "aborted" as const,
};
context.broadcast("chat", payload);
context.bridgeSendToSession(sessionKey, "chat", payload);
respond(true, { ok: true, aborted: true });
},
"chat.send": async ({ params, respond, context, client, isWebchatConnect }) => {
if (client && isWebchatConnect(client.connect) && !context.hasConnectedMobileNode()) {
respond(
false,
undefined,
errorShape(
ErrorCodes.UNAVAILABLE,
"web chat disabled: no connected iOS/Android nodes",
),
);
return;
}
if (!validateChatSendParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`,
),
);
return;
}
const p = params as {
sessionKey: string;
message: string;
thinking?: string;
deliver?: boolean;
attachments?: Array<{
type?: string;
mimeType?: string;
fileName?: string;
content?: unknown;
}>;
timeoutMs?: number;
idempotencyKey: string;
};
const timeoutMs = Math.min(Math.max(p.timeoutMs ?? 30_000, 0), 30_000);
const normalizedAttachments =
p.attachments?.map((a) => ({
type: typeof a?.type === "string" ? a.type : undefined,
mimeType: typeof a?.mimeType === "string" ? a.mimeType : undefined,
fileName: typeof a?.fileName === "string" ? a.fileName : undefined,
content:
typeof a?.content === "string"
? a.content
: ArrayBuffer.isView(a?.content)
? Buffer.from(
a.content.buffer,
a.content.byteOffset,
a.content.byteLength,
).toString("base64")
: undefined,
})) ?? [];
let messageWithAttachments = p.message;
if (normalizedAttachments.length > 0) {
try {
messageWithAttachments = buildMessageWithAttachments(
p.message,
normalizedAttachments,
{ maxBytes: 5_000_000 },
);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, String(err)),
);
return;
}
}
const { cfg, storePath, store, entry } = loadSessionEntry(p.sessionKey);
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
};
const clientRunId = p.idempotencyKey;
const sendPolicy = resolveSendPolicy({
cfg,
entry,
sessionKey: p.sessionKey,
surface: entry?.surface,
chatType: entry?.chatType,
});
if (sendPolicy === "deny") {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "send blocked by session policy"),
);
return;
}
const cached = context.dedupe.get(`chat:${clientRunId}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
return;
}
try {
const abortController = new AbortController();
context.chatAbortControllers.set(clientRunId, {
controller: abortController,
sessionId,
sessionKey: p.sessionKey,
});
context.addChatRun(sessionId, {
sessionKey: p.sessionKey,
clientRunId,
});
if (store) {
store[p.sessionKey] = sessionEntry;
if (storePath) {
await saveSessionStore(storePath, store);
}
}
await agentCommand(
{
message: messageWithAttachments,
sessionId,
thinking: p.thinking,
deliver: p.deliver,
timeout: Math.ceil(timeoutMs / 1000).toString(),
surface: "WebChat",
abortSignal: abortController.signal,
},
defaultRuntime,
context.deps,
);
const payload = {
runId: clientRunId,
status: "ok" as const,
};
context.dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { runId: clientRunId });
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId: clientRunId,
status: "error" as const,
summary: String(err),
};
context.dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: false,
payload,
error,
});
respond(false, payload, error, {
runId: clientRunId,
error: formatForLog(err),
});
} finally {
context.chatAbortControllers.delete(clientRunId);
}
},
};

View File

@@ -0,0 +1,105 @@
import {
CONFIG_PATH_CLAWDIS,
parseConfigJson5,
readConfigFileSnapshot,
validateConfigObject,
writeConfigFile,
} from "../../config/config.js";
import { buildConfigSchema } from "../../config/schema.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateConfigGetParams,
validateConfigSchemaParams,
validateConfigSetParams,
} from "../protocol/index.js";
import type { GatewayRequestHandlers } from "./types.js";
export const configHandlers: GatewayRequestHandlers = {
"config.get": async ({ params, respond }) => {
if (!validateConfigGetParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid config.get params: ${formatValidationErrors(validateConfigGetParams.errors)}`,
),
);
return;
}
const snapshot = await readConfigFileSnapshot();
respond(true, snapshot, undefined);
},
"config.schema": ({ params, respond }) => {
if (!validateConfigSchemaParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid config.schema params: ${formatValidationErrors(validateConfigSchemaParams.errors)}`,
),
);
return;
}
const schema = buildConfigSchema();
respond(true, schema, undefined);
},
"config.set": async ({ params, respond }) => {
if (!validateConfigSetParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid config.set params: ${formatValidationErrors(validateConfigSetParams.errors)}`,
),
);
return;
}
const rawValue = (params as { raw?: unknown }).raw;
if (typeof rawValue !== "string") {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"invalid config.set params: raw (string) required",
),
);
return;
}
const parsedRes = parseConfigJson5(rawValue);
if (!parsedRes.ok) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, parsedRes.error),
);
return;
}
const validated = validateConfigObject(parsedRes.parsed);
if (!validated.ok) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "invalid config", {
details: { issues: validated.issues },
}),
);
return;
}
await writeConfigFile(validated.config);
respond(
true,
{
ok: true,
path: CONFIG_PATH_CLAWDIS,
config: validated.config,
},
undefined,
);
},
};

View File

@@ -0,0 +1,15 @@
import { ErrorCodes, errorShape } from "../protocol/index.js";
import type { GatewayRequestHandlers } from "./types.js";
export const connectHandlers: GatewayRequestHandlers = {
connect: ({ respond }) => {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"connect is only valid as the first request",
),
);
},
};

View File

@@ -0,0 +1,163 @@
import type { CronJobCreate, CronJobPatch } from "../../cron/types.js";
import {
readCronRunLogEntries,
resolveCronRunLogPath,
} from "../../cron/run-log.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateCronAddParams,
validateCronListParams,
validateCronRemoveParams,
validateCronRunParams,
validateCronRunsParams,
validateCronStatusParams,
validateCronUpdateParams,
validateWakeParams,
} from "../protocol/index.js";
import type { GatewayRequestHandlers } from "./types.js";
export const cronHandlers: GatewayRequestHandlers = {
wake: ({ params, respond, context }) => {
if (!validateWakeParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid wake params: ${formatValidationErrors(validateWakeParams.errors)}`,
),
);
return;
}
const p = params as {
mode: "now" | "next-heartbeat";
text: string;
};
const result = context.cron.wake({ mode: p.mode, text: p.text });
respond(true, result, undefined);
},
"cron.list": async ({ params, respond, context }) => {
if (!validateCronListParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.list params: ${formatValidationErrors(validateCronListParams.errors)}`,
),
);
return;
}
const p = params as { includeDisabled?: boolean };
const jobs = await context.cron.list({
includeDisabled: p.includeDisabled,
});
respond(true, { jobs }, undefined);
},
"cron.status": async ({ params, respond, context }) => {
if (!validateCronStatusParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.status params: ${formatValidationErrors(validateCronStatusParams.errors)}`,
),
);
return;
}
const status = await context.cron.status();
respond(true, status, undefined);
},
"cron.add": async ({ params, respond, context }) => {
if (!validateCronAddParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.add params: ${formatValidationErrors(validateCronAddParams.errors)}`,
),
);
return;
}
const job = await context.cron.add(params as unknown as CronJobCreate);
respond(true, job, undefined);
},
"cron.update": async ({ params, respond, context }) => {
if (!validateCronUpdateParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.update params: ${formatValidationErrors(validateCronUpdateParams.errors)}`,
),
);
return;
}
const p = params as {
id: string;
patch: Record<string, unknown>;
};
const job = await context.cron.update(p.id, p.patch as unknown as CronJobPatch);
respond(true, job, undefined);
},
"cron.remove": async ({ params, respond, context }) => {
if (!validateCronRemoveParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.remove params: ${formatValidationErrors(validateCronRemoveParams.errors)}`,
),
);
return;
}
const p = params as { id: string };
const result = await context.cron.remove(p.id);
respond(true, result, undefined);
},
"cron.run": async ({ params, respond, context }) => {
if (!validateCronRunParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.run params: ${formatValidationErrors(validateCronRunParams.errors)}`,
),
);
return;
}
const p = params as { id: string; mode?: "due" | "force" };
const result = await context.cron.run(p.id, p.mode);
respond(true, result, undefined);
},
"cron.runs": async ({ params, respond, context }) => {
if (!validateCronRunsParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.runs params: ${formatValidationErrors(validateCronRunsParams.errors)}`,
),
);
return;
}
const p = params as { id: string; limit?: number };
const logPath = resolveCronRunLogPath({
storePath: context.cronStorePath,
jobId: p.id,
});
const entries = await readCronRunLogEntries(logPath, {
limit: p.limit,
jobId: p.id,
});
respond(true, { entries }, undefined);
},
};

View File

@@ -0,0 +1,35 @@
import { getStatusSummary } from "../../commands/status.js";
import { ErrorCodes, errorShape } from "../protocol/index.js";
import { HEALTH_REFRESH_INTERVAL_MS } from "../server-constants.js";
import { formatError } from "../server-utils.js";
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
export const healthHandlers: GatewayRequestHandlers = {
health: async ({ respond, context }) => {
const { getHealthCache, refreshHealthSnapshot, logHealth } = context;
const now = Date.now();
const cached = getHealthCache();
if (cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) {
respond(true, cached, undefined, { cached: true });
void refreshHealthSnapshot({ probe: false }).catch((err) =>
logHealth.error(`background health refresh failed: ${formatError(err)}`),
);
return;
}
try {
const snap = await refreshHealthSnapshot({ probe: false });
respond(true, snap, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
status: async ({ respond }) => {
const status = await getStatusSummary();
respond(true, status, undefined);
},
};

View File

@@ -0,0 +1,29 @@
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateModelsListParams,
} from "../protocol/index.js";
import type { GatewayRequestHandlers } from "./types.js";
export const modelsHandlers: GatewayRequestHandlers = {
"models.list": async ({ params, respond, context }) => {
if (!validateModelsListParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid models.list params: ${formatValidationErrors(validateModelsListParams.errors)}`,
),
);
return;
}
try {
const models = await context.loadGatewayModelCatalog();
respond(true, { models }, undefined);
} catch (err) {
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, String(err)));
}
},
};

View File

@@ -0,0 +1,500 @@
import {
approveNodePairing,
listNodePairing,
rejectNodePairing,
renamePairedNode,
requestNodePairing,
verifyNodeToken,
} from "../../infra/node-pairing.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateNodeDescribeParams,
validateNodeInvokeParams,
validateNodeListParams,
validateNodePairApproveParams,
validateNodePairListParams,
validateNodePairRejectParams,
validateNodePairRequestParams,
validateNodePairVerifyParams,
validateNodeRenameParams,
} from "../protocol/index.js";
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
export const nodeHandlers: GatewayRequestHandlers = {
"node.pair.request": async ({ params, respond, context }) => {
if (!validateNodePairRequestParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.pair.request params: ${formatValidationErrors(validateNodePairRequestParams.errors)}`,
),
);
return;
}
const p = params as {
nodeId: string;
displayName?: string;
platform?: string;
version?: string;
deviceFamily?: string;
modelIdentifier?: string;
caps?: string[];
commands?: string[];
remoteIp?: string;
silent?: boolean;
};
try {
const result = await requestNodePairing({
nodeId: p.nodeId,
displayName: p.displayName,
platform: p.platform,
version: p.version,
deviceFamily: p.deviceFamily,
modelIdentifier: p.modelIdentifier,
caps: p.caps,
commands: p.commands,
remoteIp: p.remoteIp,
silent: p.silent,
});
if (result.status === "pending" && result.created) {
context.broadcast("node.pair.requested", result.request, {
dropIfSlow: true,
});
}
respond(true, result, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"node.pair.list": async ({ params, respond }) => {
if (!validateNodePairListParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.pair.list params: ${formatValidationErrors(validateNodePairListParams.errors)}`,
),
);
return;
}
try {
const list = await listNodePairing();
respond(true, list, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"node.pair.approve": async ({ params, respond, context }) => {
if (!validateNodePairApproveParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.pair.approve params: ${formatValidationErrors(validateNodePairApproveParams.errors)}`,
),
);
return;
}
const { requestId } = params as { requestId: string };
try {
const approved = await approveNodePairing(requestId);
if (!approved) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "unknown requestId"),
);
return;
}
context.broadcast(
"node.pair.resolved",
{
requestId,
nodeId: approved.node.nodeId,
decision: "approved",
ts: Date.now(),
},
{ dropIfSlow: true },
);
respond(true, approved, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"node.pair.reject": async ({ params, respond, context }) => {
if (!validateNodePairRejectParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.pair.reject params: ${formatValidationErrors(validateNodePairRejectParams.errors)}`,
),
);
return;
}
const { requestId } = params as { requestId: string };
try {
const rejected = await rejectNodePairing(requestId);
if (!rejected) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "unknown requestId"),
);
return;
}
context.broadcast(
"node.pair.resolved",
{
requestId,
nodeId: rejected.nodeId,
decision: "rejected",
ts: Date.now(),
},
{ dropIfSlow: true },
);
respond(true, rejected, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"node.pair.verify": async ({ params, respond }) => {
if (!validateNodePairVerifyParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.pair.verify params: ${formatValidationErrors(validateNodePairVerifyParams.errors)}`,
),
);
return;
}
const { nodeId, token } = params as {
nodeId: string;
token: string;
};
try {
const result = await verifyNodeToken(nodeId, token);
respond(true, result, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"node.rename": async ({ params, respond }) => {
if (!validateNodeRenameParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.rename params: ${formatValidationErrors(validateNodeRenameParams.errors)}`,
),
);
return;
}
const { nodeId, displayName } = params as {
nodeId: string;
displayName: string;
};
try {
const trimmed = displayName.trim();
if (!trimmed) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "displayName required"),
);
return;
}
const updated = await renamePairedNode(nodeId, trimmed);
if (!updated) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "unknown nodeId"),
);
return;
}
respond(true, { nodeId: updated.nodeId, displayName: updated.displayName }, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"node.list": async ({ params, respond, context }) => {
if (!validateNodeListParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.list params: ${formatValidationErrors(validateNodeListParams.errors)}`,
),
);
return;
}
try {
const list = await listNodePairing();
const pairedById = new Map(list.paired.map((n) => [n.nodeId, n]));
const connected = context.bridge?.listConnected?.() ?? [];
const connectedById = new Map(connected.map((n) => [n.nodeId, n]));
const nodeIds = new Set<string>([
...pairedById.keys(),
...connectedById.keys(),
]);
const nodes = [...nodeIds].map((nodeId) => {
const paired = pairedById.get(nodeId);
const live = connectedById.get(nodeId);
const caps = [
...new Set(
(live?.caps ?? paired?.caps ?? [])
.map((c) => String(c).trim())
.filter(Boolean),
),
].sort();
const commands = [
...new Set(
(live?.commands ?? paired?.commands ?? [])
.map((c) => String(c).trim())
.filter(Boolean),
),
].sort();
return {
nodeId,
displayName: live?.displayName ?? paired?.displayName,
platform: live?.platform ?? paired?.platform,
version: live?.version ?? paired?.version,
deviceFamily: live?.deviceFamily ?? paired?.deviceFamily,
modelIdentifier: live?.modelIdentifier ?? paired?.modelIdentifier,
remoteIp: live?.remoteIp ?? paired?.remoteIp,
caps,
commands,
permissions: live?.permissions ?? paired?.permissions,
paired: Boolean(paired),
connected: Boolean(live),
};
});
nodes.sort((a, b) => {
if (a.connected !== b.connected) return a.connected ? -1 : 1;
const an = (a.displayName ?? a.nodeId).toLowerCase();
const bn = (b.displayName ?? b.nodeId).toLowerCase();
if (an < bn) return -1;
if (an > bn) return 1;
return a.nodeId.localeCompare(b.nodeId);
});
respond(true, { ts: Date.now(), nodes }, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"node.describe": async ({ params, respond, context }) => {
if (!validateNodeDescribeParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.describe params: ${formatValidationErrors(validateNodeDescribeParams.errors)}`,
),
);
return;
}
const { nodeId } = params as { nodeId: string };
const id = String(nodeId ?? "").trim();
if (!id) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "nodeId required"),
);
return;
}
try {
const list = await listNodePairing();
const paired = list.paired.find((n) => n.nodeId === id);
const connected = context.bridge?.listConnected?.() ?? [];
const live = connected.find((n) => n.nodeId === id);
if (!paired && !live) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "unknown nodeId"),
);
return;
}
const caps = [
...new Set(
(live?.caps ?? paired?.caps ?? [])
.map((c) => String(c).trim())
.filter(Boolean),
),
].sort();
const commands = [
...new Set(
(live?.commands ?? paired?.commands ?? [])
.map((c) => String(c).trim())
.filter(Boolean),
),
].sort();
respond(
true,
{
ts: Date.now(),
nodeId: id,
displayName: live?.displayName ?? paired?.displayName,
platform: live?.platform ?? paired?.platform,
version: live?.version ?? paired?.version,
deviceFamily: live?.deviceFamily ?? paired?.deviceFamily,
modelIdentifier: live?.modelIdentifier ?? paired?.modelIdentifier,
remoteIp: live?.remoteIp ?? paired?.remoteIp,
caps,
commands,
permissions: live?.permissions ?? paired?.permissions,
paired: Boolean(paired),
connected: Boolean(live),
},
undefined,
);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"node.invoke": async ({ params, respond, context }) => {
if (!validateNodeInvokeParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.invoke params: ${formatValidationErrors(validateNodeInvokeParams.errors)}`,
),
);
return;
}
if (!context.bridge) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, "bridge not running"),
);
return;
}
const p = params as {
nodeId: string;
command: string;
params?: unknown;
timeoutMs?: number;
idempotencyKey: string;
};
const nodeId = String(p.nodeId ?? "").trim();
const command = String(p.command ?? "").trim();
if (!nodeId || !command) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "nodeId and command required"),
);
return;
}
try {
const paramsJSON =
"params" in p && p.params !== undefined ? JSON.stringify(p.params) : null;
const res = await context.bridge.invoke({
nodeId,
command,
paramsJSON,
timeoutMs: p.timeoutMs,
});
if (!res.ok) {
respond(
false,
undefined,
errorShape(
ErrorCodes.UNAVAILABLE,
res.error?.message ?? "node invoke failed",
{ details: { nodeError: res.error ?? null } },
),
);
return;
}
const payload =
typeof res.payloadJSON === "string" && res.payloadJSON.trim()
? (() => {
try {
return JSON.parse(res.payloadJSON) as unknown;
} catch {
return { payloadJSON: res.payloadJSON };
}
})()
: undefined;
respond(
true,
{
ok: true,
nodeId,
command,
payload,
payloadJSON: res.payloadJSON ?? null,
},
undefined,
);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
};

View File

@@ -0,0 +1,215 @@
import { type DiscordProbe, probeDiscord } from "../../discord/probe.js";
import { type IMessageProbe, probeIMessage } from "../../imessage/probe.js";
import type { ClawdisConfig } from "../../config/config.js";
import { loadConfig, readConfigFileSnapshot, writeConfigFile } from "../../config/config.js";
import { webAuthExists } from "../../providers/web/index.js";
import { getWebAuthAgeMs, readWebSelfId } from "../../web/session.js";
import { probeSignal, type SignalProbe } from "../../signal/probe.js";
import { probeTelegram, type TelegramProbe } from "../../telegram/probe.js";
import { resolveTelegramToken } from "../../telegram/token.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateProvidersStatusParams,
} from "../protocol/index.js";
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
export const providersHandlers: GatewayRequestHandlers = {
"providers.status": async ({ params, respond, context }) => {
if (!validateProvidersStatusParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid providers.status params: ${formatValidationErrors(validateProvidersStatusParams.errors)}`,
),
);
return;
}
const probe = (params as { probe?: boolean }).probe === true;
const timeoutMsRaw = (params as { timeoutMs?: unknown }).timeoutMs;
const timeoutMs =
typeof timeoutMsRaw === "number" ? Math.max(1000, timeoutMsRaw) : 10_000;
const cfg = loadConfig();
const telegramCfg = cfg.telegram;
const telegramEnabled = Boolean(telegramCfg) && telegramCfg?.enabled !== false;
const { token: telegramToken, source: tokenSource } = telegramEnabled
? resolveTelegramToken(cfg)
: { token: "", source: "none" as const };
let telegramProbe: TelegramProbe | undefined;
let lastProbeAt: number | null = null;
if (probe && telegramToken && telegramEnabled) {
telegramProbe = await probeTelegram(
telegramToken,
timeoutMs,
telegramCfg?.proxy,
);
lastProbeAt = Date.now();
}
const discordCfg = cfg.discord;
const discordEnabled = Boolean(discordCfg) && discordCfg?.enabled !== false;
const discordEnvToken = discordEnabled
? process.env.DISCORD_BOT_TOKEN?.trim()
: "";
const discordConfigToken = discordEnabled
? discordCfg?.token?.trim()
: "";
const discordToken = discordEnvToken || discordConfigToken || "";
const discordTokenSource = discordEnvToken
? "env"
: discordConfigToken
? "config"
: "none";
let discordProbe: DiscordProbe | undefined;
let discordLastProbeAt: number | null = null;
if (probe && discordToken && discordEnabled) {
discordProbe = await probeDiscord(discordToken, timeoutMs);
discordLastProbeAt = Date.now();
}
const signalCfg = cfg.signal;
const signalEnabled = signalCfg?.enabled !== false;
const signalHost = signalCfg?.httpHost?.trim() || "127.0.0.1";
const signalPort = signalCfg?.httpPort ?? 8080;
const signalBaseUrl =
signalCfg?.httpUrl?.trim() || `http://${signalHost}:${signalPort}`;
const signalConfigured =
Boolean(signalCfg) &&
signalEnabled &&
Boolean(
signalCfg?.account?.trim() ||
signalCfg?.httpUrl?.trim() ||
signalCfg?.cliPath?.trim() ||
signalCfg?.httpHost?.trim() ||
typeof signalCfg?.httpPort === "number" ||
typeof signalCfg?.autoStart === "boolean",
);
let signalProbe: SignalProbe | undefined;
let signalLastProbeAt: number | null = null;
if (probe && signalConfigured) {
signalProbe = await probeSignal(signalBaseUrl, timeoutMs);
signalLastProbeAt = Date.now();
}
const imessageCfg = cfg.imessage;
const imessageEnabled = imessageCfg?.enabled !== false;
const imessageConfigured = Boolean(imessageCfg) && imessageEnabled;
let imessageProbe: IMessageProbe | undefined;
let imessageLastProbeAt: number | null = null;
if (probe && imessageConfigured) {
imessageProbe = await probeIMessage(timeoutMs);
imessageLastProbeAt = Date.now();
}
const linked = await webAuthExists();
const authAgeMs = getWebAuthAgeMs();
const self = readWebSelfId();
const runtime = context.getRuntimeSnapshot();
respond(
true,
{
ts: Date.now(),
whatsapp: {
configured: linked,
linked,
authAgeMs,
self,
running: runtime.whatsapp.running,
connected: runtime.whatsapp.connected,
lastConnectedAt: runtime.whatsapp.lastConnectedAt ?? null,
lastDisconnect: runtime.whatsapp.lastDisconnect ?? null,
reconnectAttempts: runtime.whatsapp.reconnectAttempts,
lastMessageAt: runtime.whatsapp.lastMessageAt ?? null,
lastEventAt: runtime.whatsapp.lastEventAt ?? null,
lastError: runtime.whatsapp.lastError ?? null,
},
telegram: {
configured: telegramEnabled && Boolean(telegramToken),
tokenSource,
running: runtime.telegram.running,
mode: runtime.telegram.mode ?? null,
lastStartAt: runtime.telegram.lastStartAt ?? null,
lastStopAt: runtime.telegram.lastStopAt ?? null,
lastError: runtime.telegram.lastError ?? null,
probe: telegramProbe,
lastProbeAt,
},
discord: {
configured: discordEnabled && Boolean(discordToken),
tokenSource: discordTokenSource,
running: runtime.discord.running,
lastStartAt: runtime.discord.lastStartAt ?? null,
lastStopAt: runtime.discord.lastStopAt ?? null,
lastError: runtime.discord.lastError ?? null,
probe: discordProbe,
lastProbeAt: discordLastProbeAt,
},
signal: {
configured: signalConfigured,
baseUrl: signalBaseUrl,
running: runtime.signal.running,
lastStartAt: runtime.signal.lastStartAt ?? null,
lastStopAt: runtime.signal.lastStopAt ?? null,
lastError: runtime.signal.lastError ?? null,
probe: signalProbe,
lastProbeAt: signalLastProbeAt,
},
imessage: {
configured: imessageConfigured,
running: runtime.imessage.running,
lastStartAt: runtime.imessage.lastStartAt ?? null,
lastStopAt: runtime.imessage.lastStopAt ?? null,
lastError: runtime.imessage.lastError ?? null,
cliPath: runtime.imessage.cliPath ?? null,
dbPath: runtime.imessage.dbPath ?? null,
probe: imessageProbe,
lastProbeAt: imessageLastProbeAt,
},
},
undefined,
);
},
"telegram.logout": async ({ respond, context }) => {
try {
await context.stopTelegramProvider();
const snapshot = await readConfigFileSnapshot();
if (!snapshot.valid) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"config invalid; fix it before logging out",
),
);
return;
}
const cfg = snapshot.config ?? {};
const envToken = process.env.TELEGRAM_BOT_TOKEN?.trim() ?? "";
const hadToken = Boolean(cfg.telegram?.botToken);
const nextTelegram = cfg.telegram ? { ...cfg.telegram } : undefined;
if (nextTelegram) {
delete nextTelegram.botToken;
}
const nextCfg = { ...cfg } as ClawdisConfig;
if (nextTelegram && Object.keys(nextTelegram).length > 0) {
nextCfg.telegram = nextTelegram;
} else {
delete nextCfg.telegram;
}
await writeConfigFile(nextCfg);
respond(true, { cleared: hadToken, envToken: Boolean(envToken) }, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
};

View File

@@ -0,0 +1,164 @@
import { sendMessageDiscord } from "../../discord/index.js";
import { shouldLogVerbose } from "../../globals.js";
import { sendMessageIMessage } from "../../imessage/index.js";
import { loadConfig } from "../../config/config.js";
import { sendMessageSignal } from "../../signal/index.js";
import { sendMessageTelegram } from "../../telegram/send.js";
import { resolveTelegramToken } from "../../telegram/token.js";
import { sendMessageWhatsApp } from "../../web/outbound.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateSendParams,
} from "../protocol/index.js";
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
export const sendHandlers: GatewayRequestHandlers = {
send: async ({ params, respond, context }) => {
const p = params as Record<string, unknown>;
if (!validateSendParams(p)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid send params: ${formatValidationErrors(validateSendParams.errors)}`,
),
);
return;
}
const request = p as {
to: string;
message: string;
mediaUrl?: string;
gifPlayback?: boolean;
provider?: string;
idempotencyKey: string;
};
const idem = request.idempotencyKey;
const cached = context.dedupe.get(`send:${idem}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error, {
cached: true,
});
return;
}
const to = request.to.trim();
const message = request.message.trim();
const providerRaw = (request.provider ?? "whatsapp").toLowerCase();
const provider = providerRaw === "imsg" ? "imessage" : providerRaw;
try {
if (provider === "telegram") {
const cfg = loadConfig();
const { token } = resolveTelegramToken(cfg);
const result = await sendMessageTelegram(to, message, {
mediaUrl: request.mediaUrl,
verbose: shouldLogVerbose(),
token: token || undefined,
});
const payload = {
runId: idem,
messageId: result.messageId,
chatId: result.chatId,
provider,
};
context.dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { provider });
} else if (provider === "discord") {
const result = await sendMessageDiscord(to, message, {
mediaUrl: request.mediaUrl,
token: process.env.DISCORD_BOT_TOKEN,
});
const payload = {
runId: idem,
messageId: result.messageId,
channelId: result.channelId,
provider,
};
context.dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { provider });
} else if (provider === "signal") {
const cfg = loadConfig();
const host = cfg.signal?.httpHost?.trim() || "127.0.0.1";
const port = cfg.signal?.httpPort ?? 8080;
const baseUrl = cfg.signal?.httpUrl?.trim() || `http://${host}:${port}`;
const result = await sendMessageSignal(to, message, {
mediaUrl: request.mediaUrl,
baseUrl,
account: cfg.signal?.account,
});
const payload = {
runId: idem,
messageId: result.messageId,
provider,
};
context.dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { provider });
} else if (provider === "imessage") {
const cfg = loadConfig();
const result = await sendMessageIMessage(to, message, {
mediaUrl: request.mediaUrl,
cliPath: cfg.imessage?.cliPath,
dbPath: cfg.imessage?.dbPath,
maxBytes: cfg.imessage?.mediaMaxMb
? cfg.imessage.mediaMaxMb * 1024 * 1024
: undefined,
});
const payload = {
runId: idem,
messageId: result.messageId,
provider,
};
context.dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { provider });
} else {
const result = await sendMessageWhatsApp(to, message, {
mediaUrl: request.mediaUrl,
verbose: shouldLogVerbose(),
gifPlayback: request.gifPlayback,
});
const payload = {
runId: idem,
messageId: result.messageId,
toJid: result.toJid ?? `${to}@s.whatsapp.net`,
provider,
};
context.dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { provider });
}
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
context.dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: false,
error,
});
respond(false, undefined, error, {
provider,
error: formatForLog(err),
});
}
},
};

View File

@@ -0,0 +1,469 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../../agents/defaults.js";
import {
buildAllowedModelSet,
buildModelAliasIndex,
modelKey,
resolveConfiguredModelRef,
resolveModelRefFromString,
} from "../../agents/model-selection.js";
import {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
resolveEmbeddedSessionLane,
waitForEmbeddedPiRunEnd,
} from "../../agents/pi-embedded.js";
import { normalizeGroupActivation } from "../../auto-reply/group-activation.js";
import {
normalizeThinkLevel,
normalizeVerboseLevel,
} from "../../auto-reply/thinking.js";
import { loadConfig } from "../../config/config.js";
import {
loadSessionStore,
resolveMainSessionKey,
resolveStorePath,
type SessionEntry,
saveSessionStore,
} from "../../config/sessions.js";
import { clearCommandLane } from "../../process/command-queue.js";
import { normalizeSendPolicy } from "../../sessions/send-policy.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateSessionsCompactParams,
validateSessionsDeleteParams,
validateSessionsListParams,
validateSessionsPatchParams,
validateSessionsResetParams,
} from "../protocol/index.js";
import {
archiveFileOnDisk,
listSessionsFromStore,
loadSessionEntry,
resolveSessionTranscriptCandidates,
type SessionsPatchResult,
} from "../session-utils.js";
import type { GatewayRequestHandlers } from "./types.js";
export const sessionsHandlers: GatewayRequestHandlers = {
"sessions.list": ({ params, respond }) => {
if (!validateSessionsListParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid sessions.list params: ${formatValidationErrors(validateSessionsListParams.errors)}`,
),
);
return;
}
const p = params as import("../protocol/index.js").SessionsListParams;
const cfg = loadConfig();
const storePath = resolveStorePath(cfg.session?.store);
const store = loadSessionStore(storePath);
const result = listSessionsFromStore({
cfg,
storePath,
store,
opts: p,
});
respond(true, result, undefined);
},
"sessions.patch": async ({ params, respond, context }) => {
if (!validateSessionsPatchParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid sessions.patch params: ${formatValidationErrors(validateSessionsPatchParams.errors)}`,
),
);
return;
}
const p = params as import("../protocol/index.js").SessionsPatchParams;
const key = String(p.key ?? "").trim();
if (!key) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "key required"),
);
return;
}
const cfg = loadConfig();
const storePath = resolveStorePath(cfg.session?.store);
const store = loadSessionStore(storePath);
const now = Date.now();
const existing = store[key];
const next: SessionEntry = existing
? {
...existing,
updatedAt: Math.max(existing.updatedAt ?? 0, now),
}
: { sessionId: randomUUID(), updatedAt: now };
if ("thinkingLevel" in p) {
const raw = p.thinkingLevel;
if (raw === null) {
delete next.thinkingLevel;
} else if (raw !== undefined) {
const normalized = normalizeThinkLevel(String(raw));
if (!normalized) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"invalid thinkingLevel (use off|minimal|low|medium|high)",
),
);
return;
}
if (normalized === "off") delete next.thinkingLevel;
else next.thinkingLevel = normalized;
}
}
if ("verboseLevel" in p) {
const raw = p.verboseLevel;
if (raw === null) {
delete next.verboseLevel;
} else if (raw !== undefined) {
const normalized = normalizeVerboseLevel(String(raw));
if (!normalized) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
'invalid verboseLevel (use "on"|"off")',
),
);
return;
}
if (normalized === "off") delete next.verboseLevel;
else next.verboseLevel = normalized;
}
}
if ("model" in p) {
const raw = p.model;
if (raw === null) {
delete next.providerOverride;
delete next.modelOverride;
} else if (raw !== undefined) {
const trimmed = String(raw).trim();
if (!trimmed) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "invalid model: empty"),
);
return;
}
const resolvedDefault = resolveConfiguredModelRef({
cfg,
defaultProvider: DEFAULT_PROVIDER,
defaultModel: DEFAULT_MODEL,
});
const aliasIndex = buildModelAliasIndex({
cfg,
defaultProvider: resolvedDefault.provider,
});
const resolved = resolveModelRefFromString({
raw: trimmed,
defaultProvider: resolvedDefault.provider,
aliasIndex,
});
if (!resolved) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `invalid model: ${trimmed}`),
);
return;
}
const catalog = await context.loadGatewayModelCatalog();
const allowed = buildAllowedModelSet({
cfg,
catalog,
defaultProvider: resolvedDefault.provider,
});
const key = modelKey(resolved.ref.provider, resolved.ref.model);
if (!allowed.allowAny && !allowed.allowedKeys.has(key)) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `model not allowed: ${key}`),
);
return;
}
if (
resolved.ref.provider === resolvedDefault.provider &&
resolved.ref.model === resolvedDefault.model
) {
delete next.providerOverride;
delete next.modelOverride;
} else {
next.providerOverride = resolved.ref.provider;
next.modelOverride = resolved.ref.model;
}
}
}
if ("sendPolicy" in p) {
const raw = p.sendPolicy;
if (raw === null) {
delete next.sendPolicy;
} else if (raw !== undefined) {
const normalized = normalizeSendPolicy(String(raw));
if (!normalized) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
'invalid sendPolicy (use "allow"|"deny")',
),
);
return;
}
next.sendPolicy = normalized;
}
}
if ("groupActivation" in p) {
const raw = p.groupActivation;
if (raw === null) {
delete next.groupActivation;
} else if (raw !== undefined) {
const normalized = normalizeGroupActivation(String(raw));
if (!normalized) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
'invalid groupActivation (use "mention"|"always")',
),
);
return;
}
next.groupActivation = normalized;
}
}
store[key] = next;
await saveSessionStore(storePath, store);
const result: SessionsPatchResult = {
ok: true,
path: storePath,
key,
entry: next,
};
respond(true, result, undefined);
},
"sessions.reset": async ({ params, respond }) => {
if (!validateSessionsResetParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid sessions.reset params: ${formatValidationErrors(validateSessionsResetParams.errors)}`,
),
);
return;
}
const p = params as import("../protocol/index.js").SessionsResetParams;
const key = String(p.key ?? "").trim();
if (!key) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "key required"),
);
return;
}
const { storePath, store, entry } = loadSessionEntry(key);
const now = Date.now();
const next: SessionEntry = {
sessionId: randomUUID(),
updatedAt: now,
systemSent: false,
abortedLastRun: false,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
model: entry?.model,
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
};
store[key] = next;
await saveSessionStore(storePath, store);
respond(true, { ok: true, key, entry: next }, undefined);
},
"sessions.delete": async ({ params, respond }) => {
if (!validateSessionsDeleteParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid sessions.delete params: ${formatValidationErrors(validateSessionsDeleteParams.errors)}`,
),
);
return;
}
const p = params as import("../protocol/index.js").SessionsDeleteParams;
const key = String(p.key ?? "").trim();
if (!key) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "key required"),
);
return;
}
const mainKey = resolveMainSessionKey(loadConfig());
if (key === mainKey) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`Cannot delete the main session (${mainKey}).`,
),
);
return;
}
const deleteTranscript =
typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true;
const { storePath, store, entry } = loadSessionEntry(key);
const sessionId = entry?.sessionId;
const existed = Boolean(store[key]);
clearCommandLane(resolveEmbeddedSessionLane(key));
if (sessionId && isEmbeddedPiRunActive(sessionId)) {
abortEmbeddedPiRun(sessionId);
const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000);
if (!ended) {
respond(
false,
undefined,
errorShape(
ErrorCodes.UNAVAILABLE,
`Session ${key} is still active; try again in a moment.`,
),
);
return;
}
}
if (existed) delete store[key];
await saveSessionStore(storePath, store);
const archived: string[] = [];
if (deleteTranscript && sessionId) {
for (const candidate of resolveSessionTranscriptCandidates(
sessionId,
storePath,
)) {
if (!fs.existsSync(candidate)) continue;
try {
archived.push(archiveFileOnDisk(candidate, "deleted"));
} catch {
// Best-effort.
}
}
}
respond(true, { ok: true, key, deleted: existed, archived }, undefined);
},
"sessions.compact": async ({ params, respond }) => {
if (!validateSessionsCompactParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid sessions.compact params: ${formatValidationErrors(validateSessionsCompactParams.errors)}`,
),
);
return;
}
const p = params as import("../protocol/index.js").SessionsCompactParams;
const key = String(p.key ?? "").trim();
if (!key) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "key required"),
);
return;
}
const maxLines =
typeof p.maxLines === "number" && Number.isFinite(p.maxLines)
? Math.max(1, Math.floor(p.maxLines))
: 400;
const { storePath, store, entry } = loadSessionEntry(key);
const sessionId = entry?.sessionId;
if (!sessionId) {
respond(true, { ok: true, key, compacted: false, reason: "no sessionId" }, undefined);
return;
}
const filePath = resolveSessionTranscriptCandidates(sessionId, storePath).find(
(candidate) => fs.existsSync(candidate),
);
if (!filePath) {
respond(true, { ok: true, key, compacted: false, reason: "no transcript" }, undefined);
return;
}
const raw = fs.readFileSync(filePath, "utf-8");
const lines = raw.split(/\r?\n/).filter((l) => l.trim().length > 0);
if (lines.length <= maxLines) {
respond(true, { ok: true, key, compacted: false, kept: lines.length }, undefined);
return;
}
const archived = archiveFileOnDisk(filePath, "bak");
const keptLines = lines.slice(-maxLines);
fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8");
if (store[key]) {
delete store[key].inputTokens;
delete store[key].outputTokens;
delete store[key].totalTokens;
store[key].updatedAt = Date.now();
await saveSessionStore(storePath, store);
}
respond(
true,
{
ok: true,
key,
compacted: true,
archived,
kept: keptLines.length,
},
undefined,
);
},
};

View File

@@ -0,0 +1,120 @@
import { DEFAULT_AGENT_WORKSPACE_DIR } from "../../agents/workspace.js";
import { installSkill } from "../../agents/skills-install.js";
import { buildWorkspaceSkillStatus } from "../../agents/skills-status.js";
import type { ClawdisConfig } from "../../config/config.js";
import { loadConfig, writeConfigFile } from "../../config/config.js";
import { resolveUserPath } from "../../utils.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateSkillsInstallParams,
validateSkillsStatusParams,
validateSkillsUpdateParams,
} from "../protocol/index.js";
import type { GatewayRequestHandlers } from "./types.js";
export const skillsHandlers: GatewayRequestHandlers = {
"skills.status": ({ params, respond }) => {
if (!validateSkillsStatusParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid skills.status params: ${formatValidationErrors(validateSkillsStatusParams.errors)}`,
),
);
return;
}
const cfg = loadConfig();
const workspaceDirRaw = cfg.agent?.workspace ?? DEFAULT_AGENT_WORKSPACE_DIR;
const workspaceDir = resolveUserPath(workspaceDirRaw);
const report = buildWorkspaceSkillStatus(workspaceDir, {
config: cfg,
});
respond(true, report, undefined);
},
"skills.install": async ({ params, respond }) => {
if (!validateSkillsInstallParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid skills.install params: ${formatValidationErrors(validateSkillsInstallParams.errors)}`,
),
);
return;
}
const p = params as {
name: string;
installId: string;
timeoutMs?: number;
};
const cfg = loadConfig();
const workspaceDirRaw = cfg.agent?.workspace ?? DEFAULT_AGENT_WORKSPACE_DIR;
const result = await installSkill({
workspaceDir: workspaceDirRaw,
skillName: p.name,
installId: p.installId,
timeoutMs: p.timeoutMs,
config: cfg,
});
respond(
result.ok,
result,
result.ok ? undefined : errorShape(ErrorCodes.UNAVAILABLE, result.message),
);
},
"skills.update": async ({ params, respond }) => {
if (!validateSkillsUpdateParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid skills.update params: ${formatValidationErrors(validateSkillsUpdateParams.errors)}`,
),
);
return;
}
const p = params as {
skillKey: string;
enabled?: boolean;
apiKey?: string;
env?: Record<string, string>;
};
const cfg = loadConfig();
const skills = cfg.skills ? { ...cfg.skills } : {};
const entries = skills.entries ? { ...skills.entries } : {};
const current = entries[p.skillKey] ? { ...entries[p.skillKey] } : {};
if (typeof p.enabled === "boolean") {
current.enabled = p.enabled;
}
if (typeof p.apiKey === "string") {
const trimmed = p.apiKey.trim();
if (trimmed) current.apiKey = trimmed;
else delete current.apiKey;
}
if (p.env && typeof p.env === "object") {
const nextEnv = current.env ? { ...current.env } : {};
for (const [key, value] of Object.entries(p.env)) {
const trimmedKey = key.trim();
if (!trimmedKey) continue;
const trimmedVal = value.trim();
if (!trimmedVal) delete nextEnv[trimmedKey];
else nextEnv[trimmedKey] = trimmedVal;
}
current.env = nextEnv;
}
entries[p.skillKey] = current;
skills.entries = entries;
const nextConfig: ClawdisConfig = {
...cfg,
skills,
};
await writeConfigFile(nextConfig);
respond(true, { ok: true, skillKey: p.skillKey, config: current }, undefined);
},
};

View File

@@ -0,0 +1,149 @@
import { getLastHeartbeatEvent } from "../../infra/heartbeat-events.js";
import { setHeartbeatsEnabled } from "../../infra/heartbeat-runner.js";
import {
enqueueSystemEvent,
isSystemEventContextChanged,
} from "../../infra/system-events.js";
import {
listSystemPresence,
updateSystemPresence,
} from "../../infra/system-presence.js";
import { ErrorCodes, errorShape } from "../protocol/index.js";
import type { GatewayRequestHandlers } from "./types.js";
export const systemHandlers: GatewayRequestHandlers = {
"last-heartbeat": ({ respond }) => {
respond(true, getLastHeartbeatEvent(), undefined);
},
"set-heartbeats": ({ params, respond }) => {
const enabled = params.enabled;
if (typeof enabled !== "boolean") {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"invalid set-heartbeats params: enabled (boolean) required",
),
);
return;
}
setHeartbeatsEnabled(enabled);
respond(true, { ok: true, enabled }, undefined);
},
"system-presence": ({ respond }) => {
const presence = listSystemPresence();
respond(true, presence, undefined);
},
"system-event": ({ params, respond, context }) => {
const text = typeof params.text === "string" ? params.text.trim() : "";
if (!text) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "text required"),
);
return;
}
const instanceId =
typeof params.instanceId === "string" ? params.instanceId : undefined;
const host = typeof params.host === "string" ? params.host : undefined;
const ip = typeof params.ip === "string" ? params.ip : undefined;
const mode = typeof params.mode === "string" ? params.mode : undefined;
const version =
typeof params.version === "string" ? params.version : undefined;
const platform =
typeof params.platform === "string" ? params.platform : undefined;
const deviceFamily =
typeof params.deviceFamily === "string" ? params.deviceFamily : undefined;
const modelIdentifier =
typeof params.modelIdentifier === "string"
? params.modelIdentifier
: undefined;
const lastInputSeconds =
typeof params.lastInputSeconds === "number" &&
Number.isFinite(params.lastInputSeconds)
? params.lastInputSeconds
: undefined;
const reason = typeof params.reason === "string" ? params.reason : undefined;
const tags =
Array.isArray(params.tags) &&
params.tags.every((t) => typeof t === "string")
? (params.tags as string[])
: undefined;
const presenceUpdate = updateSystemPresence({
text,
instanceId,
host,
ip,
mode,
version,
platform,
deviceFamily,
modelIdentifier,
lastInputSeconds,
reason,
tags,
});
const isNodePresenceLine = text.startsWith("Node:");
if (isNodePresenceLine) {
const next = presenceUpdate.next;
const changed = new Set(presenceUpdate.changedKeys);
const reasonValue = next.reason ?? reason;
const normalizedReason = (reasonValue ?? "").toLowerCase();
const ignoreReason =
normalizedReason.startsWith("periodic") ||
normalizedReason === "heartbeat";
const hostChanged = changed.has("host");
const ipChanged = changed.has("ip");
const versionChanged = changed.has("version");
const modeChanged = changed.has("mode");
const reasonChanged = changed.has("reason") && !ignoreReason;
const hasChanges =
hostChanged ||
ipChanged ||
versionChanged ||
modeChanged ||
reasonChanged;
if (hasChanges) {
const contextChanged = isSystemEventContextChanged(presenceUpdate.key);
const parts: string[] = [];
if (contextChanged || hostChanged || ipChanged) {
const hostLabel = next.host?.trim() || "Unknown";
const ipLabel = next.ip?.trim();
parts.push(`Node: ${hostLabel}${ipLabel ? ` (${ipLabel})` : ""}`);
}
if (versionChanged) {
parts.push(`app ${next.version?.trim() || "unknown"}`);
}
if (modeChanged) {
parts.push(`mode ${next.mode?.trim() || "unknown"}`);
}
if (reasonChanged) {
parts.push(`reason ${reasonValue?.trim() || "event"}`);
}
const deltaText = parts.join(" · ");
if (deltaText) {
enqueueSystemEvent(deltaText, {
contextKey: presenceUpdate.key,
});
}
}
} else {
enqueueSystemEvent(text);
}
const nextPresenceVersion = context.incrementPresenceVersion();
context.broadcast(
"presence",
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: {
presence: nextPresenceVersion,
health: context.getHealthVersion(),
},
},
);
respond(true, { ok: true }, undefined);
},
};

View File

@@ -0,0 +1,41 @@
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateTalkModeParams,
} from "../protocol/index.js";
import type { GatewayRequestHandlers } from "./types.js";
export const talkHandlers: GatewayRequestHandlers = {
"talk.mode": ({ params, respond, context, client, isWebchatConnect }) => {
if (client && isWebchatConnect(client.connect) && !context.hasConnectedMobileNode()) {
respond(
false,
undefined,
errorShape(
ErrorCodes.UNAVAILABLE,
"talk disabled: no connected iOS/Android nodes",
),
);
return;
}
if (!validateTalkModeParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid talk.mode params: ${formatValidationErrors(validateTalkModeParams.errors)}`,
),
);
return;
}
const payload = {
enabled: (params as { enabled: boolean }).enabled,
phase: (params as { phase?: string }).phase ?? null,
ts: Date.now(),
};
context.broadcast("talk.mode", payload, { dropIfSlow: true });
respond(true, payload, undefined);
},
};

View File

@@ -0,0 +1,101 @@
import type { ModelCatalogEntry } from "../../agents/model-catalog.js";
import type { createDefaultDeps } from "../../cli/deps.js";
import type { HealthSummary } from "../../commands/health.js";
import type { CronService } from "../../cron/service.js";
import type { startNodeBridgeServer } from "../../infra/bridge/server.js";
import type { WizardSession } from "../../wizard/session.js";
import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index.js";
import type { DedupeEntry } from "../server-shared.js";
import type { ProviderRuntimeSnapshot } from "../server-providers.js";
export type GatewayClient = {
connect: ConnectParams;
};
export type RespondFn = (
ok: boolean,
payload?: unknown,
error?: ErrorShape,
meta?: Record<string, unknown>,
) => void;
export type GatewayRequestContext = {
deps: ReturnType<typeof createDefaultDeps>;
cron: CronService;
cronStorePath: string;
loadGatewayModelCatalog: () => Promise<ModelCatalogEntry[]>;
getHealthCache: () => HealthSummary | null;
refreshHealthSnapshot: (opts?: { probe?: boolean }) => Promise<HealthSummary>;
logHealth: { error: (message: string) => void };
incrementPresenceVersion: () => number;
getHealthVersion: () => number;
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
bridge: Awaited<ReturnType<typeof startNodeBridgeServer>> | null;
bridgeSendToSession: (
sessionKey: string,
event: string,
payload: unknown,
) => void;
hasConnectedMobileNode: () => boolean;
agentRunSeq: Map<string, number>;
chatAbortControllers: Map<
string,
{ controller: AbortController; sessionId: string; sessionKey: string }
>;
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
addChatRun: (
sessionId: string,
entry: { sessionKey: string; clientRunId: string },
) => void;
removeChatRun: (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => { sessionKey: string; clientRunId: string } | undefined;
dedupe: Map<string, DedupeEntry>;
wizardSessions: Map<string, WizardSession>;
findRunningWizard: () => string | null;
purgeWizardSession: (id: string) => void;
getRuntimeSnapshot: () => ProviderRuntimeSnapshot;
startWhatsAppProvider: () => Promise<void>;
stopWhatsAppProvider: () => Promise<void>;
stopTelegramProvider: () => Promise<void>;
markWhatsAppLoggedOut: (cleared: boolean) => void;
wizardRunner: (
opts: import("../../commands/onboard-types.js").OnboardOptions,
runtime: import("../../runtime.js").RuntimeEnv,
prompter: import("../../wizard/prompts.js").WizardPrompter,
) => Promise<void>;
broadcastVoiceWakeChanged: (triggers: string[]) => void;
};
export type GatewayRequestOptions = {
req: RequestFrame;
client: GatewayClient | null;
isWebchatConnect: (params: ConnectParams | null | undefined) => boolean;
respond: RespondFn;
context: GatewayRequestContext;
};
export type GatewayRequestHandlerOptions = {
req: RequestFrame;
params: Record<string, unknown>;
client: GatewayClient | null;
isWebchatConnect: (params: ConnectParams | null | undefined) => boolean;
respond: RespondFn;
context: GatewayRequestContext;
};
export type GatewayRequestHandler = (
opts: GatewayRequestHandlerOptions,
) => Promise<void> | void;
export type GatewayRequestHandlers = Record<string, GatewayRequestHandler>;

View File

@@ -0,0 +1,45 @@
import { loadVoiceWakeConfig, setVoiceWakeTriggers } from "../../infra/voicewake.js";
import { ErrorCodes, errorShape } from "../protocol/index.js";
import { normalizeVoiceWakeTriggers } from "../server-utils.js";
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
export const voicewakeHandlers: GatewayRequestHandlers = {
"voicewake.get": async ({ respond }) => {
try {
const cfg = await loadVoiceWakeConfig();
respond(true, { triggers: cfg.triggers });
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"voicewake.set": async ({ params, respond, context }) => {
if (!Array.isArray(params.triggers)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"voicewake.set requires triggers: string[]",
),
);
return;
}
try {
const triggers = normalizeVoiceWakeTriggers(params.triggers);
const cfg = await setVoiceWakeTriggers(triggers);
context.broadcastVoiceWakeChanged(cfg.triggers);
respond(true, { triggers: cfg.triggers });
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
};

View File

@@ -0,0 +1,91 @@
import { defaultRuntime } from "../../runtime.js";
import { startWebLoginWithQr, waitForWebLogin } from "../../web/login-qr.js";
import { logoutWeb } from "../../web/session.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateWebLoginStartParams,
validateWebLoginWaitParams,
} from "../protocol/index.js";
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
export const webHandlers: GatewayRequestHandlers = {
"web.login.start": async ({ params, respond, context }) => {
if (!validateWebLoginStartParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid web.login.start params: ${formatValidationErrors(validateWebLoginStartParams.errors)}`,
),
);
return;
}
try {
await context.stopWhatsAppProvider();
const result = await startWebLoginWithQr({
force: Boolean((params as { force?: boolean }).force),
timeoutMs:
typeof (params as { timeoutMs?: unknown }).timeoutMs === "number"
? (params as { timeoutMs?: number }).timeoutMs
: undefined,
verbose: Boolean((params as { verbose?: boolean }).verbose),
});
respond(true, result, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"web.login.wait": async ({ params, respond, context }) => {
if (!validateWebLoginWaitParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid web.login.wait params: ${formatValidationErrors(validateWebLoginWaitParams.errors)}`,
),
);
return;
}
try {
const result = await waitForWebLogin({
timeoutMs:
typeof (params as { timeoutMs?: unknown }).timeoutMs === "number"
? (params as { timeoutMs?: number }).timeoutMs
: undefined,
});
if (result.connected) {
await context.startWhatsAppProvider();
}
respond(true, result, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
"web.logout": async ({ respond, context }) => {
try {
await context.stopWhatsAppProvider();
const cleared = await logoutWeb(defaultRuntime);
context.markWhatsAppLoggedOut(cleared);
respond(true, { cleared }, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
},
};

View File

@@ -0,0 +1,166 @@
import { randomUUID } from "node:crypto";
import { WizardSession } from "../../wizard/session.js";
import { defaultRuntime } from "../../runtime.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateWizardCancelParams,
validateWizardNextParams,
validateWizardStartParams,
validateWizardStatusParams,
} from "../protocol/index.js";
import { formatForLog } from "../ws-log.js";
import type { GatewayRequestHandlers } from "./types.js";
export const wizardHandlers: GatewayRequestHandlers = {
"wizard.start": async ({ params, respond, context }) => {
if (!validateWizardStartParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid wizard.start params: ${formatValidationErrors(validateWizardStartParams.errors)}`,
),
);
return;
}
const running = context.findRunningWizard();
if (running) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, "wizard already running"),
);
return;
}
const sessionId = randomUUID();
const opts = {
mode: params.mode as "local" | "remote" | undefined,
workspace: typeof params.workspace === "string" ? params.workspace : undefined,
};
const session = new WizardSession((prompter) =>
context.wizardRunner(opts, defaultRuntime, prompter),
);
context.wizardSessions.set(sessionId, session);
const result = await session.next();
if (result.done) {
context.purgeWizardSession(sessionId);
}
respond(true, { sessionId, ...result }, undefined);
},
"wizard.next": async ({ params, respond, context }) => {
if (!validateWizardNextParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid wizard.next params: ${formatValidationErrors(validateWizardNextParams.errors)}`,
),
);
return;
}
const sessionId = params.sessionId as string;
const session = context.wizardSessions.get(sessionId);
if (!session) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "wizard not found"),
);
return;
}
const answer = params.answer as
| { stepId?: string; value?: unknown }
| undefined;
if (answer) {
if (session.getStatus() !== "running") {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "wizard not running"),
);
return;
}
try {
await session.answer(String(answer.stepId ?? ""), answer.value);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, formatForLog(err)),
);
return;
}
}
const result = await session.next();
if (result.done) {
context.purgeWizardSession(sessionId);
}
respond(true, result, undefined);
},
"wizard.cancel": ({ params, respond, context }) => {
if (!validateWizardCancelParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid wizard.cancel params: ${formatValidationErrors(validateWizardCancelParams.errors)}`,
),
);
return;
}
const sessionId = params.sessionId as string;
const session = context.wizardSessions.get(sessionId);
if (!session) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "wizard not found"),
);
return;
}
session.cancel();
const status = {
status: session.getStatus(),
error: session.getError(),
};
context.wizardSessions.delete(sessionId);
respond(true, status, undefined);
},
"wizard.status": ({ params, respond, context }) => {
if (!validateWizardStatusParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid wizard.status params: ${formatValidationErrors(validateWizardStatusParams.errors)}`,
),
);
return;
}
const sessionId = params.sessionId as string;
const session = context.wizardSessions.get(sessionId);
if (!session) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "wizard not found"),
);
return;
}
const status = {
status: session.getStatus(),
error: session.getError(),
};
if (status.status !== "running") {
context.wizardSessions.delete(sessionId);
}
respond(true, status, undefined);
},
};