465 lines
12 KiB
TypeScript
465 lines
12 KiB
TypeScript
/**
|
|
* Comprehensive metrics system for Nostr bus observability.
|
|
* Provides clear insight into what's happening with events, relays, and operations.
|
|
*/
|
|
|
|
// ============================================================================
|
|
// Metric Types
|
|
// ============================================================================
|
|
|
|
export type EventMetricName =
|
|
| "event.received"
|
|
| "event.processed"
|
|
| "event.duplicate"
|
|
| "event.rejected.invalid_shape"
|
|
| "event.rejected.wrong_kind"
|
|
| "event.rejected.stale"
|
|
| "event.rejected.future"
|
|
| "event.rejected.rate_limited"
|
|
| "event.rejected.invalid_signature"
|
|
| "event.rejected.oversized_ciphertext"
|
|
| "event.rejected.oversized_plaintext"
|
|
| "event.rejected.decrypt_failed"
|
|
| "event.rejected.self_message";
|
|
|
|
export type RelayMetricName =
|
|
| "relay.connect"
|
|
| "relay.disconnect"
|
|
| "relay.reconnect"
|
|
| "relay.error"
|
|
| "relay.message.event"
|
|
| "relay.message.eose"
|
|
| "relay.message.closed"
|
|
| "relay.message.notice"
|
|
| "relay.message.ok"
|
|
| "relay.message.auth"
|
|
| "relay.circuit_breaker.open"
|
|
| "relay.circuit_breaker.close"
|
|
| "relay.circuit_breaker.half_open";
|
|
|
|
export type RateLimitMetricName = "rate_limit.per_sender" | "rate_limit.global";
|
|
|
|
export type DecryptMetricName = "decrypt.success" | "decrypt.failure";
|
|
|
|
export type MemoryMetricName =
|
|
| "memory.seen_tracker_size"
|
|
| "memory.rate_limiter_entries";
|
|
|
|
export type MetricName =
|
|
| EventMetricName
|
|
| RelayMetricName
|
|
| RateLimitMetricName
|
|
| DecryptMetricName
|
|
| MemoryMetricName;
|
|
|
|
// ============================================================================
|
|
// Metric Event
|
|
// ============================================================================
|
|
|
|
export interface MetricEvent {
|
|
/** Metric name (e.g., "event.received", "relay.connect") */
|
|
name: MetricName;
|
|
/** Metric value (usually 1 for counters, or a measured value) */
|
|
value: number;
|
|
/** Unix timestamp in milliseconds */
|
|
timestamp: number;
|
|
/** Optional labels for additional context */
|
|
labels?: Record<string, string | number>;
|
|
}
|
|
|
|
export type OnMetricCallback = (event: MetricEvent) => void;
|
|
|
|
// ============================================================================
|
|
// Metrics Snapshot (for getMetrics())
|
|
// ============================================================================
|
|
|
|
export interface MetricsSnapshot {
|
|
/** Total events received (before any filtering) */
|
|
eventsReceived: number;
|
|
/** Events successfully processed */
|
|
eventsProcessed: number;
|
|
/** Duplicate events skipped */
|
|
eventsDuplicate: number;
|
|
/** Events rejected by reason */
|
|
eventsRejected: {
|
|
invalidShape: number;
|
|
wrongKind: number;
|
|
stale: number;
|
|
future: number;
|
|
rateLimited: number;
|
|
invalidSignature: number;
|
|
oversizedCiphertext: number;
|
|
oversizedPlaintext: number;
|
|
decryptFailed: number;
|
|
selfMessage: number;
|
|
};
|
|
|
|
/** Relay stats by URL */
|
|
relays: Record<
|
|
string,
|
|
{
|
|
connects: number;
|
|
disconnects: number;
|
|
reconnects: number;
|
|
errors: number;
|
|
messagesReceived: {
|
|
event: number;
|
|
eose: number;
|
|
closed: number;
|
|
notice: number;
|
|
ok: number;
|
|
auth: number;
|
|
};
|
|
circuitBreakerState: "closed" | "open" | "half_open";
|
|
circuitBreakerOpens: number;
|
|
circuitBreakerCloses: number;
|
|
}
|
|
>;
|
|
|
|
/** Rate limiting stats */
|
|
rateLimiting: {
|
|
perSenderHits: number;
|
|
globalHits: number;
|
|
};
|
|
|
|
/** Decrypt stats */
|
|
decrypt: {
|
|
success: number;
|
|
failure: number;
|
|
};
|
|
|
|
/** Memory/capacity stats */
|
|
memory: {
|
|
seenTrackerSize: number;
|
|
rateLimiterEntries: number;
|
|
};
|
|
|
|
/** Snapshot timestamp */
|
|
snapshotAt: number;
|
|
}
|
|
|
|
// ============================================================================
|
|
// Metrics Collector
|
|
// ============================================================================
|
|
|
|
export interface NostrMetrics {
|
|
/** Emit a metric event */
|
|
emit: (
|
|
name: MetricName,
|
|
value?: number,
|
|
labels?: Record<string, string | number>
|
|
) => void;
|
|
|
|
/** Get current metrics snapshot */
|
|
getSnapshot: () => MetricsSnapshot;
|
|
|
|
/** Reset all metrics to zero */
|
|
reset: () => void;
|
|
}
|
|
|
|
/**
|
|
* Create a metrics collector instance.
|
|
* Optionally pass an onMetric callback to receive real-time metric events.
|
|
*/
|
|
export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics {
|
|
// Counters
|
|
let eventsReceived = 0;
|
|
let eventsProcessed = 0;
|
|
let eventsDuplicate = 0;
|
|
const eventsRejected = {
|
|
invalidShape: 0,
|
|
wrongKind: 0,
|
|
stale: 0,
|
|
future: 0,
|
|
rateLimited: 0,
|
|
invalidSignature: 0,
|
|
oversizedCiphertext: 0,
|
|
oversizedPlaintext: 0,
|
|
decryptFailed: 0,
|
|
selfMessage: 0,
|
|
};
|
|
|
|
// Per-relay stats
|
|
const relays = new Map<
|
|
string,
|
|
{
|
|
connects: number;
|
|
disconnects: number;
|
|
reconnects: number;
|
|
errors: number;
|
|
messagesReceived: {
|
|
event: number;
|
|
eose: number;
|
|
closed: number;
|
|
notice: number;
|
|
ok: number;
|
|
auth: number;
|
|
};
|
|
circuitBreakerState: "closed" | "open" | "half_open";
|
|
circuitBreakerOpens: number;
|
|
circuitBreakerCloses: number;
|
|
}
|
|
>();
|
|
|
|
// Rate limiting stats
|
|
const rateLimiting = {
|
|
perSenderHits: 0,
|
|
globalHits: 0,
|
|
};
|
|
|
|
// Decrypt stats
|
|
const decrypt = {
|
|
success: 0,
|
|
failure: 0,
|
|
};
|
|
|
|
// Memory stats (updated via gauge-style metrics)
|
|
const memory = {
|
|
seenTrackerSize: 0,
|
|
rateLimiterEntries: 0,
|
|
};
|
|
|
|
function getOrCreateRelay(url: string) {
|
|
let relay = relays.get(url);
|
|
if (!relay) {
|
|
relay = {
|
|
connects: 0,
|
|
disconnects: 0,
|
|
reconnects: 0,
|
|
errors: 0,
|
|
messagesReceived: {
|
|
event: 0,
|
|
eose: 0,
|
|
closed: 0,
|
|
notice: 0,
|
|
ok: 0,
|
|
auth: 0,
|
|
},
|
|
circuitBreakerState: "closed",
|
|
circuitBreakerOpens: 0,
|
|
circuitBreakerCloses: 0,
|
|
};
|
|
relays.set(url, relay);
|
|
}
|
|
return relay;
|
|
}
|
|
|
|
function emit(
|
|
name: MetricName,
|
|
value: number = 1,
|
|
labels?: Record<string, string | number>
|
|
): void {
|
|
// Fire callback if provided
|
|
if (onMetric) {
|
|
onMetric({
|
|
name,
|
|
value,
|
|
timestamp: Date.now(),
|
|
labels,
|
|
});
|
|
}
|
|
|
|
// Update internal counters
|
|
const relayUrl = labels?.relay as string | undefined;
|
|
|
|
switch (name) {
|
|
// Event metrics
|
|
case "event.received":
|
|
eventsReceived += value;
|
|
break;
|
|
case "event.processed":
|
|
eventsProcessed += value;
|
|
break;
|
|
case "event.duplicate":
|
|
eventsDuplicate += value;
|
|
break;
|
|
case "event.rejected.invalid_shape":
|
|
eventsRejected.invalidShape += value;
|
|
break;
|
|
case "event.rejected.wrong_kind":
|
|
eventsRejected.wrongKind += value;
|
|
break;
|
|
case "event.rejected.stale":
|
|
eventsRejected.stale += value;
|
|
break;
|
|
case "event.rejected.future":
|
|
eventsRejected.future += value;
|
|
break;
|
|
case "event.rejected.rate_limited":
|
|
eventsRejected.rateLimited += value;
|
|
break;
|
|
case "event.rejected.invalid_signature":
|
|
eventsRejected.invalidSignature += value;
|
|
break;
|
|
case "event.rejected.oversized_ciphertext":
|
|
eventsRejected.oversizedCiphertext += value;
|
|
break;
|
|
case "event.rejected.oversized_plaintext":
|
|
eventsRejected.oversizedPlaintext += value;
|
|
break;
|
|
case "event.rejected.decrypt_failed":
|
|
eventsRejected.decryptFailed += value;
|
|
break;
|
|
case "event.rejected.self_message":
|
|
eventsRejected.selfMessage += value;
|
|
break;
|
|
|
|
// Relay metrics
|
|
case "relay.connect":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).connects += value;
|
|
break;
|
|
case "relay.disconnect":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).disconnects += value;
|
|
break;
|
|
case "relay.reconnect":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).reconnects += value;
|
|
break;
|
|
case "relay.error":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).errors += value;
|
|
break;
|
|
case "relay.message.event":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).messagesReceived.event += value;
|
|
break;
|
|
case "relay.message.eose":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).messagesReceived.eose += value;
|
|
break;
|
|
case "relay.message.closed":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).messagesReceived.closed += value;
|
|
break;
|
|
case "relay.message.notice":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).messagesReceived.notice += value;
|
|
break;
|
|
case "relay.message.ok":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).messagesReceived.ok += value;
|
|
break;
|
|
case "relay.message.auth":
|
|
if (relayUrl) getOrCreateRelay(relayUrl).messagesReceived.auth += value;
|
|
break;
|
|
case "relay.circuit_breaker.open":
|
|
if (relayUrl) {
|
|
const r = getOrCreateRelay(relayUrl);
|
|
r.circuitBreakerState = "open";
|
|
r.circuitBreakerOpens += value;
|
|
}
|
|
break;
|
|
case "relay.circuit_breaker.close":
|
|
if (relayUrl) {
|
|
const r = getOrCreateRelay(relayUrl);
|
|
r.circuitBreakerState = "closed";
|
|
r.circuitBreakerCloses += value;
|
|
}
|
|
break;
|
|
case "relay.circuit_breaker.half_open":
|
|
if (relayUrl) {
|
|
getOrCreateRelay(relayUrl).circuitBreakerState = "half_open";
|
|
}
|
|
break;
|
|
|
|
// Rate limiting
|
|
case "rate_limit.per_sender":
|
|
rateLimiting.perSenderHits += value;
|
|
break;
|
|
case "rate_limit.global":
|
|
rateLimiting.globalHits += value;
|
|
break;
|
|
|
|
// Decrypt
|
|
case "decrypt.success":
|
|
decrypt.success += value;
|
|
break;
|
|
case "decrypt.failure":
|
|
decrypt.failure += value;
|
|
break;
|
|
|
|
// Memory (gauge-style - value replaces, not adds)
|
|
case "memory.seen_tracker_size":
|
|
memory.seenTrackerSize = value;
|
|
break;
|
|
case "memory.rate_limiter_entries":
|
|
memory.rateLimiterEntries = value;
|
|
break;
|
|
}
|
|
}
|
|
|
|
function getSnapshot(): MetricsSnapshot {
|
|
// Convert relay map to object
|
|
const relaysObj: MetricsSnapshot["relays"] = {};
|
|
for (const [url, stats] of relays) {
|
|
relaysObj[url] = { ...stats, messagesReceived: { ...stats.messagesReceived } };
|
|
}
|
|
|
|
return {
|
|
eventsReceived,
|
|
eventsProcessed,
|
|
eventsDuplicate,
|
|
eventsRejected: { ...eventsRejected },
|
|
relays: relaysObj,
|
|
rateLimiting: { ...rateLimiting },
|
|
decrypt: { ...decrypt },
|
|
memory: { ...memory },
|
|
snapshotAt: Date.now(),
|
|
};
|
|
}
|
|
|
|
function reset(): void {
|
|
eventsReceived = 0;
|
|
eventsProcessed = 0;
|
|
eventsDuplicate = 0;
|
|
Object.assign(eventsRejected, {
|
|
invalidShape: 0,
|
|
wrongKind: 0,
|
|
stale: 0,
|
|
future: 0,
|
|
rateLimited: 0,
|
|
invalidSignature: 0,
|
|
oversizedCiphertext: 0,
|
|
oversizedPlaintext: 0,
|
|
decryptFailed: 0,
|
|
selfMessage: 0,
|
|
});
|
|
relays.clear();
|
|
rateLimiting.perSenderHits = 0;
|
|
rateLimiting.globalHits = 0;
|
|
decrypt.success = 0;
|
|
decrypt.failure = 0;
|
|
memory.seenTrackerSize = 0;
|
|
memory.rateLimiterEntries = 0;
|
|
}
|
|
|
|
return { emit, getSnapshot, reset };
|
|
}
|
|
|
|
/**
|
|
* Create a no-op metrics instance (for when metrics are disabled).
|
|
*/
|
|
export function createNoopMetrics(): NostrMetrics {
|
|
const emptySnapshot: MetricsSnapshot = {
|
|
eventsReceived: 0,
|
|
eventsProcessed: 0,
|
|
eventsDuplicate: 0,
|
|
eventsRejected: {
|
|
invalidShape: 0,
|
|
wrongKind: 0,
|
|
stale: 0,
|
|
future: 0,
|
|
rateLimited: 0,
|
|
invalidSignature: 0,
|
|
oversizedCiphertext: 0,
|
|
oversizedPlaintext: 0,
|
|
decryptFailed: 0,
|
|
selfMessage: 0,
|
|
},
|
|
relays: {},
|
|
rateLimiting: { perSenderHits: 0, globalHits: 0 },
|
|
decrypt: { success: 0, failure: 0 },
|
|
memory: { seenTrackerSize: 0, rateLimiterEntries: 0 },
|
|
snapshotAt: 0,
|
|
};
|
|
|
|
return {
|
|
emit: () => {},
|
|
getSnapshot: () => ({ ...emptySnapshot, snapshotAt: Date.now() }),
|
|
reset: () => {},
|
|
};
|
|
}
|