feat: add gateway restart tool

This commit is contained in:
Peter Steinberger
2025-12-25 18:05:37 +00:00
parent aafcd569b1
commit 42eb7640f9
6 changed files with 266 additions and 135 deletions

View File

@@ -26,6 +26,7 @@ pnpm clawdis gateway --force
- Pass `--verbose` to mirror debug logging (handshakes, req/res, events) from the log file into stdio when troubleshooting. - Pass `--verbose` to mirror debug logging (handshakes, req/res, events) from the log file into stdio when troubleshooting.
- `--force` uses `lsof` to find listeners on the chosen port, sends SIGTERM, logs what it killed, then starts the gateway (fails fast if `lsof` is missing). - `--force` uses `lsof` to find listeners on the chosen port, sends SIGTERM, logs what it killed, then starts the gateway (fails fast if `lsof` is missing).
- If you run under a supervisor (launchd/systemd/mac app child-process mode), a stop/restart typically sends **SIGTERM**; older builds may surface this as `pnpm` `ELIFECYCLE` exit code **143** (SIGTERM), which is a normal shutdown, not a crash. - If you run under a supervisor (launchd/systemd/mac app child-process mode), a stop/restart typically sends **SIGTERM**; older builds may surface this as `pnpm` `ELIFECYCLE` exit code **143** (SIGTERM), which is a normal shutdown, not a crash.
- **SIGUSR1** triggers an in-process restart (no external supervisor required). This is what the `clawdis_gateway` agent tool uses.
- Optional shared secret: pass `--token <value>` or set `CLAWDIS_GATEWAY_TOKEN` to require clients to send `connect.params.auth.token`. - Optional shared secret: pass `--token <value>` or set `CLAWDIS_GATEWAY_TOKEN` to require clients to send `connect.params.auth.token`.
## Remote access ## Remote access

View File

@@ -0,0 +1,36 @@
import { describe, expect, it, vi } from "vitest";
import { createClawdisTools } from "./clawdis-tools.js";
// Verifies that the `clawdis_gateway` tool reports the scheduled restart right
// away and only delivers SIGUSR1 to the current process once timers are flushed.
describe("clawdis_gateway tool", () => {
  it("schedules SIGUSR1 restart", async () => {
    vi.useFakeTimers();
    // Stub process.kill so the test never actually signals the test runner.
    const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);

    try {
      const gatewayTool = createClawdisTools().find(
        ({ name }) => name === "clawdis_gateway",
      );
      expect(gatewayTool).toBeDefined();
      if (gatewayTool === undefined) {
        throw new Error("missing clawdis_gateway tool");
      }

      const outcome = await gatewayTool.execute("call1", {
        action: "restart",
        delayMs: 0,
      });
      const expectedDetails = {
        ok: true,
        pid: process.pid,
        signal: "SIGUSR1",
        delayMs: 0,
      };
      expect(outcome.details).toMatchObject(expectedDetails);

      // The signal is deferred through a timer, so nothing is sent yet.
      expect(killSpy).not.toHaveBeenCalled();

      await vi.runAllTimersAsync();
      expect(killSpy).toHaveBeenCalledWith(process.pid, "SIGUSR1");
    } finally {
      killSpy.mockRestore();
      vi.useRealTimers();
    }
  });
});

View File

@@ -1370,11 +1370,62 @@ function createCronTool(): AnyAgentTool {
}; };
} }
// Parameter schema for the `clawdis_gateway` agent tool.
// Declared as a union of per-action objects; "restart" is the only action
// variant defined here.
const GatewayToolSchema = Type.Union([
Type.Object({
action: Type.Literal("restart"),
// Optional delay before the restart signal is sent. createGatewayTool floors
// it and clamps it to 0..60000 ms, falling back to 2000 ms when it is absent
// or not a finite number.
delayMs: Type.Optional(Type.Number()),
// Optional free-text reason. createGatewayTool trims it, caps it at 200
// characters, and echoes it back in the tool result.
reason: Type.Optional(Type.String()),
}),
]);
/**
 * Builds the `clawdis_gateway` agent tool.
 *
 * The tool's only action, "restart", schedules a SIGUSR1 to the current
 * process after a delay and returns immediately with a JSON summary of what
 * was scheduled. The signal is sent from a timer, so the tool call itself
 * completes before the signal is delivered.
 *
 * @returns The tool definition (label, name, description, schema, executor).
 */
function createGatewayTool(): AnyAgentTool {
  return {
    label: "Clawdis Gateway",
    name: "clawdis_gateway",
    description:
      "Restart the running gateway process in-place (SIGUSR1) without needing an external supervisor. Use delayMs to avoid interrupting an in-flight reply.",
    parameters: GatewayToolSchema,
    execute: async (_toolCallId, args) => {
      const input = args as Record<string, unknown>;

      const action = readStringParam(input, "action", { required: true });
      if (action !== "restart") {
        throw new Error(`Unknown action: ${action}`);
      }

      // Delay: default 2000 ms, floored to an integer, clamped to [0, 60000].
      let requestedDelay = 2000;
      const rawDelay = input.delayMs;
      if (typeof rawDelay === "number" && Number.isFinite(rawDelay)) {
        requestedDelay = Math.floor(rawDelay);
      }
      const delayMs = Math.min(Math.max(requestedDelay, 0), 60_000);

      // Reason: trimmed, capped at 200 characters; null when blank or absent.
      let reason: string | null = null;
      const rawReason = input.reason;
      if (typeof rawReason === "string") {
        const trimmedReason = rawReason.trim();
        if (trimmedReason) {
          reason = trimmedReason.slice(0, 200);
        }
      }

      const pid = process.pid;
      const sendRestartSignal = () => {
        try {
          process.kill(pid, "SIGUSR1");
        } catch {
          /* ignore */
        }
      };
      setTimeout(sendRestartSignal, delayMs);

      return jsonResult({
        ok: true,
        pid,
        signal: "SIGUSR1",
        delayMs,
        reason,
      });
    },
  };
}
export function createClawdisTools(): AnyAgentTool[] { export function createClawdisTools(): AnyAgentTool[] {
return [ return [
createBrowserTool(), createBrowserTool(),
createCanvasTool(), createCanvasTool(),
createNodesTool(), createNodesTool(),
createCronTool(), createCronTool(),
createGatewayTool(),
]; ];
} }

View File

@@ -25,6 +25,84 @@ type GatewayRpcOpts = {
const gatewayLog = createSubsystemLogger("gateway"); const gatewayLog = createSubsystemLogger("gateway");
/** What a process signal asks the gateway run loop to do. */
type GatewayRunSignalAction = "stop" | "restart";

/**
 * Runs the gateway server until the process is asked to stop, restarting it
 * in-process whenever SIGUSR1 is received.
 *
 * Signal handling:
 * - SIGTERM / SIGINT: close the current server, then call `runtime.exit(0)`.
 * - SIGUSR1: close the current server, then call `start()` again.
 * - A stop signal that arrives while a restart is still closing the server is
 *   remembered and turns that restart into a stop. Without this, the stop
 *   request would be dropped and the gateway would come back up.
 * - Any other signal received while a shutdown is in flight is ignored.
 * - If closing takes longer than 5 seconds the process exits without waiting
 *   for cleanup to finish.
 *
 * Errors thrown by `start()` propagate to the caller; the signal listeners are
 * removed before the promise rejects.
 *
 * @param params.start   Starts a gateway server and resolves with its handle.
 * @param params.runtime Runtime whose `exit` is used to terminate the process.
 */
async function runGatewayLoop(params: {
  start: () => Promise<Awaited<ReturnType<typeof startGatewayServer>>>;
  runtime: typeof defaultRuntime;
}) {
  let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
  let shuttingDown = false;
  // True when the in-flight shutdown must end in process exit, not a restart.
  let stopRequested = false;
  let restartResolver: (() => void) | null = null;

  const cleanupSignals = () => {
    process.removeListener("SIGTERM", onSigterm);
    process.removeListener("SIGINT", onSigint);
    process.removeListener("SIGUSR1", onSigusr1);
  };

  const request = (action: GatewayRunSignalAction, signal: string) => {
    if (shuttingDown) {
      if (action === "stop" && !stopRequested) {
        // A stop must win over a restart that is already closing the server.
        // Clients have already been told a restart is expected; that cannot
        // be retracted here, but the process will exit once close() settles.
        stopRequested = true;
        gatewayLog.info(
          `received ${signal} during restart; exiting after shutdown`,
        );
        return;
      }
      gatewayLog.info(`received ${signal} during shutdown; ignoring`);
      return;
    }
    shuttingDown = true;
    stopRequested = action === "stop";
    const isRestart = action === "restart";
    gatewayLog.info(
      `received ${signal}; ${isRestart ? "restarting" : "shutting down"}`,
    );

    // Avoid hanging forever if close() never settles.
    const forceExitTimer = setTimeout(() => {
      gatewayLog.error("shutdown timed out; exiting without full cleanup");
      cleanupSignals();
      params.runtime.exit(0);
    }, 5000);

    void (async () => {
      try {
        await server?.close({
          reason: isRestart ? "gateway restarting" : "gateway stopping",
          restartExpectedMs: isRestart ? 1500 : null,
        });
      } catch (err) {
        gatewayLog.error(`shutdown error: ${String(err)}`);
      } finally {
        clearTimeout(forceExitTimer);
        server = null;
        // Re-read the flag: a stop signal may have arrived while closing.
        if (stopRequested) {
          cleanupSignals();
          params.runtime.exit(0);
        } else {
          shuttingDown = false;
          restartResolver?.();
        }
      }
    })();
  };

  const onSigterm = () => request("stop", "SIGTERM");
  const onSigint = () => request("stop", "SIGINT");
  const onSigusr1 = () => request("restart", "SIGUSR1");

  process.on("SIGTERM", onSigterm);
  process.on("SIGINT", onSigint);
  process.on("SIGUSR1", onSigusr1);

  try {
    // Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required).
    // SIGTERM/SIGINT still exit after a graceful shutdown.
    // eslint-disable-next-line no-constant-condition
    while (true) {
      server = await params.start();
      // Park here until a completed restart shutdown resolves the promise.
      await new Promise<void>((resolve) => {
        restartResolver = resolve;
      });
    }
  } finally {
    cleanupSignals();
  }
}
const gatewayCallOpts = (cmd: Command) => const gatewayCallOpts = (cmd: Command) =>
cmd cmd
.option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789") .option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789")
@@ -155,61 +233,27 @@ export function registerGatewayCli(program: Command) {
return; return;
} }
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
let shuttingDown = false;
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const onSigterm = () => shutdown("SIGTERM");
const onSigint = () => shutdown("SIGINT");
const shutdown = (signal: string) => {
process.removeListener("SIGTERM", onSigterm);
process.removeListener("SIGINT", onSigint);
if (shuttingDown) {
gatewayLog.info(`received ${signal} during shutdown; exiting now`);
defaultRuntime.exit(0);
}
shuttingDown = true;
gatewayLog.info(`received ${signal}; shutting down`);
forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
defaultRuntime.exit(0);
}, 5000);
void (async () => {
try {
await server?.close();
} catch (err) {
gatewayLog.error(`shutdown error: ${String(err)}`);
} finally {
if (forceExitTimer) clearTimeout(forceExitTimer);
defaultRuntime.exit(0);
}
})();
};
process.once("SIGTERM", onSigterm);
process.once("SIGINT", onSigint);
try { try {
server = await startGatewayServer(port, { await runGatewayLoop({
bind, runtime: defaultRuntime,
auth: start: async () =>
authMode || opts.password || authModeRaw await startGatewayServer(port, {
? { bind,
mode: authMode ?? undefined, auth:
password: opts.password ? String(opts.password) : undefined, authMode || opts.password || authModeRaw
} ? {
: undefined, mode: authMode ?? undefined,
tailscale: password: opts.password ? String(opts.password) : undefined,
tailscaleMode || opts.tailscaleResetOnExit }
? { : undefined,
mode: tailscaleMode ?? undefined, tailscale:
resetOnExit: Boolean(opts.tailscaleResetOnExit), tailscaleMode || opts.tailscaleResetOnExit
} ? {
: undefined, mode: tailscaleMode ?? undefined,
resetOnExit: Boolean(opts.tailscaleResetOnExit),
}
: undefined,
}),
}); });
} catch (err) { } catch (err) {
if (err instanceof GatewayLockError) { if (err instanceof GatewayLockError) {
@@ -220,8 +264,6 @@ export function registerGatewayCli(program: Command) {
defaultRuntime.error(`Gateway failed to start: ${String(err)}`); defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
defaultRuntime.exit(1); defaultRuntime.exit(1);
} }
await new Promise<never>(() => {});
}); });
const gateway = program const gateway = program
@@ -385,63 +427,27 @@ export function registerGatewayCli(program: Command) {
return; return;
} }
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
let shuttingDown = false;
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const onSigterm = () => shutdown("SIGTERM");
const onSigint = () => shutdown("SIGINT");
const shutdown = (signal: string) => {
// Ensure we don't leak listeners across restarts/tests.
process.removeListener("SIGTERM", onSigterm);
process.removeListener("SIGINT", onSigint);
if (shuttingDown) {
gatewayLog.info(`received ${signal} during shutdown; exiting now`);
defaultRuntime.exit(0);
}
shuttingDown = true;
gatewayLog.info(`received ${signal}; shutting down`);
// Avoid hanging forever if a provider task ignores abort.
forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
defaultRuntime.exit(0);
}, 5000);
void (async () => {
try {
await server?.close();
} catch (err) {
gatewayLog.error(`shutdown error: ${String(err)}`);
} finally {
if (forceExitTimer) clearTimeout(forceExitTimer);
defaultRuntime.exit(0);
}
})();
};
process.once("SIGTERM", onSigterm);
process.once("SIGINT", onSigint);
try { try {
server = await startGatewayServer(port, { await runGatewayLoop({
bind, runtime: defaultRuntime,
auth: start: async () =>
authMode || opts.password || authModeRaw await startGatewayServer(port, {
? { bind,
mode: authMode ?? undefined, auth:
password: opts.password ? String(opts.password) : undefined, authMode || opts.password || authModeRaw
} ? {
: undefined, mode: authMode ?? undefined,
tailscale: password: opts.password ? String(opts.password) : undefined,
tailscaleMode || opts.tailscaleResetOnExit }
? { : undefined,
mode: tailscaleMode ?? undefined, tailscale:
resetOnExit: Boolean(opts.tailscaleResetOnExit), tailscaleMode || opts.tailscaleResetOnExit
} ? {
: undefined, mode: tailscaleMode ?? undefined,
resetOnExit: Boolean(opts.tailscaleResetOnExit),
}
: undefined,
}),
}); });
} catch (err) { } catch (err) {
if (err instanceof GatewayLockError) { if (err instanceof GatewayLockError) {
@@ -452,8 +458,6 @@ export function registerGatewayCli(program: Command) {
defaultRuntime.error(`Gateway failed to start: ${String(err)}`); defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
defaultRuntime.exit(1); defaultRuntime.exit(1);
} }
// Keep process alive
await new Promise<never>(() => {});
}); });
gatewayCallOpts( gatewayCallOpts(

View File

@@ -515,7 +515,10 @@ const EVENTS = [
]; ];
export type GatewayServer = { export type GatewayServer = {
close: () => Promise<void>; close: (opts?: {
reason?: string;
restartExpectedMs?: number | null;
}) => Promise<void>;
}; };
export type GatewayServerOptions = { export type GatewayServerOptions = {
@@ -5911,7 +5914,15 @@ export async function startGatewayServer(
} }
return { return {
close: async () => { close: async (opts) => {
const reasonRaw =
typeof opts?.reason === "string" ? opts.reason.trim() : "";
const reason = reasonRaw || "gateway stopping";
const restartExpectedMs =
typeof opts?.restartExpectedMs === "number" &&
Number.isFinite(opts.restartExpectedMs)
? Math.max(0, Math.floor(opts.restartExpectedMs))
: null;
if (bonjourStop) { if (bonjourStop) {
try { try {
await bonjourStop(); await bonjourStop();
@@ -5947,8 +5958,8 @@ export async function startGatewayServer(
await stopTelegramProvider(); await stopTelegramProvider();
cron.stop(); cron.stop();
broadcast("shutdown", { broadcast("shutdown", {
reason: "gateway stopping", reason,
restartExpectedMs: null, restartExpectedMs,
}); });
clearInterval(tickInterval); clearInterval(tickInterval);
clearInterval(healthInterval); clearInterval(healthInterval);

View File

@@ -95,54 +95,82 @@ async function main() {
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null; let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
let shuttingDown = false; let shuttingDown = false;
let forceExitTimer: ReturnType<typeof setTimeout> | null = null; let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
let restartResolver: (() => void) | null = null;
const shutdown = (signal: string) => { const cleanupSignals = () => {
process.removeListener("SIGTERM", onSigterm); process.removeListener("SIGTERM", onSigterm);
process.removeListener("SIGINT", onSigint); process.removeListener("SIGINT", onSigint);
process.removeListener("SIGUSR1", onSigusr1);
};
const request = (action: "stop" | "restart", signal: string) => {
if (shuttingDown) { if (shuttingDown) {
defaultRuntime.log( defaultRuntime.log(
`gateway: received ${signal} during shutdown; exiting now`, `gateway: received ${signal} during shutdown; ignoring`,
); );
process.exit(0); return;
} }
shuttingDown = true; shuttingDown = true;
defaultRuntime.log(`gateway: received ${signal}; shutting down`); const isRestart = action === "restart";
defaultRuntime.log(
`gateway: received ${signal}; ${isRestart ? "restarting" : "shutting down"}`,
);
forceExitTimer = setTimeout(() => { forceExitTimer = setTimeout(() => {
defaultRuntime.error( defaultRuntime.error(
"gateway: shutdown timed out; exiting without full cleanup", "gateway: shutdown timed out; exiting without full cleanup",
); );
cleanupSignals();
process.exit(0); process.exit(0);
}, 5000); }, 5000);
void (async () => { void (async () => {
try { try {
await server?.close(); await server?.close({
reason: isRestart ? "gateway restarting" : "gateway stopping",
restartExpectedMs: isRestart ? 1500 : null,
});
} catch (err) { } catch (err) {
defaultRuntime.error(`gateway: shutdown error: ${String(err)}`); defaultRuntime.error(`gateway: shutdown error: ${String(err)}`);
} finally { } finally {
if (forceExitTimer) clearTimeout(forceExitTimer); if (forceExitTimer) clearTimeout(forceExitTimer);
process.exit(0); server = null;
if (isRestart) {
shuttingDown = false;
restartResolver?.();
} else {
cleanupSignals();
process.exit(0);
}
} }
})(); })();
}; };
const onSigterm = () => shutdown("SIGTERM"); const onSigterm = () => request("stop", "SIGTERM");
const onSigint = () => shutdown("SIGINT"); const onSigint = () => request("stop", "SIGINT");
const onSigusr1 = () => request("restart", "SIGUSR1");
process.once("SIGTERM", onSigterm); process.on("SIGTERM", onSigterm);
process.once("SIGINT", onSigint); process.on("SIGINT", onSigint);
process.on("SIGUSR1", onSigusr1);
try { try {
server = await startGatewayServer(port, { bind }); // eslint-disable-next-line no-constant-condition
} catch (err) { while (true) {
defaultRuntime.error(`Gateway failed to start: ${String(err)}`); try {
process.exit(1); server = await startGatewayServer(port, { bind });
} catch (err) {
cleanupSignals();
defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
process.exit(1);
}
await new Promise<void>((resolve) => {
restartResolver = resolve;
});
}
} finally {
cleanupSignals();
} }
// Keep process alive
await new Promise<never>(() => {});
} }
void main(); void main();