fix: reduce log noise for node disconnect/late invoke errors (#1607)
* fix: reduce log noise for node disconnect/late invoke errors - Handle both 'node not connected' and 'node disconnected' errors at info level - Return success with late:true for unknown invoke IDs instead of error - Add 30-second throttle to skills change listener to prevent rapid-fire probes - Add tests for isNodeUnavailableError and late invoke handling * fix: clean up skills refresh timer and listener on shutdown Store the return value from registerSkillsChangeListener() and call it on gateway shutdown. Also clear any pending refresh timer. This follows the same pattern used for agentUnsub and heartbeatUnsub. * refactor: simplify KISS/YAGNI - inline checks, remove unit tests for internal utilities * fix: reduce gateway log noise (#1607) (thanks @petter-b) * test: align agent id casing expectations (#1607) --------- Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
@@ -12,7 +12,7 @@ Docs: https://docs.clawd.bot
|
||||
### Fixes
|
||||
- Web UI: hide internal `message_id` hints in chat bubbles.
|
||||
- Heartbeat: normalize target identifiers for consistent routing.
|
||||
- TUI: unify reasoning tag stripping so `<final>` wrappers stay hidden. (#1613) Thanks @kyleok.
|
||||
- Gateway: reduce log noise for late invokes + remote node probes; debounce skills refresh. (#1607) Thanks @petter-b.
|
||||
|
||||
## 2026.1.23-1
|
||||
|
||||
|
||||
@@ -468,7 +468,10 @@ export const nodeHandlers: GatewayRequestHandlers = {
|
||||
error: p.error ?? null,
|
||||
});
|
||||
if (!ok) {
|
||||
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unknown invoke id"));
|
||||
// Late-arriving results (after invoke timeout) are expected and harmless.
|
||||
// Return success instead of error to reduce log noise; client can discard.
|
||||
context.logGateway.debug(`late invoke result ignored: id=${p.id} node=${p.nodeId}`);
|
||||
respond(true, { ok: true, ignored: true }, undefined);
|
||||
return;
|
||||
}
|
||||
respond(true, { ok: true }, undefined);
|
||||
|
||||
@@ -344,9 +344,19 @@ export async function startGatewayServer(
|
||||
|
||||
setSkillsRemoteRegistry(nodeRegistry);
|
||||
void primeRemoteSkillsCache();
|
||||
registerSkillsChangeListener(() => {
|
||||
const latest = loadConfig();
|
||||
void refreshRemoteBinsForConnectedNodes(latest);
|
||||
// Debounce skills-triggered node probes to avoid feedback loops and rapid-fire invokes.
|
||||
// Skills changes can happen in bursts (e.g., file watcher events), and each probe
|
||||
// takes time to complete. A 30-second delay ensures we batch changes together.
|
||||
let skillsRefreshTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
const skillsRefreshDelayMs = 30_000;
|
||||
const skillsChangeUnsub = registerSkillsChangeListener((event) => {
|
||||
if (event.reason === "remote-node") return;
|
||||
if (skillsRefreshTimer) clearTimeout(skillsRefreshTimer);
|
||||
skillsRefreshTimer = setTimeout(() => {
|
||||
skillsRefreshTimer = null;
|
||||
const latest = loadConfig();
|
||||
void refreshRemoteBinsForConnectedNodes(latest);
|
||||
}, skillsRefreshDelayMs);
|
||||
});
|
||||
|
||||
const { tickInterval, healthInterval, dedupeCleanup } = startGatewayMaintenanceTimers({
|
||||
@@ -544,6 +554,11 @@ export async function startGatewayServer(
|
||||
if (diagnosticsEnabled) {
|
||||
stopDiagnosticHeartbeat();
|
||||
}
|
||||
if (skillsRefreshTimer) {
|
||||
clearTimeout(skillsRefreshTimer);
|
||||
skillsRefreshTimer = null;
|
||||
}
|
||||
skillsChangeUnsub();
|
||||
await close(opts);
|
||||
},
|
||||
};
|
||||
|
||||
125
src/gateway/server.nodes.late-invoke.test.ts
Normal file
125
src/gateway/server.nodes.late-invoke.test.ts
Normal file
@@ -0,0 +1,125 @@
|
||||
import { afterAll, beforeAll, describe, expect, test, vi } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
|
||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
|
||||
import { loadOrCreateDeviceIdentity } from "../infra/device-identity.js";
|
||||
|
||||
vi.mock("../infra/update-runner.js", () => ({
|
||||
runGatewayUpdate: vi.fn(async () => ({
|
||||
status: "ok",
|
||||
mode: "git",
|
||||
root: "/repo",
|
||||
steps: [],
|
||||
durationMs: 12,
|
||||
})),
|
||||
}));
|
||||
|
||||
import {
|
||||
connectOk,
|
||||
installGatewayTestHooks,
|
||||
rpcReq,
|
||||
startServerWithClient,
|
||||
} from "./test-helpers.js";
|
||||
|
||||
installGatewayTestHooks({ scope: "suite" });
|
||||
|
||||
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
|
||||
let ws: WebSocket;
|
||||
let port: number;
|
||||
|
||||
beforeAll(async () => {
|
||||
const started = await startServerWithClient();
|
||||
server = started.server;
|
||||
ws = started.ws;
|
||||
port = started.port;
|
||||
await connectOk(ws);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
describe("late-arriving invoke results", () => {
|
||||
test("returns success for unknown invoke id (late arrival after timeout)", async () => {
|
||||
// Create a node client WebSocket
|
||||
const nodeWs = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => nodeWs.once("open", resolve));
|
||||
|
||||
try {
|
||||
// Connect as a node with device identity
|
||||
const identity = loadOrCreateDeviceIdentity();
|
||||
const nodeId = identity.deviceId;
|
||||
|
||||
await connectOk(nodeWs, {
|
||||
role: "node",
|
||||
client: {
|
||||
id: GATEWAY_CLIENT_NAMES.NODE_HOST,
|
||||
version: "1.0.0",
|
||||
platform: "ios",
|
||||
mode: GATEWAY_CLIENT_MODES.NODE,
|
||||
},
|
||||
commands: ["canvas.snapshot"],
|
||||
});
|
||||
|
||||
// Send an invoke result with an unknown ID (simulating late arrival after timeout)
|
||||
const result = await rpcReq<{ ok?: boolean; ignored?: boolean }>(
|
||||
nodeWs,
|
||||
"node.invoke.result",
|
||||
{
|
||||
id: "unknown-invoke-id-12345",
|
||||
nodeId,
|
||||
ok: true,
|
||||
payloadJSON: JSON.stringify({ result: "late" }),
|
||||
},
|
||||
);
|
||||
|
||||
// Late-arriving results return success instead of error to reduce log noise
|
||||
expect(result.ok).toBe(true);
|
||||
expect(result.payload?.ok).toBe(true);
|
||||
expect(result.payload?.ignored).toBe(true);
|
||||
} finally {
|
||||
nodeWs.close();
|
||||
}
|
||||
});
|
||||
|
||||
test("returns success for unknown invoke id with error payload", async () => {
|
||||
// Verifies late results are accepted regardless of their ok/error status
|
||||
const nodeWs = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => nodeWs.once("open", resolve));
|
||||
|
||||
try {
|
||||
await connectOk(nodeWs, {
|
||||
role: "node",
|
||||
client: {
|
||||
id: GATEWAY_CLIENT_NAMES.NODE_HOST,
|
||||
version: "1.0.0",
|
||||
platform: "darwin",
|
||||
mode: GATEWAY_CLIENT_MODES.NODE,
|
||||
},
|
||||
commands: [],
|
||||
});
|
||||
|
||||
const identity = loadOrCreateDeviceIdentity();
|
||||
const nodeId = identity.deviceId;
|
||||
|
||||
// Late invoke result with error payload - should still return success
|
||||
const result = await rpcReq<{ ok?: boolean; ignored?: boolean }>(
|
||||
nodeWs,
|
||||
"node.invoke.result",
|
||||
{
|
||||
id: "another-unknown-invoke-id",
|
||||
nodeId,
|
||||
ok: false,
|
||||
error: { code: "FAILED", message: "test error" },
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
expect(result.payload?.ok).toBe(true);
|
||||
expect(result.payload?.ignored).toBe(true);
|
||||
} finally {
|
||||
nodeWs.close();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -55,10 +55,10 @@ function extractErrorMessage(err: unknown): string | undefined {
|
||||
function logRemoteBinProbeFailure(nodeId: string, err: unknown) {
|
||||
const message = extractErrorMessage(err);
|
||||
const label = describeNode(nodeId);
|
||||
if (message?.includes("node not connected")) {
|
||||
log.info(
|
||||
`remote bin probe skipped: node not connected (${label}); check nodes list/status for ${label}`,
|
||||
);
|
||||
// Node unavailable errors (not connected or disconnected mid-operation) are expected
|
||||
// when nodes have transient connections - log at info level instead of warn
|
||||
if (message?.includes("node not connected") || message?.includes("node disconnected")) {
|
||||
log.info(`remote bin probe skipped: node unavailable (${label})`);
|
||||
return;
|
||||
}
|
||||
if (message?.includes("invoke timed out") || message?.includes("timeout")) {
|
||||
@@ -213,6 +213,15 @@ function parseBinProbePayload(payloadJSON: string | null | undefined, payload?:
|
||||
return [];
|
||||
}
|
||||
|
||||
function areBinSetsEqual(a: Set<string> | undefined, b: Set<string>): boolean {
|
||||
if (!a) return false;
|
||||
if (a.size !== b.size) return false;
|
||||
for (const bin of b) {
|
||||
if (!a.has(bin)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
export async function refreshRemoteNodeBins(params: {
|
||||
nodeId: string;
|
||||
platform?: string;
|
||||
@@ -261,7 +270,11 @@ export async function refreshRemoteNodeBins(params: {
|
||||
return;
|
||||
}
|
||||
const bins = parseBinProbePayload(res.payloadJSON, res.payload);
|
||||
const existingBins = remoteNodes.get(params.nodeId)?.bins;
|
||||
const nextBins = new Set(bins);
|
||||
const hasChanged = !areBinSetsEqual(existingBins, nextBins);
|
||||
recordRemoteNodeBins(params.nodeId, bins);
|
||||
if (!hasChanged) return;
|
||||
await updatePairedNodeMetadata(params.nodeId, { bins });
|
||||
bumpSkillsSnapshotVersion({ reason: "remote-node" });
|
||||
} catch (err) {
|
||||
|
||||
Reference in New Issue
Block a user