RPC: stream heartbeat events to menu
This commit is contained in:
@@ -4,6 +4,18 @@ import OSLog
|
|||||||
actor AgentRPC {
|
actor AgentRPC {
|
||||||
static let shared = AgentRPC()
|
static let shared = AgentRPC()
|
||||||
|
|
||||||
|
struct HeartbeatEvent: Codable {
|
||||||
|
let ts: Double
|
||||||
|
let status: String
|
||||||
|
let to: String?
|
||||||
|
let preview: String?
|
||||||
|
let durationMs: Double?
|
||||||
|
let hasMedia: Bool?
|
||||||
|
let reason: String?
|
||||||
|
}
|
||||||
|
|
||||||
|
static let heartbeatNotification = Notification.Name("clawdis.rpc.heartbeat")
|
||||||
|
|
||||||
private var process: Process?
|
private var process: Process?
|
||||||
private var stdinHandle: FileHandle?
|
private var stdinHandle: FileHandle?
|
||||||
private var stdoutHandle: FileHandle?
|
private var stdoutHandle: FileHandle?
|
||||||
@@ -175,6 +187,15 @@ actor AgentRPC {
|
|||||||
let lineData = self.buffer.subdata(in: self.buffer.startIndex..<range.lowerBound)
|
let lineData = self.buffer.subdata(in: self.buffer.startIndex..<range.lowerBound)
|
||||||
self.buffer.removeSubrange(self.buffer.startIndex...range.lowerBound)
|
self.buffer.removeSubrange(self.buffer.startIndex...range.lowerBound)
|
||||||
guard let line = String(data: lineData, encoding: .utf8) else { continue }
|
guard let line = String(data: lineData, encoding: .utf8) else { continue }
|
||||||
|
|
||||||
|
// Handle event envelopes (unsolicited)
|
||||||
|
if let event = self.parseHeartbeatEvent(from: line) {
|
||||||
|
DispatchQueue.main.async {
|
||||||
|
NotificationCenter.default.post(name: Self.heartbeatNotification, object: event)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if let waiter = waiters.first {
|
if let waiter = waiters.first {
|
||||||
self.waiters.removeFirst()
|
self.waiters.removeFirst()
|
||||||
waiter.resume(returning: line)
|
waiter.resume(returning: line)
|
||||||
@@ -182,6 +203,24 @@ actor AgentRPC {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func parseHeartbeatEvent(from line: String) -> HeartbeatEvent? {
|
||||||
|
guard let data = line.data(using: .utf8) else { return nil }
|
||||||
|
guard
|
||||||
|
let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
|
||||||
|
let type = obj["type"] as? String,
|
||||||
|
type == "event",
|
||||||
|
let evt = obj["event"] as? String,
|
||||||
|
evt == "heartbeat",
|
||||||
|
let payload = obj["payload"] as? [String: Any]
|
||||||
|
else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
let decoder = JSONDecoder()
|
||||||
|
guard let payloadData = try? JSONSerialization.data(withJSONObject: payload) else { return nil }
|
||||||
|
return try? decoder.decode(HeartbeatEvent.self, from: payloadData)
|
||||||
|
}
|
||||||
|
|
||||||
private func nextLine() async throws -> String {
|
private func nextLine() async throws -> String {
|
||||||
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<String, Error>) in
|
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<String, Error>) in
|
||||||
self.waiters.append(cont)
|
self.waiters.append(cont)
|
||||||
|
|||||||
@@ -56,6 +56,7 @@ private struct MenuContent: View {
|
|||||||
let updater: UpdaterProviding?
|
let updater: UpdaterProviding?
|
||||||
@ObservedObject private var relayManager = RelayProcessManager.shared
|
@ObservedObject private var relayManager = RelayProcessManager.shared
|
||||||
@ObservedObject private var healthStore = HealthStore.shared
|
@ObservedObject private var healthStore = HealthStore.shared
|
||||||
|
@ObservedObject private var heartbeatStore = HeartbeatStore.shared
|
||||||
@Environment(\.openSettings) private var openSettings
|
@Environment(\.openSettings) private var openSettings
|
||||||
@State private var availableMics: [AudioInputDevice] = []
|
@State private var availableMics: [AudioInputDevice] = []
|
||||||
@State private var loadingMics = false
|
@State private var loadingMics = false
|
||||||
@@ -68,6 +69,7 @@ private struct MenuContent: View {
|
|||||||
}
|
}
|
||||||
self.statusRow
|
self.statusRow
|
||||||
Toggle(isOn: self.heartbeatsBinding) { Text("Send Heartbeats") }
|
Toggle(isOn: self.heartbeatsBinding) { Text("Send Heartbeats") }
|
||||||
|
self.heartbeatStatusRow
|
||||||
Toggle(isOn: self.voiceWakeBinding) { Text("Voice Wake") }
|
Toggle(isOn: self.voiceWakeBinding) { Text("Voice Wake") }
|
||||||
.disabled(!voiceWakeSupported)
|
.disabled(!voiceWakeSupported)
|
||||||
.opacity(voiceWakeSupported ? 1 : 0.5)
|
.opacity(voiceWakeSupported ? 1 : 0.5)
|
||||||
@@ -169,6 +171,45 @@ private struct MenuContent: View {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private var heartbeatStatusRow: some View {
|
||||||
|
let label: String
|
||||||
|
let color: Color
|
||||||
|
|
||||||
|
if let evt = self.heartbeatStore.lastEvent {
|
||||||
|
let ageText = age(from: Date(timeIntervalSince1970: evt.ts / 1000))
|
||||||
|
switch evt.status {
|
||||||
|
case "sent":
|
||||||
|
label = "Last heartbeat sent · \(ageText)"
|
||||||
|
color = .blue
|
||||||
|
case "ok-empty", "ok-token":
|
||||||
|
label = "Heartbeat ok · \(ageText)"
|
||||||
|
color = .green
|
||||||
|
case "skipped":
|
||||||
|
label = "Heartbeat skipped · \(ageText)"
|
||||||
|
color = .secondary
|
||||||
|
case "failed":
|
||||||
|
label = "Heartbeat failed · \(ageText)"
|
||||||
|
color = .red
|
||||||
|
default:
|
||||||
|
label = "Heartbeat · \(ageText)"
|
||||||
|
color = .secondary
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
label = "No heartbeat yet"
|
||||||
|
color = .secondary
|
||||||
|
}
|
||||||
|
|
||||||
|
return HStack(spacing: 8) {
|
||||||
|
Circle()
|
||||||
|
.fill(color)
|
||||||
|
.frame(width: 8, height: 8)
|
||||||
|
Text(label)
|
||||||
|
.font(.caption.weight(.semibold))
|
||||||
|
.foregroundStyle(.primary)
|
||||||
|
}
|
||||||
|
.padding(.vertical, 2)
|
||||||
|
}
|
||||||
|
|
||||||
private var activeBinding: Binding<Bool> {
|
private var activeBinding: Binding<Bool> {
|
||||||
Binding(get: { !self.state.isPaused }, set: { self.state.isPaused = !$0 })
|
Binding(get: { !self.state.isPaused }, set: { self.state.isPaused = !$0 })
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,6 +23,10 @@ import {
|
|||||||
resolveHeartbeatSeconds,
|
resolveHeartbeatSeconds,
|
||||||
resolveReconnectPolicy,
|
resolveReconnectPolicy,
|
||||||
} from "../web/reconnect.js";
|
} from "../web/reconnect.js";
|
||||||
|
import {
|
||||||
|
readLatestHeartbeat,
|
||||||
|
tailHeartbeatEvents,
|
||||||
|
} from "../process/heartbeat-events.js";
|
||||||
import {
|
import {
|
||||||
ensureWebChatServerFromConfig,
|
ensureWebChatServerFromConfig,
|
||||||
startWebChatServer,
|
startWebChatServer,
|
||||||
@@ -241,6 +245,14 @@ Examples:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const forwardHeartbeat = (payload: unknown) => {
|
||||||
|
respond({ type: "event", event: "heartbeat", payload });
|
||||||
|
};
|
||||||
|
|
||||||
|
const latest = readLatestHeartbeat();
|
||||||
|
if (latest) forwardHeartbeat(latest);
|
||||||
|
const stopTail = tailHeartbeatEvents(forwardHeartbeat);
|
||||||
|
|
||||||
rl.on("line", async (line: string) => {
|
rl.on("line", async (line: string) => {
|
||||||
if (!line.trim()) return;
|
if (!line.trim()) return;
|
||||||
try {
|
try {
|
||||||
@@ -311,6 +323,8 @@ Examples:
|
|||||||
};
|
};
|
||||||
|
|
||||||
await new Promise(() => {});
|
await new Promise(() => {});
|
||||||
|
|
||||||
|
stopTail();
|
||||||
});
|
});
|
||||||
|
|
||||||
program
|
program
|
||||||
|
|||||||
78
src/process/heartbeat-events.ts
Normal file
78
src/process/heartbeat-events.ts
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
import fs from "node:fs";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
import readline from "node:readline";
|
||||||
|
|
||||||
|
export type HeartbeatEvent = {
|
||||||
|
type: "heartbeat";
|
||||||
|
ts: number; // epoch ms
|
||||||
|
status:
|
||||||
|
| "sent"
|
||||||
|
| "ok-empty"
|
||||||
|
| "ok-token"
|
||||||
|
| "skipped"
|
||||||
|
| "failed";
|
||||||
|
to?: string;
|
||||||
|
preview?: string;
|
||||||
|
durationMs?: number;
|
||||||
|
hasMedia?: boolean;
|
||||||
|
reason?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
const EVENT_FILENAME = "heartbeat-events.jsonl";
|
||||||
|
const STATE_FILENAME = "heartbeat-state.json";
|
||||||
|
|
||||||
|
function baseDir() {
|
||||||
|
const dir = path.join(os.homedir(), ".clawdis");
|
||||||
|
if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true });
|
||||||
|
return dir;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function heartbeatEventsPath() {
|
||||||
|
return path.join(baseDir(), EVENT_FILENAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function heartbeatStatePath() {
|
||||||
|
return path.join(baseDir(), STATE_FILENAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function writeHeartbeatEvent(evt: HeartbeatEvent) {
|
||||||
|
const line = JSON.stringify(evt);
|
||||||
|
fs.appendFileSync(heartbeatEventsPath(), `${line}\n`, { encoding: "utf8" });
|
||||||
|
fs.writeFileSync(heartbeatStatePath(), line, { encoding: "utf8" });
|
||||||
|
}
|
||||||
|
|
||||||
|
export function readLatestHeartbeat(): HeartbeatEvent | null {
|
||||||
|
try {
|
||||||
|
const txt = fs.readFileSync(heartbeatStatePath(), "utf8");
|
||||||
|
return JSON.parse(txt) as HeartbeatEvent;
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tail the events file and invoke the callback for every new parsed event.
|
||||||
|
export function tailHeartbeatEvents(onEvent: (evt: HeartbeatEvent) => void) {
|
||||||
|
const file = heartbeatEventsPath();
|
||||||
|
if (!fs.existsSync(file)) {
|
||||||
|
fs.writeFileSync(file, "", { encoding: "utf8" });
|
||||||
|
}
|
||||||
|
|
||||||
|
const stream = fs.createReadStream(file, { encoding: "utf8", flags: "a+" });
|
||||||
|
const rl = readline.createInterface({ input: stream });
|
||||||
|
rl.on("line", (line) => {
|
||||||
|
const trimmed = line.trim();
|
||||||
|
if (!trimmed) return;
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(trimmed) as HeartbeatEvent;
|
||||||
|
if (parsed?.type === "heartbeat") onEvent(parsed);
|
||||||
|
} catch {
|
||||||
|
// ignore malformed
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
rl.close();
|
||||||
|
stream.close();
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -14,6 +14,10 @@ import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
|
|||||||
import { logInfo } from "../logger.js";
|
import { logInfo } from "../logger.js";
|
||||||
import { getChildLogger } from "../logging.js";
|
import { getChildLogger } from "../logging.js";
|
||||||
import { getQueueSize } from "../process/command-queue.js";
|
import { getQueueSize } from "../process/command-queue.js";
|
||||||
|
import {
|
||||||
|
type HeartbeatEvent,
|
||||||
|
writeHeartbeatEvent,
|
||||||
|
} from "../process/heartbeat-events.js";
|
||||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||||
import { jidToE164, normalizeE164 } from "../utils.js";
|
import { jidToE164, normalizeE164 } from "../utils.js";
|
||||||
import { monitorWebInbox } from "./inbound.js";
|
import { monitorWebInbox } from "./inbound.js";
|
||||||
@@ -78,6 +82,10 @@ const formatDuration = (ms: number) =>
|
|||||||
|
|
||||||
const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30;
|
const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30;
|
||||||
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
|
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
|
||||||
|
|
||||||
|
function emitHeartbeatEvent(evt: Omit<HeartbeatEvent, "type" | "ts">) {
|
||||||
|
writeHeartbeatEvent({ type: "heartbeat", ts: Date.now(), ...evt });
|
||||||
|
}
|
||||||
export const HEARTBEAT_PROMPT = "HEARTBEAT /think:high";
|
export const HEARTBEAT_PROMPT = "HEARTBEAT /think:high";
|
||||||
|
|
||||||
function elide(text?: string, limit = 400) {
|
function elide(text?: string, limit = 400) {
|
||||||
@@ -261,6 +269,12 @@ export async function runWebHeartbeatOnce(opts: {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const sendResult = await sender(to, overrideBody, { verbose });
|
const sendResult = await sender(to, overrideBody, { verbose });
|
||||||
|
emitHeartbeatEvent({
|
||||||
|
status: "sent",
|
||||||
|
to,
|
||||||
|
preview: overrideBody.slice(0, 160),
|
||||||
|
hasMedia: false,
|
||||||
|
});
|
||||||
heartbeatLogger.info(
|
heartbeatLogger.info(
|
||||||
{
|
{
|
||||||
to,
|
to,
|
||||||
@@ -307,6 +321,7 @@ export async function runWebHeartbeatOnce(opts: {
|
|||||||
"heartbeat skipped",
|
"heartbeat skipped",
|
||||||
);
|
);
|
||||||
if (verbose) console.log(success("heartbeat: ok (empty reply)"));
|
if (verbose) console.log(success("heartbeat: ok (empty reply)"));
|
||||||
|
emitHeartbeatEvent({ status: "ok-empty", to });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -328,6 +343,7 @@ export async function runWebHeartbeatOnce(opts: {
|
|||||||
"heartbeat skipped",
|
"heartbeat skipped",
|
||||||
);
|
);
|
||||||
console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
|
console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
|
||||||
|
emitHeartbeatEvent({ status: "ok-token", to });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -351,6 +367,12 @@ export async function runWebHeartbeatOnce(opts: {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const sendResult = await sender(to, finalText, { verbose });
|
const sendResult = await sender(to, finalText, { verbose });
|
||||||
|
emitHeartbeatEvent({
|
||||||
|
status: "sent",
|
||||||
|
to,
|
||||||
|
preview: finalText.slice(0, 160),
|
||||||
|
hasMedia,
|
||||||
|
});
|
||||||
heartbeatLogger.info(
|
heartbeatLogger.info(
|
||||||
{
|
{
|
||||||
to,
|
to,
|
||||||
@@ -364,6 +386,7 @@ export async function runWebHeartbeatOnce(opts: {
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
heartbeatLogger.warn({ to, error: String(err) }, "heartbeat failed");
|
heartbeatLogger.warn({ to, error: String(err) }, "heartbeat failed");
|
||||||
console.log(danger(`heartbeat: failed - ${String(err)}`));
|
console.log(danger(`heartbeat: failed - ${String(err)}`));
|
||||||
|
emitHeartbeatEvent({ status: "failed", to, reason: String(err) });
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user