fix: gate gateway restarts and discord abort reconnects
This commit is contained in:
@@ -1,5 +1,9 @@
|
||||
import type { startGatewayServer } from "../../gateway/server.js";
|
||||
import { createSubsystemLogger } from "../../logging.js";
|
||||
import {
|
||||
consumeGatewaySigusr1RestartAuthorization,
|
||||
isGatewaySigusr1RestartExternallyAllowed,
|
||||
} from "../../infra/restart.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import type { defaultRuntime } from "../../runtime.js";
|
||||
|
||||
const gatewayLog = createSubsystemLogger("gateway");
|
||||
@@ -67,6 +71,13 @@ export async function runGatewayLoop(params: {
|
||||
};
|
||||
// SIGUSR1 handler: a restart is requested only when an in-process
// authorization was pending or the external-restart policy allows it;
// otherwise the signal is logged and dropped.
const onSigusr1 = () => {
  gatewayLog.info("signal SIGUSR1 received");
  // Consume first so a pending authorization is used up by this signal; the
  // policy check only runs when no authorization was available.
  const hasInternalGrant = consumeGatewaySigusr1RestartAuthorization();
  const restartPermitted = hasInternalGrant || isGatewaySigusr1RestartExternallyAllowed();
  if (restartPermitted) {
    request("restart", "SIGUSR1");
    return;
  }
  gatewayLog.warn(
    "SIGUSR1 restart ignored (not authorized; enable commands.restart or use gateway tool).",
  );
};
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import {
|
||||
import type { ClawdbotConfig, ReplyToMode } from "../../config/config.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { danger, logVerbose, shouldLogVerbose, warn } from "../../globals.js";
|
||||
import { createSubsystemLogger } from "../../logging.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
import { resolveDiscordAccount } from "../accounts.js";
|
||||
import { attachDiscordGatewayLogging } from "../gateway-logging.js";
|
||||
@@ -443,6 +443,17 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
emitter: gatewayEmitter,
|
||||
runtime,
|
||||
});
|
||||
const abortSignal = opts.abortSignal;
// Abort handler: tears down the gateway connection when the caller aborts.
const onAbort = () => {
  if (gateway) {
    // NOTE(review): presumably maxAttempts: 0 stops the client from
    // reconnecting after the disconnect below; confirm against the gateway client.
    gateway.options.reconnect = { maxAttempts: 0 };
    gateway.disconnect();
  }
};
// Run immediately if the signal has already fired; otherwise wait for it.
// With no signal supplied, the optional call below is a no-op.
if (!abortSignal?.aborted) {
  abortSignal?.addEventListener("abort", onAbort, { once: true });
} else {
  onAbort();
}
|
||||
// Timeout to detect zombie connections where HELLO is never received.
|
||||
const HELLO_TIMEOUT_MS = 30000;
|
||||
let helloTimeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
@@ -472,7 +483,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
disconnect: () => gateway.disconnect(),
|
||||
}
|
||||
: undefined,
|
||||
abortSignal: opts.abortSignal,
|
||||
abortSignal,
|
||||
onGatewayError: (err) => {
|
||||
runtime.error?.(danger(`discord gateway error: ${String(err)}`));
|
||||
},
|
||||
@@ -487,6 +498,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
stopGatewayLogging();
|
||||
if (helloTimeoutId) clearTimeout(helloTimeoutId);
|
||||
gatewayEmitter?.removeListener("debug", onGatewayDebug);
|
||||
abortSignal?.removeEventListener("abort", onAbort);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,10 @@ import type { loadConfig } from "../config/config.js";
|
||||
import { startGmailWatcher, stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
||||
import { startHeartbeatRunner } from "../infra/heartbeat-runner.js";
|
||||
import { resetDirectoryCache } from "../infra/outbound/target-resolver.js";
|
||||
import {
|
||||
authorizeGatewaySigusr1Restart,
|
||||
setGatewaySigusr1RestartPolicy,
|
||||
} from "../infra/restart.js";
|
||||
import { setCommandLaneConcurrency } from "../process/command-queue.js";
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
import type { ChannelKind, GatewayReloadPlan } from "./config-reload.js";
|
||||
@@ -38,6 +42,7 @@ export function createGatewayReloadHandlers(params: {
|
||||
plan: GatewayReloadPlan,
|
||||
nextConfig: ReturnType<typeof loadConfig>,
|
||||
) => {
|
||||
setGatewaySigusr1RestartPolicy({ allowExternal: nextConfig.commands?.restart === true });
|
||||
const state = params.getState();
|
||||
const nextState = { ...state };
|
||||
|
||||
@@ -139,8 +144,9 @@ export function createGatewayReloadHandlers(params: {
|
||||
|
||||
const requestGatewayRestart = (
|
||||
plan: GatewayReloadPlan,
|
||||
_nextConfig: ReturnType<typeof loadConfig>,
|
||||
nextConfig: ReturnType<typeof loadConfig>,
|
||||
) => {
|
||||
setGatewaySigusr1RestartPolicy({ allowExternal: nextConfig.commands?.restart === true });
|
||||
const reasons = plan.restartReasons.length
|
||||
? plan.restartReasons.join(", ")
|
||||
: plan.changedPaths.join(", ");
|
||||
@@ -149,6 +155,7 @@ export function createGatewayReloadHandlers(params: {
|
||||
params.logReload.warn("no SIGUSR1 listener found; restart skipped");
|
||||
return;
|
||||
}
|
||||
authorizeGatewaySigusr1Restart();
|
||||
process.emit("SIGUSR1");
|
||||
};
|
||||
|
||||
|
||||
@@ -23,8 +23,9 @@ import {
|
||||
setSkillsRemoteBridge,
|
||||
} from "../infra/skills-remote.js";
|
||||
import { scheduleGatewayUpdateCheck } from "../infra/update-startup.js";
|
||||
import { setGatewaySigusr1RestartPolicy } from "../infra/restart.js";
|
||||
import { autoMigrateLegacyState } from "../infra/state-migrations.js";
|
||||
import { createSubsystemLogger, runtimeForLogger } from "../logging.js";
|
||||
import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js";
|
||||
import type { PluginServicesHandle } from "../plugins/services.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { runOnboardingWizard } from "../wizard/onboarding.js";
|
||||
@@ -172,6 +173,7 @@ export async function startGatewayServer(
|
||||
}
|
||||
|
||||
const cfgAtStart = loadConfig();
|
||||
setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true });
|
||||
initSubagentRegistry();
|
||||
await autoMigrateLegacyState({ cfg: cfgAtStart, log });
|
||||
const defaultAgentId = resolveDefaultAgentId(cfgAtStart);
|
||||
|
||||
41
src/infra/restart.test.ts
Normal file
41
src/infra/restart.test.ts
Normal file
@@ -0,0 +1,41 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import {
|
||||
__testing,
|
||||
consumeGatewaySigusr1RestartAuthorization,
|
||||
isGatewaySigusr1RestartExternallyAllowed,
|
||||
scheduleGatewaySigusr1Restart,
|
||||
setGatewaySigusr1RestartPolicy,
|
||||
} from "./restart.js";
|
||||
|
||||
// Covers the SIGUSR1 restart gate: one-shot authorization tokens and the
// external-restart policy flag.
describe("restart authorization", () => {
  const resetGateState = () => {
    __testing.resetSigusr1State();
  };

  beforeEach(() => {
    resetGateState();
    vi.useFakeTimers();
    // Stub process.kill so a scheduled restart cannot signal the test runner.
    vi.spyOn(process, "kill").mockImplementation(() => true);
  });

  afterEach(async () => {
    // Flush any restart timer still pending before real timers come back.
    await vi.runOnlyPendingTimersAsync();
    vi.useRealTimers();
    vi.restoreAllMocks();
    resetGateState();
  });

  it("consumes a scheduled authorization once", async () => {
    const beforeScheduling = consumeGatewaySigusr1RestartAuthorization();
    expect(beforeScheduling).toBe(false);

    scheduleGatewaySigusr1Restart({ delayMs: 0 });

    const firstConsume = consumeGatewaySigusr1RestartAuthorization();
    expect(firstConsume).toBe(true);
    const secondConsume = consumeGatewaySigusr1RestartAuthorization();
    expect(secondConsume).toBe(false);

    await vi.runAllTimersAsync();
  });

  it("tracks external restart policy", () => {
    const initiallyAllowed = isGatewaySigusr1RestartExternallyAllowed();
    expect(initiallyAllowed).toBe(false);
    setGatewaySigusr1RestartPolicy({ allowExternal: true });
    const allowedAfterOptIn = isGatewaySigusr1RestartExternallyAllowed();
    expect(allowedAfterOptIn).toBe(true);
  });
});
|
||||
@@ -12,6 +12,45 @@ export type RestartAttempt = {
|
||||
};
|
||||
|
||||
const SPAWN_TIMEOUT_MS = 2000;
|
||||
/** Extra time (ms) an authorization stays valid beyond its requested delay. */
const SIGUSR1_AUTH_GRACE_MS = 5000;

// Number of authorizations granted but not yet consumed.
let sigusr1AuthorizedCount = 0;
// Epoch-ms deadline after which all outstanding authorizations are void.
let sigusr1AuthorizedUntil = 0;
// Whether a SIGUSR1 with no pending authorization may still be honored.
let sigusr1ExternalAllowed = false;

/**
 * Clears all outstanding authorizations once the shared deadline has passed.
 * Does nothing when no authorization is pending or the deadline is still ahead.
 *
 * @param now Current time in epoch milliseconds (defaults to `Date.now()`).
 */
function resetSigusr1AuthorizationIfExpired(now = Date.now()) {
  if (sigusr1AuthorizedCount <= 0) return;
  if (now <= sigusr1AuthorizedUntil) return;
  sigusr1AuthorizedCount = 0;
  sigusr1AuthorizedUntil = 0;
}

/**
 * Sets whether SIGUSR1 restarts without a pending authorization are allowed.
 * Anything other than `allowExternal: true` (including no argument) disables it.
 */
export function setGatewaySigusr1RestartPolicy(opts?: { allowExternal?: boolean }) {
  sigusr1ExternalAllowed = opts?.allowExternal === true;
}

/** Returns the policy flag last set by `setGatewaySigusr1RestartPolicy`. */
export function isGatewaySigusr1RestartExternallyAllowed() {
  return sigusr1ExternalAllowed;
}

/**
 * Grants one restart authorization, valid until
 * `now + delayMs + SIGUSR1_AUTH_GRACE_MS`.
 *
 * Expired authorizations are discarded before the new one is counted, so an
 * old grant that was never consumed cannot be revived by a later one.
 *
 * @param delayMs Delay before the restart signal is expected, in ms. Negative
 *   and non-finite values are treated as 0; fractions are floored.
 */
export function authorizeGatewaySigusr1Restart(delayMs = 0) {
  const now = Date.now();
  // Without this, a stale count would ride along under the new deadline and
  // allow more restarts than were actually authorized.
  resetSigusr1AuthorizationIfExpired(now);
  // A NaN delay would otherwise produce a NaN deadline that is never stored,
  // leaving the new grant to be treated as already expired.
  const delay = Number.isFinite(delayMs) ? Math.max(0, Math.floor(delayMs)) : 0;
  const expiresAt = now + delay + SIGUSR1_AUTH_GRACE_MS;
  sigusr1AuthorizedCount += 1;
  // The deadline only ever moves forward while authorizations are pending.
  if (expiresAt > sigusr1AuthorizedUntil) {
    sigusr1AuthorizedUntil = expiresAt;
  }
}

/**
 * Uses up one pending authorization, if any is still valid.
 *
 * @returns `true` when an authorization was consumed, `false` when none was
 *   pending or all pending ones had expired.
 */
export function consumeGatewaySigusr1RestartAuthorization(): boolean {
  resetSigusr1AuthorizationIfExpired();
  if (sigusr1AuthorizedCount <= 0) return false;
  sigusr1AuthorizedCount -= 1;
  if (sigusr1AuthorizedCount <= 0) {
    // Last grant used: clear the deadline along with the count.
    sigusr1AuthorizedUntil = 0;
  }
  return true;
}
|
||||
|
||||
function formatSpawnDetail(result: {
|
||||
error?: unknown;
|
||||
@@ -134,6 +173,7 @@ export function scheduleGatewaySigusr1Restart(opts?: {
|
||||
typeof opts?.reason === "string" && opts.reason.trim()
|
||||
? opts.reason.trim().slice(0, 200)
|
||||
: undefined;
|
||||
authorizeGatewaySigusr1Restart(delayMs);
|
||||
const pid = process.pid;
|
||||
const hasListener = process.listenerCount("SIGUSR1") > 0;
|
||||
setTimeout(() => {
|
||||
@@ -156,3 +196,11 @@ export function scheduleGatewaySigusr1Restart(opts?: {
|
||||
mode: hasListener ? "emit" : "signal",
|
||||
};
|
||||
}
|
||||
|
||||
/** Test-only hooks for putting the module-level SIGUSR1 gate state back to its defaults. */
export const __testing = {
  /** Clears the external-restart flag, the authorization deadline and the pending count. */
  resetSigusr1State() {
    sigusr1ExternalAllowed = false;
    sigusr1AuthorizedUntil = 0;
    sigusr1AuthorizedCount = 0;
  },
};
|
||||
|
||||
@@ -45,6 +45,10 @@ async function main() {
|
||||
{ startGatewayServer },
|
||||
{ setGatewayWsLogStyle },
|
||||
{ setVerbose },
|
||||
{
|
||||
consumeGatewaySigusr1RestartAuthorization,
|
||||
isGatewaySigusr1RestartExternallyAllowed,
|
||||
},
|
||||
{ defaultRuntime },
|
||||
{ enableConsoleCapture, setConsoleTimestampPrefix },
|
||||
] = await Promise.all([
|
||||
@@ -52,6 +56,7 @@ async function main() {
|
||||
import("../gateway/server.js"),
|
||||
import("../gateway/ws-logging.js"),
|
||||
import("../globals.js"),
|
||||
import("../infra/restart.js"),
|
||||
import("../runtime.js"),
|
||||
import("../logging.js"),
|
||||
]);
|
||||
@@ -156,6 +161,13 @@ async function main() {
|
||||
};
|
||||
// SIGUSR1 handler: a restart is requested only when an in-process
// authorization was pending or the external-restart policy allows it;
// otherwise the signal is logged and dropped.
const onSigusr1 = () => {
  defaultRuntime.log("gateway: signal SIGUSR1 received");
  // Consume first so a pending authorization is used up by this signal; the
  // policy check only runs when no authorization was available.
  const hasInternalGrant = consumeGatewaySigusr1RestartAuthorization();
  const restartPermitted = hasInternalGrant || isGatewaySigusr1RestartExternallyAllowed();
  if (restartPermitted) {
    request("restart", "SIGUSR1");
    return;
  }
  defaultRuntime.log(
    "gateway: SIGUSR1 restart ignored (not authorized; enable commands.restart or use gateway tool).",
  );
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user