fix: emit delta-only node system events
This commit is contained in:
@@ -74,7 +74,10 @@ import {
|
|||||||
verifyNodeToken,
|
verifyNodeToken,
|
||||||
} from "../infra/node-pairing.js";
|
} from "../infra/node-pairing.js";
|
||||||
import { ensureClawdisCliOnPath } from "../infra/path-env.js";
|
import { ensureClawdisCliOnPath } from "../infra/path-env.js";
|
||||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
import {
|
||||||
|
enqueueSystemEvent,
|
||||||
|
isSystemEventContextChanged,
|
||||||
|
} from "../infra/system-events.js";
|
||||||
import {
|
import {
|
||||||
listSystemPresence,
|
listSystemPresence,
|
||||||
updateSystemPresence,
|
updateSystemPresence,
|
||||||
@@ -4014,7 +4017,7 @@ export async function startGatewayServer(
|
|||||||
params.tags.every((t) => typeof t === "string")
|
params.tags.every((t) => typeof t === "string")
|
||||||
? (params.tags as string[])
|
? (params.tags as string[])
|
||||||
: undefined;
|
: undefined;
|
||||||
updateSystemPresence({
|
const presenceUpdate = updateSystemPresence({
|
||||||
text,
|
text,
|
||||||
instanceId,
|
instanceId,
|
||||||
host,
|
host,
|
||||||
@@ -4029,17 +4032,55 @@ export async function startGatewayServer(
|
|||||||
tags,
|
tags,
|
||||||
});
|
});
|
||||||
const isNodePresenceLine = text.startsWith("Node:");
|
const isNodePresenceLine = text.startsWith("Node:");
|
||||||
const normalizedReason = (reason ?? "").toLowerCase();
|
if (isNodePresenceLine) {
|
||||||
const looksPeriodic =
|
const next = presenceUpdate.next;
|
||||||
normalizedReason.startsWith("periodic") ||
|
const changed = new Set(presenceUpdate.changedKeys);
|
||||||
normalizedReason === "heartbeat";
|
const reasonValue = next.reason ?? reason;
|
||||||
if (!(isNodePresenceLine && looksPeriodic)) {
|
const normalizedReason = (reasonValue ?? "").toLowerCase();
|
||||||
const compactNodeText =
|
const ignoreReason =
|
||||||
isNodePresenceLine &&
|
normalizedReason.startsWith("periodic") ||
|
||||||
(host || ip || version || mode || reason)
|
normalizedReason === "heartbeat";
|
||||||
? `Node: ${host?.trim() || "Unknown"}${ip ? ` (${ip})` : ""} · app ${version?.trim() || "unknown"} · mode ${mode?.trim() || "unknown"} · reason ${reason?.trim() || "event"}`
|
const hostChanged = changed.has("host");
|
||||||
: text;
|
const ipChanged = changed.has("ip");
|
||||||
enqueueSystemEvent(compactNodeText);
|
const versionChanged = changed.has("version");
|
||||||
|
const modeChanged = changed.has("mode");
|
||||||
|
const reasonChanged = changed.has("reason") && !ignoreReason;
|
||||||
|
const hasChanges =
|
||||||
|
hostChanged ||
|
||||||
|
ipChanged ||
|
||||||
|
versionChanged ||
|
||||||
|
modeChanged ||
|
||||||
|
reasonChanged;
|
||||||
|
if (hasChanges) {
|
||||||
|
const contextChanged = isSystemEventContextChanged(
|
||||||
|
presenceUpdate.key,
|
||||||
|
);
|
||||||
|
const parts: string[] = [];
|
||||||
|
if (contextChanged || hostChanged || ipChanged) {
|
||||||
|
const hostLabel = next.host?.trim() || "Unknown";
|
||||||
|
const ipLabel = next.ip?.trim();
|
||||||
|
parts.push(
|
||||||
|
`Node: ${hostLabel}${ipLabel ? ` (${ipLabel})` : ""}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (versionChanged) {
|
||||||
|
parts.push(`app ${next.version?.trim() || "unknown"}`);
|
||||||
|
}
|
||||||
|
if (modeChanged) {
|
||||||
|
parts.push(`mode ${next.mode?.trim() || "unknown"}`);
|
||||||
|
}
|
||||||
|
if (reasonChanged) {
|
||||||
|
parts.push(`reason ${reasonValue?.trim() || "event"}`);
|
||||||
|
}
|
||||||
|
const deltaText = parts.join(" · ");
|
||||||
|
if (deltaText) {
|
||||||
|
enqueueSystemEvent(deltaText, {
|
||||||
|
contextKey: presenceUpdate.key,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
enqueueSystemEvent(text);
|
||||||
}
|
}
|
||||||
presenceVersion += 1;
|
presenceVersion += 1;
|
||||||
broadcast(
|
broadcast(
|
||||||
|
|||||||
@@ -7,10 +7,30 @@ type SystemEvent = { text: string; ts: number };
|
|||||||
const MAX_EVENTS = 20;
|
const MAX_EVENTS = 20;
|
||||||
const queue: SystemEvent[] = [];
|
const queue: SystemEvent[] = [];
|
||||||
let lastText: string | null = null;
|
let lastText: string | null = null;
|
||||||
|
let lastContextKey: string | null = null;
|
||||||
|
|
||||||
export function enqueueSystemEvent(text: string) {
|
type SystemEventOptions = {
|
||||||
|
contextKey?: string | null;
|
||||||
|
};
|
||||||
|
|
||||||
|
function normalizeContextKey(key?: string | null): string | null {
|
||||||
|
if (!key) return null;
|
||||||
|
const trimmed = key.trim();
|
||||||
|
if (!trimmed) return null;
|
||||||
|
return trimmed.toLowerCase();
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isSystemEventContextChanged(
|
||||||
|
contextKey?: string | null,
|
||||||
|
): boolean {
|
||||||
|
const normalized = normalizeContextKey(contextKey);
|
||||||
|
return normalized !== lastContextKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function enqueueSystemEvent(text: string, options?: SystemEventOptions) {
|
||||||
const cleaned = text.trim();
|
const cleaned = text.trim();
|
||||||
if (!cleaned) return;
|
if (!cleaned) return;
|
||||||
|
lastContextKey = normalizeContextKey(options?.contextKey);
|
||||||
if (lastText === cleaned) return; // skip consecutive duplicates
|
if (lastText === cleaned) return; // skip consecutive duplicates
|
||||||
lastText = cleaned;
|
lastText = cleaned;
|
||||||
queue.push({ text: cleaned, ts: Date.now() });
|
queue.push({ text: cleaned, ts: Date.now() });
|
||||||
@@ -21,6 +41,7 @@ export function drainSystemEvents(): string[] {
|
|||||||
const out = queue.map((e) => e.text);
|
const out = queue.map((e) => e.text);
|
||||||
queue.length = 0;
|
queue.length = 0;
|
||||||
lastText = null;
|
lastText = null;
|
||||||
|
lastContextKey = null;
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,14 @@ export type SystemPresence = {
|
|||||||
ts: number;
|
ts: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export type SystemPresenceUpdate = {
|
||||||
|
key: string;
|
||||||
|
previous?: SystemPresence;
|
||||||
|
next: SystemPresence;
|
||||||
|
changes: Partial<SystemPresence>;
|
||||||
|
changedKeys: (keyof SystemPresence)[];
|
||||||
|
};
|
||||||
|
|
||||||
const entries = new Map<string, SystemPresence>();
|
const entries = new Map<string, SystemPresence>();
|
||||||
const TTL_MS = 5 * 60 * 1000; // 5 minutes
|
const TTL_MS = 5 * 60 * 1000; // 5 minutes
|
||||||
const MAX_ENTRIES = 200;
|
const MAX_ENTRIES = 200;
|
||||||
@@ -154,7 +162,9 @@ type SystemPresencePayload = {
|
|||||||
tags?: string[];
|
tags?: string[];
|
||||||
};
|
};
|
||||||
|
|
||||||
export function updateSystemPresence(payload: SystemPresencePayload) {
|
export function updateSystemPresence(
|
||||||
|
payload: SystemPresencePayload,
|
||||||
|
): SystemPresenceUpdate {
|
||||||
ensureSelfPresence();
|
ensureSelfPresence();
|
||||||
const parsed = parsePresence(payload.text);
|
const parsed = parsePresence(payload.text);
|
||||||
const key =
|
const key =
|
||||||
@@ -164,6 +174,7 @@ export function updateSystemPresence(payload: SystemPresencePayload) {
|
|||||||
parsed.ip ||
|
parsed.ip ||
|
||||||
parsed.text.slice(0, 64) ||
|
parsed.text.slice(0, 64) ||
|
||||||
os.hostname().toLowerCase();
|
os.hostname().toLowerCase();
|
||||||
|
const hadExisting = entries.has(key);
|
||||||
const existing = entries.get(key) ?? ({} as SystemPresence);
|
const existing = entries.get(key) ?? ({} as SystemPresence);
|
||||||
const merged: SystemPresence = {
|
const merged: SystemPresence = {
|
||||||
...existing,
|
...existing,
|
||||||
@@ -185,6 +196,25 @@ export function updateSystemPresence(payload: SystemPresencePayload) {
|
|||||||
ts: Date.now(),
|
ts: Date.now(),
|
||||||
};
|
};
|
||||||
entries.set(key, merged);
|
entries.set(key, merged);
|
||||||
|
const trackKeys = ["host", "ip", "version", "mode", "reason"] as const;
|
||||||
|
type TrackKey = (typeof trackKeys)[number];
|
||||||
|
const changes: Partial<Pick<SystemPresence, TrackKey>> = {};
|
||||||
|
const changedKeys: TrackKey[] = [];
|
||||||
|
for (const k of trackKeys) {
|
||||||
|
const prev = existing[k];
|
||||||
|
const next = merged[k];
|
||||||
|
if (prev !== next) {
|
||||||
|
changes[k] = next;
|
||||||
|
changedKeys.push(k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
key,
|
||||||
|
previous: hadExisting ? existing : undefined,
|
||||||
|
next: merged,
|
||||||
|
changes,
|
||||||
|
changedKeys,
|
||||||
|
} satisfies SystemPresenceUpdate;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function upsertPresence(key: string, presence: Partial<SystemPresence>) {
|
export function upsertPresence(key: string, presence: Partial<SystemPresence>) {
|
||||||
|
|||||||
Reference in New Issue
Block a user