feat: add Nostr channel plugin and onboarding install defaults
Co-authored-by: joelklabo <joelklabo@users.noreply.github.com>
This commit is contained in:
741
extensions/nostr/src/nostr-bus.ts
Normal file
741
extensions/nostr/src/nostr-bus.ts
Normal file
@@ -0,0 +1,741 @@
|
||||
import {
|
||||
SimplePool,
|
||||
finalizeEvent,
|
||||
getPublicKey,
|
||||
verifyEvent,
|
||||
nip19,
|
||||
type Event,
|
||||
} from "nostr-tools";
|
||||
import { decrypt, encrypt } from "nostr-tools/nip04";
|
||||
|
||||
import {
|
||||
readNostrBusState,
|
||||
writeNostrBusState,
|
||||
computeSinceTimestamp,
|
||||
readNostrProfileState,
|
||||
writeNostrProfileState,
|
||||
} from "./nostr-state-store.js";
|
||||
import {
|
||||
publishProfile as publishProfileFn,
|
||||
type ProfilePublishResult,
|
||||
} from "./nostr-profile.js";
|
||||
import type { NostrProfile } from "./config-schema.js";
|
||||
import { createSeenTracker, type SeenTracker } from "./seen-tracker.js";
|
||||
import {
|
||||
createMetrics,
|
||||
createNoopMetrics,
|
||||
type NostrMetrics,
|
||||
type MetricsSnapshot,
|
||||
type MetricEvent,
|
||||
} from "./metrics.js";
|
||||
|
||||
export const DEFAULT_RELAYS = ["wss://relay.damus.io", "wss://nos.lol"];
|
||||
|
||||
// ============================================================================
|
||||
// Constants
|
||||
// ============================================================================
|
||||
|
||||
const STARTUP_LOOKBACK_SEC = 120; // tolerate relay lag / clock skew
|
||||
const MAX_PERSISTED_EVENT_IDS = 5000;
|
||||
const STATE_PERSIST_DEBOUNCE_MS = 5000; // Debounce state writes
|
||||
|
||||
// Reconnect configuration (exponential backoff with jitter)
|
||||
const RECONNECT_BASE_MS = 1000; // 1 second base
|
||||
const RECONNECT_MAX_MS = 60000; // 60 seconds max
|
||||
const RECONNECT_JITTER = 0.3; // ±30% jitter
|
||||
|
||||
// Circuit breaker configuration
|
||||
const CIRCUIT_BREAKER_THRESHOLD = 5; // failures before opening
|
||||
const CIRCUIT_BREAKER_RESET_MS = 30000; // 30 seconds before half-open
|
||||
|
||||
// Health tracker configuration
|
||||
const HEALTH_WINDOW_MS = 60000; // 1 minute window for health stats
|
||||
|
||||
// ============================================================================
|
||||
// Types
|
||||
// ============================================================================
|
||||
|
||||
export interface NostrBusOptions {
|
||||
/** Private key in hex or nsec format */
|
||||
privateKey: string;
|
||||
/** WebSocket relay URLs (defaults to damus + nos.lol) */
|
||||
relays?: string[];
|
||||
/** Account ID for state persistence (optional, defaults to pubkey prefix) */
|
||||
accountId?: string;
|
||||
/** Called when a DM is received */
|
||||
onMessage: (
|
||||
pubkey: string,
|
||||
text: string,
|
||||
reply: (text: string) => Promise<void>
|
||||
) => Promise<void>;
|
||||
/** Called on errors (optional) */
|
||||
onError?: (error: Error, context: string) => void;
|
||||
/** Called on connection status changes (optional) */
|
||||
onConnect?: (relay: string) => void;
|
||||
/** Called on disconnection (optional) */
|
||||
onDisconnect?: (relay: string) => void;
|
||||
/** Called on EOSE (end of stored events) for initial sync (optional) */
|
||||
onEose?: (relay: string) => void;
|
||||
/** Called on each metric event (optional) */
|
||||
onMetric?: (event: MetricEvent) => void;
|
||||
/** Maximum entries in seen tracker (default: 100,000) */
|
||||
maxSeenEntries?: number;
|
||||
/** Seen tracker TTL in ms (default: 1 hour) */
|
||||
seenTtlMs?: number;
|
||||
}
|
||||
|
||||
export interface NostrBusHandle {
|
||||
/** Stop the bus and close connections */
|
||||
close: () => void;
|
||||
/** Get the bot's public key */
|
||||
publicKey: string;
|
||||
/** Send a DM to a pubkey */
|
||||
sendDm: (toPubkey: string, text: string) => Promise<void>;
|
||||
/** Get current metrics snapshot */
|
||||
getMetrics: () => MetricsSnapshot;
|
||||
/** Publish a profile (kind:0) to all relays */
|
||||
publishProfile: (profile: NostrProfile) => Promise<ProfilePublishResult>;
|
||||
/** Get the last profile publish state */
|
||||
getProfileState: () => Promise<{
|
||||
lastPublishedAt: number | null;
|
||||
lastPublishedEventId: string | null;
|
||||
lastPublishResults: Record<string, "ok" | "failed" | "timeout"> | null;
|
||||
}>;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Circuit Breaker
|
||||
// ============================================================================
|
||||
|
||||
interface CircuitBreakerState {
|
||||
state: "closed" | "open" | "half_open";
|
||||
failures: number;
|
||||
lastFailure: number;
|
||||
lastSuccess: number;
|
||||
}
|
||||
|
||||
interface CircuitBreaker {
|
||||
/** Check if requests should be allowed */
|
||||
canAttempt: () => boolean;
|
||||
/** Record a success */
|
||||
recordSuccess: () => void;
|
||||
/** Record a failure */
|
||||
recordFailure: () => void;
|
||||
/** Get current state */
|
||||
getState: () => CircuitBreakerState["state"];
|
||||
}
|
||||
|
||||
function createCircuitBreaker(
|
||||
relay: string,
|
||||
metrics: NostrMetrics,
|
||||
threshold: number = CIRCUIT_BREAKER_THRESHOLD,
|
||||
resetMs: number = CIRCUIT_BREAKER_RESET_MS
|
||||
): CircuitBreaker {
|
||||
const state: CircuitBreakerState = {
|
||||
state: "closed",
|
||||
failures: 0,
|
||||
lastFailure: 0,
|
||||
lastSuccess: Date.now(),
|
||||
};
|
||||
|
||||
return {
|
||||
canAttempt(): boolean {
|
||||
if (state.state === "closed") return true;
|
||||
|
||||
if (state.state === "open") {
|
||||
// Check if enough time has passed to try half-open
|
||||
if (Date.now() - state.lastFailure >= resetMs) {
|
||||
state.state = "half_open";
|
||||
metrics.emit("relay.circuit_breaker.half_open", 1, { relay });
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// half_open: allow one attempt
|
||||
return true;
|
||||
},
|
||||
|
||||
recordSuccess(): void {
|
||||
if (state.state === "half_open") {
|
||||
state.state = "closed";
|
||||
state.failures = 0;
|
||||
metrics.emit("relay.circuit_breaker.close", 1, { relay });
|
||||
} else if (state.state === "closed") {
|
||||
state.failures = 0;
|
||||
}
|
||||
state.lastSuccess = Date.now();
|
||||
},
|
||||
|
||||
recordFailure(): void {
|
||||
state.failures++;
|
||||
state.lastFailure = Date.now();
|
||||
|
||||
if (state.state === "half_open") {
|
||||
state.state = "open";
|
||||
metrics.emit("relay.circuit_breaker.open", 1, { relay });
|
||||
} else if (state.state === "closed" && state.failures >= threshold) {
|
||||
state.state = "open";
|
||||
metrics.emit("relay.circuit_breaker.open", 1, { relay });
|
||||
}
|
||||
},
|
||||
|
||||
getState(): CircuitBreakerState["state"] {
|
||||
return state.state;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Relay Health Tracker
|
||||
// ============================================================================
|
||||
|
||||
interface RelayHealthStats {
|
||||
successCount: number;
|
||||
failureCount: number;
|
||||
latencySum: number;
|
||||
latencyCount: number;
|
||||
lastSuccess: number;
|
||||
lastFailure: number;
|
||||
}
|
||||
|
||||
interface RelayHealthTracker {
|
||||
/** Record a successful operation */
|
||||
recordSuccess: (relay: string, latencyMs: number) => void;
|
||||
/** Record a failed operation */
|
||||
recordFailure: (relay: string) => void;
|
||||
/** Get health score (0-1, higher is better) */
|
||||
getScore: (relay: string) => number;
|
||||
/** Get relays sorted by health (best first) */
|
||||
getSortedRelays: (relays: string[]) => string[];
|
||||
}
|
||||
|
||||
function createRelayHealthTracker(): RelayHealthTracker {
|
||||
const stats = new Map<string, RelayHealthStats>();
|
||||
|
||||
function getOrCreate(relay: string): RelayHealthStats {
|
||||
let s = stats.get(relay);
|
||||
if (!s) {
|
||||
s = {
|
||||
successCount: 0,
|
||||
failureCount: 0,
|
||||
latencySum: 0,
|
||||
latencyCount: 0,
|
||||
lastSuccess: 0,
|
||||
lastFailure: 0,
|
||||
};
|
||||
stats.set(relay, s);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
return {
|
||||
recordSuccess(relay: string, latencyMs: number): void {
|
||||
const s = getOrCreate(relay);
|
||||
s.successCount++;
|
||||
s.latencySum += latencyMs;
|
||||
s.latencyCount++;
|
||||
s.lastSuccess = Date.now();
|
||||
},
|
||||
|
||||
recordFailure(relay: string): void {
|
||||
const s = getOrCreate(relay);
|
||||
s.failureCount++;
|
||||
s.lastFailure = Date.now();
|
||||
},
|
||||
|
||||
getScore(relay: string): number {
|
||||
const s = stats.get(relay);
|
||||
if (!s) return 0.5; // Unknown relay gets neutral score
|
||||
|
||||
const total = s.successCount + s.failureCount;
|
||||
if (total === 0) return 0.5;
|
||||
|
||||
// Success rate (0-1)
|
||||
const successRate = s.successCount / total;
|
||||
|
||||
// Recency bonus (prefer recently successful relays)
|
||||
const now = Date.now();
|
||||
const recencyBonus =
|
||||
s.lastSuccess > s.lastFailure
|
||||
? Math.max(0, 1 - (now - s.lastSuccess) / HEALTH_WINDOW_MS) * 0.2
|
||||
: 0;
|
||||
|
||||
// Latency penalty (lower is better)
|
||||
const avgLatency =
|
||||
s.latencyCount > 0 ? s.latencySum / s.latencyCount : 1000;
|
||||
const latencyPenalty = Math.min(0.2, avgLatency / 10000);
|
||||
|
||||
return Math.max(0, Math.min(1, successRate + recencyBonus - latencyPenalty));
|
||||
},
|
||||
|
||||
getSortedRelays(relays: string[]): string[] {
|
||||
return [...relays].sort((a, b) => this.getScore(b) - this.getScore(a));
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Reconnect with Exponential Backoff + Jitter
|
||||
// ============================================================================
|
||||
|
||||
function computeReconnectDelay(attempt: number): number {
|
||||
// Exponential backoff: base * 2^attempt
|
||||
const exponential = RECONNECT_BASE_MS * Math.pow(2, attempt);
|
||||
const capped = Math.min(exponential, RECONNECT_MAX_MS);
|
||||
|
||||
// Add jitter: ±JITTER%
|
||||
const jitter = capped * RECONNECT_JITTER * (Math.random() * 2 - 1);
|
||||
return Math.max(RECONNECT_BASE_MS, capped + jitter);
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Key Validation
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Validate and normalize a private key (accepts hex or nsec format)
|
||||
*/
|
||||
export function validatePrivateKey(key: string): Uint8Array {
|
||||
const trimmed = key.trim();
|
||||
|
||||
// Handle nsec (bech32) format
|
||||
if (trimmed.startsWith("nsec1")) {
|
||||
const decoded = nip19.decode(trimmed);
|
||||
if (decoded.type !== "nsec") {
|
||||
throw new Error("Invalid nsec key: wrong type");
|
||||
}
|
||||
return decoded.data;
|
||||
}
|
||||
|
||||
// Handle hex format
|
||||
if (!/^[0-9a-fA-F]{64}$/.test(trimmed)) {
|
||||
throw new Error(
|
||||
"Private key must be 64 hex characters or nsec bech32 format"
|
||||
);
|
||||
}
|
||||
|
||||
// Convert hex string to Uint8Array
|
||||
const bytes = new Uint8Array(32);
|
||||
for (let i = 0; i < 32; i++) {
|
||||
bytes[i] = parseInt(trimmed.slice(i * 2, i * 2 + 2), 16);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get public key from private key (hex or nsec format)
|
||||
*/
|
||||
export function getPublicKeyFromPrivate(privateKey: string): string {
|
||||
const sk = validatePrivateKey(privateKey);
|
||||
return getPublicKey(sk);
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Main Bus
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Start the Nostr DM bus - subscribes to NIP-04 encrypted DMs
|
||||
*/
|
||||
export async function startNostrBus(
|
||||
options: NostrBusOptions
|
||||
): Promise<NostrBusHandle> {
|
||||
const {
|
||||
privateKey,
|
||||
relays = DEFAULT_RELAYS,
|
||||
onMessage,
|
||||
onError,
|
||||
onEose,
|
||||
onMetric,
|
||||
maxSeenEntries = 100_000,
|
||||
seenTtlMs = 60 * 60 * 1000,
|
||||
} = options;
|
||||
|
||||
const sk = validatePrivateKey(privateKey);
|
||||
const pk = getPublicKey(sk);
|
||||
const pool = new SimplePool();
|
||||
const accountId = options.accountId ?? pk.slice(0, 16);
|
||||
const gatewayStartedAt = Math.floor(Date.now() / 1000);
|
||||
|
||||
// Initialize metrics
|
||||
const metrics = onMetric ? createMetrics(onMetric) : createNoopMetrics();
|
||||
|
||||
// Initialize seen tracker with LRU
|
||||
const seen: SeenTracker = createSeenTracker({
|
||||
maxEntries: maxSeenEntries,
|
||||
ttlMs: seenTtlMs,
|
||||
});
|
||||
|
||||
// Initialize circuit breakers and health tracker
|
||||
const circuitBreakers = new Map<string, CircuitBreaker>();
|
||||
const healthTracker = createRelayHealthTracker();
|
||||
|
||||
for (const relay of relays) {
|
||||
circuitBreakers.set(relay, createCircuitBreaker(relay, metrics));
|
||||
}
|
||||
|
||||
// Read persisted state and compute `since` timestamp (with small overlap)
|
||||
const state = await readNostrBusState({ accountId });
|
||||
const baseSince = computeSinceTimestamp(state, gatewayStartedAt);
|
||||
const since = Math.max(0, baseSince - STARTUP_LOOKBACK_SEC);
|
||||
|
||||
// Seed in-memory dedupe with recent IDs from disk (prevents restart replay)
|
||||
if (state?.recentEventIds?.length) {
|
||||
seen.seed(state.recentEventIds);
|
||||
}
|
||||
|
||||
// Persist startup timestamp
|
||||
await writeNostrBusState({
|
||||
accountId,
|
||||
lastProcessedAt: state?.lastProcessedAt ?? gatewayStartedAt,
|
||||
gatewayStartedAt,
|
||||
recentEventIds: state?.recentEventIds ?? [],
|
||||
});
|
||||
|
||||
// Debounced state persistence
|
||||
let pendingWrite: ReturnType<typeof setTimeout> | undefined;
|
||||
let lastProcessedAt = state?.lastProcessedAt ?? gatewayStartedAt;
|
||||
let recentEventIds = (state?.recentEventIds ?? []).slice(
|
||||
-MAX_PERSISTED_EVENT_IDS
|
||||
);
|
||||
|
||||
function scheduleStatePersist(eventCreatedAt: number, eventId: string): void {
|
||||
lastProcessedAt = Math.max(lastProcessedAt, eventCreatedAt);
|
||||
recentEventIds.push(eventId);
|
||||
if (recentEventIds.length > MAX_PERSISTED_EVENT_IDS) {
|
||||
recentEventIds = recentEventIds.slice(-MAX_PERSISTED_EVENT_IDS);
|
||||
}
|
||||
|
||||
if (pendingWrite) clearTimeout(pendingWrite);
|
||||
pendingWrite = setTimeout(() => {
|
||||
writeNostrBusState({
|
||||
accountId,
|
||||
lastProcessedAt,
|
||||
gatewayStartedAt,
|
||||
recentEventIds,
|
||||
}).catch((err) => onError?.(err as Error, "persist state"));
|
||||
}, STATE_PERSIST_DEBOUNCE_MS);
|
||||
}
|
||||
|
||||
const inflight = new Set<string>();
|
||||
|
||||
// Event handler
|
||||
async function handleEvent(event: Event): Promise<void> {
|
||||
try {
|
||||
metrics.emit("event.received");
|
||||
|
||||
// Fast dedupe check (handles relay reconnections)
|
||||
if (seen.peek(event.id) || inflight.has(event.id)) {
|
||||
metrics.emit("event.duplicate");
|
||||
return;
|
||||
}
|
||||
inflight.add(event.id);
|
||||
|
||||
// Self-message loop prevention: skip our own messages
|
||||
if (event.pubkey === pk) {
|
||||
metrics.emit("event.rejected.self_message");
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip events older than our `since` (relay may ignore filter)
|
||||
if (event.created_at < since) {
|
||||
metrics.emit("event.rejected.stale");
|
||||
return;
|
||||
}
|
||||
|
||||
// Fast p-tag check BEFORE crypto (no allocation, cheaper)
|
||||
let targetsUs = false;
|
||||
for (const t of event.tags) {
|
||||
if (t[0] === "p" && t[1] === pk) {
|
||||
targetsUs = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!targetsUs) {
|
||||
metrics.emit("event.rejected.wrong_kind");
|
||||
return;
|
||||
}
|
||||
|
||||
// Verify signature (must pass before we trust the event)
|
||||
if (!verifyEvent(event)) {
|
||||
metrics.emit("event.rejected.invalid_signature");
|
||||
onError?.(new Error("Invalid signature"), `event ${event.id}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Mark seen AFTER verify (don't cache invalid IDs)
|
||||
seen.add(event.id);
|
||||
metrics.emit("memory.seen_tracker_size", seen.size());
|
||||
|
||||
// Decrypt the message
|
||||
let plaintext: string;
|
||||
try {
|
||||
plaintext = await decrypt(sk, event.pubkey, event.content);
|
||||
metrics.emit("decrypt.success");
|
||||
} catch (err) {
|
||||
metrics.emit("decrypt.failure");
|
||||
metrics.emit("event.rejected.decrypt_failed");
|
||||
onError?.(err as Error, `decrypt from ${event.pubkey}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create reply function (try relays by health score)
|
||||
const replyTo = async (text: string): Promise<void> => {
|
||||
await sendEncryptedDm(
|
||||
pool,
|
||||
sk,
|
||||
event.pubkey,
|
||||
text,
|
||||
relays,
|
||||
metrics,
|
||||
circuitBreakers,
|
||||
healthTracker,
|
||||
onError
|
||||
);
|
||||
};
|
||||
|
||||
// Call the message handler
|
||||
await onMessage(event.pubkey, plaintext, replyTo);
|
||||
|
||||
// Mark as processed
|
||||
metrics.emit("event.processed");
|
||||
|
||||
// Persist progress (debounced)
|
||||
scheduleStatePersist(event.created_at, event.id);
|
||||
} catch (err) {
|
||||
onError?.(err as Error, `event ${event.id}`);
|
||||
} finally {
|
||||
inflight.delete(event.id);
|
||||
}
|
||||
}
|
||||
|
||||
const sub = pool.subscribeMany(
|
||||
relays,
|
||||
[{ kinds: [4], "#p": [pk], since }],
|
||||
{
|
||||
onevent: handleEvent,
|
||||
oneose: () => {
|
||||
// EOSE handler - called when all stored events have been received
|
||||
for (const relay of relays) {
|
||||
metrics.emit("relay.message.eose", 1, { relay });
|
||||
}
|
||||
onEose?.(relays.join(", "));
|
||||
},
|
||||
onclose: (reason) => {
|
||||
// Handle subscription close
|
||||
for (const relay of relays) {
|
||||
metrics.emit("relay.message.closed", 1, { relay });
|
||||
options.onDisconnect?.(relay);
|
||||
}
|
||||
onError?.(
|
||||
new Error(`Subscription closed: ${reason}`),
|
||||
"subscription"
|
||||
);
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
// Public sendDm function
|
||||
const sendDm = async (toPubkey: string, text: string): Promise<void> => {
|
||||
await sendEncryptedDm(
|
||||
pool,
|
||||
sk,
|
||||
toPubkey,
|
||||
text,
|
||||
relays,
|
||||
metrics,
|
||||
circuitBreakers,
|
||||
healthTracker,
|
||||
onError
|
||||
);
|
||||
};
|
||||
|
||||
// Profile publishing function
|
||||
const publishProfile = async (profile: NostrProfile): Promise<ProfilePublishResult> => {
|
||||
// Read last published timestamp for monotonic ordering
|
||||
const profileState = await readNostrProfileState({ accountId });
|
||||
const lastPublishedAt = profileState?.lastPublishedAt ?? undefined;
|
||||
|
||||
// Publish the profile
|
||||
const result = await publishProfileFn(pool, sk, relays, profile, lastPublishedAt);
|
||||
|
||||
// Convert results to state format
|
||||
const publishResults: Record<string, "ok" | "failed" | "timeout"> = {};
|
||||
for (const relay of result.successes) {
|
||||
publishResults[relay] = "ok";
|
||||
}
|
||||
for (const { relay, error } of result.failures) {
|
||||
publishResults[relay] = error === "timeout" ? "timeout" : "failed";
|
||||
}
|
||||
|
||||
// Persist the publish state
|
||||
await writeNostrProfileState({
|
||||
accountId,
|
||||
lastPublishedAt: result.createdAt,
|
||||
lastPublishedEventId: result.eventId,
|
||||
lastPublishResults: publishResults,
|
||||
});
|
||||
|
||||
return result;
|
||||
};
|
||||
|
||||
// Get profile state function
|
||||
const getProfileState = async () => {
|
||||
const state = await readNostrProfileState({ accountId });
|
||||
return {
|
||||
lastPublishedAt: state?.lastPublishedAt ?? null,
|
||||
lastPublishedEventId: state?.lastPublishedEventId ?? null,
|
||||
lastPublishResults: state?.lastPublishResults ?? null,
|
||||
};
|
||||
};
|
||||
|
||||
return {
|
||||
close: () => {
|
||||
sub.close();
|
||||
seen.stop();
|
||||
// Flush pending state write synchronously on close
|
||||
if (pendingWrite) {
|
||||
clearTimeout(pendingWrite);
|
||||
writeNostrBusState({
|
||||
accountId,
|
||||
lastProcessedAt,
|
||||
gatewayStartedAt,
|
||||
recentEventIds,
|
||||
}).catch((err) => onError?.(err as Error, "persist state on close"));
|
||||
}
|
||||
},
|
||||
publicKey: pk,
|
||||
sendDm,
|
||||
getMetrics: () => metrics.getSnapshot(),
|
||||
publishProfile,
|
||||
getProfileState,
|
||||
};
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Send DM with Circuit Breaker + Health Scoring
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Send an encrypted DM to a pubkey
|
||||
*/
|
||||
async function sendEncryptedDm(
|
||||
pool: SimplePool,
|
||||
sk: Uint8Array,
|
||||
toPubkey: string,
|
||||
text: string,
|
||||
relays: string[],
|
||||
metrics: NostrMetrics,
|
||||
circuitBreakers: Map<string, CircuitBreaker>,
|
||||
healthTracker: RelayHealthTracker,
|
||||
onError?: (error: Error, context: string) => void
|
||||
): Promise<void> {
|
||||
const ciphertext = await encrypt(sk, toPubkey, text);
|
||||
const reply = finalizeEvent(
|
||||
{
|
||||
kind: 4,
|
||||
content: ciphertext,
|
||||
tags: [["p", toPubkey]],
|
||||
created_at: Math.floor(Date.now() / 1000),
|
||||
},
|
||||
sk
|
||||
);
|
||||
|
||||
// Sort relays by health score (best first)
|
||||
const sortedRelays = healthTracker.getSortedRelays(relays);
|
||||
|
||||
// Try relays in order of health, respecting circuit breakers
|
||||
let lastError: Error | undefined;
|
||||
for (const relay of sortedRelays) {
|
||||
const cb = circuitBreakers.get(relay);
|
||||
|
||||
// Skip if circuit breaker is open
|
||||
if (cb && !cb.canAttempt()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const startTime = Date.now();
|
||||
try {
|
||||
await pool.publish([relay], reply);
|
||||
const latency = Date.now() - startTime;
|
||||
|
||||
// Record success
|
||||
cb?.recordSuccess();
|
||||
healthTracker.recordSuccess(relay, latency);
|
||||
|
||||
return; // Success - exit early
|
||||
} catch (err) {
|
||||
lastError = err as Error;
|
||||
const latency = Date.now() - startTime;
|
||||
|
||||
// Record failure
|
||||
cb?.recordFailure();
|
||||
healthTracker.recordFailure(relay);
|
||||
metrics.emit("relay.error", 1, { relay, latency });
|
||||
|
||||
onError?.(lastError, `publish to ${relay}`);
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`Failed to publish to any relay: ${lastError?.message}`);
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Pubkey Utilities
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Check if a string looks like a valid Nostr pubkey (hex or npub)
|
||||
*/
|
||||
export function isValidPubkey(input: string): boolean {
|
||||
if (typeof input !== "string") return false;
|
||||
const trimmed = input.trim();
|
||||
|
||||
// npub format
|
||||
if (trimmed.startsWith("npub1")) {
|
||||
try {
|
||||
const decoded = nip19.decode(trimmed);
|
||||
return decoded.type === "npub";
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Hex format
|
||||
return /^[0-9a-fA-F]{64}$/.test(trimmed);
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalize a pubkey to hex format (accepts npub or hex)
|
||||
*/
|
||||
export function normalizePubkey(input: string): string {
|
||||
const trimmed = input.trim();
|
||||
|
||||
// npub format - decode to hex
|
||||
if (trimmed.startsWith("npub1")) {
|
||||
const decoded = nip19.decode(trimmed);
|
||||
if (decoded.type !== "npub") {
|
||||
throw new Error("Invalid npub key");
|
||||
}
|
||||
// Convert Uint8Array to hex string
|
||||
return Array.from(decoded.data)
|
||||
.map((b) => b.toString(16).padStart(2, "0"))
|
||||
.join("");
|
||||
}
|
||||
|
||||
// Already hex - validate and return lowercase
|
||||
if (!/^[0-9a-fA-F]{64}$/.test(trimmed)) {
|
||||
throw new Error("Pubkey must be 64 hex characters or npub format");
|
||||
}
|
||||
return trimmed.toLowerCase();
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a hex pubkey to npub format
|
||||
*/
|
||||
export function pubkeyToNpub(hexPubkey: string): string {
|
||||
const normalized = normalizePubkey(hexPubkey);
|
||||
// npubEncode expects a hex string, not Uint8Array
|
||||
return nip19.npubEncode(normalized);
|
||||
}
|
||||
Reference in New Issue
Block a user