diff --git a/apps/macos/Sources/Clawdis/AgentRPC.swift b/apps/macos/Sources/Clawdis/AgentRPC.swift index 977bf40e2..a7ea517f4 100644 --- a/apps/macos/Sources/Clawdis/AgentRPC.swift +++ b/apps/macos/Sources/Clawdis/AgentRPC.swift @@ -4,6 +4,18 @@ import OSLog actor AgentRPC { static let shared = AgentRPC() + struct HeartbeatEvent: Codable { + let ts: Double + let status: String + let to: String? + let preview: String? + let durationMs: Double? + let hasMedia: Bool? + let reason: String? + } + + static let heartbeatNotification = Notification.Name("clawdis.rpc.heartbeat") + private var process: Process? private var stdinHandle: FileHandle? private var stdoutHandle: FileHandle? @@ -175,6 +187,15 @@ actor AgentRPC { let lineData = self.buffer.subdata(in: self.buffer.startIndex.. HeartbeatEvent? { + guard let data = line.data(using: .utf8) else { return nil } + guard + let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + let type = obj["type"] as? String, + type == "event", + let evt = obj["event"] as? String, + evt == "heartbeat", + let payload = obj["payload"] as? [String: Any] + else { + return nil + } + + let decoder = JSONDecoder() + guard let payloadData = try? JSONSerialization.data(withJSONObject: payload) else { return nil } + return try? decoder.decode(HeartbeatEvent.self, from: payloadData) + } + private func nextLine() async throws -> String { try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in self.waiters.append(cont) diff --git a/apps/macos/Sources/Clawdis/MenuBar.swift b/apps/macos/Sources/Clawdis/MenuBar.swift index 14d7d119c..e3f2e7a69 100644 --- a/apps/macos/Sources/Clawdis/MenuBar.swift +++ b/apps/macos/Sources/Clawdis/MenuBar.swift @@ -56,6 +56,7 @@ private struct MenuContent: View { let updater: UpdaterProviding? @ObservedObject private var relayManager = RelayProcessManager.shared @ObservedObject private var healthStore = HealthStore.shared + @ObservedObject private var heartbeatStore = HeartbeatStore.shared @Environment(\.openSettings) private var openSettings @State private var availableMics: [AudioInputDevice] = [] @State private var loadingMics = false @@ -68,6 +69,7 @@ private struct MenuContent: View { } self.statusRow Toggle(isOn: self.heartbeatsBinding) { Text("Send Heartbeats") } + self.heartbeatStatusRow Toggle(isOn: self.voiceWakeBinding) { Text("Voice Wake") } .disabled(!voiceWakeSupported) .opacity(voiceWakeSupported ? 1 : 0.5) @@ -169,6 +171,45 @@ private struct MenuContent: View { } } + private var heartbeatStatusRow: some View { + let label: String + let color: Color + + if let evt = self.heartbeatStore.lastEvent { + let ageText = age(from: Date(timeIntervalSince1970: evt.ts / 1000)) + switch evt.status { + case "sent": + label = "Last heartbeat sent · \(ageText)" + color = .blue + case "ok-empty", "ok-token": + label = "Heartbeat ok · \(ageText)" + color = .green + case "skipped": + label = "Heartbeat skipped · \(ageText)" + color = .secondary + case "failed": + label = "Heartbeat failed · \(ageText)" + color = .red + default: + label = "Heartbeat · \(ageText)" + color = .secondary + } + } else { + label = "No heartbeat yet" + color = .secondary + } + + return HStack(spacing: 8) { + Circle() + .fill(color) + .frame(width: 8, height: 8) + Text(label) + .font(.caption.weight(.semibold)) + .foregroundStyle(.primary) + } + .padding(.vertical, 2) + } + private var activeBinding: Binding { Binding(get: { !self.state.isPaused }, set: { self.state.isPaused = !$0 }) } diff --git a/src/cli/program.ts b/src/cli/program.ts index 05bf812ca..4ee65fc17 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -23,6 +23,10 @@ import { resolveHeartbeatSeconds, resolveReconnectPolicy, } from "../web/reconnect.js"; +import { + readLatestHeartbeat, + tailHeartbeatEvents, +} from "../process/heartbeat-events.js"; import { ensureWebChatServerFromConfig, startWebChatServer, @@ -241,6 +245,14 @@ Examples: } }; + const forwardHeartbeat = (payload: unknown) => { + respond({ type: "event", event: "heartbeat", payload }); + }; + + const latest = readLatestHeartbeat(); + if (latest) forwardHeartbeat(latest); + const stopTail = tailHeartbeatEvents(forwardHeartbeat); + rl.on("line", async (line: string) => { if (!line.trim()) return; try { @@ -311,6 +323,8 @@ Examples: }; await new Promise(() => {}); + + stopTail(); }); program diff --git a/src/process/heartbeat-events.ts b/src/process/heartbeat-events.ts new file mode 100644 index 000000000..bdf3083fb --- /dev/null +++ b/src/process/heartbeat-events.ts @@ -0,0 +1,78 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import readline from "node:readline"; + +export type HeartbeatEvent = { + type: "heartbeat"; + ts: number; // epoch ms + status: + | "sent" + | "ok-empty" + | "ok-token" + | "skipped" + | "failed"; + to?: string; + preview?: string; + durationMs?: number; + hasMedia?: boolean; + reason?: string; +}; + +const EVENT_FILENAME = "heartbeat-events.jsonl"; +const STATE_FILENAME = "heartbeat-state.json"; + +function baseDir() { + const dir = path.join(os.homedir(), ".clawdis"); + if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true }); + return dir; +} + +export function heartbeatEventsPath() { + return path.join(baseDir(), EVENT_FILENAME); +} + +export function heartbeatStatePath() { + return path.join(baseDir(), STATE_FILENAME); +} + +export function writeHeartbeatEvent(evt: HeartbeatEvent) { + const line = JSON.stringify(evt); + fs.appendFileSync(heartbeatEventsPath(), `${line}\n`, { encoding: "utf8" }); + fs.writeFileSync(heartbeatStatePath(), line, { encoding: "utf8" }); +} + +export function readLatestHeartbeat(): HeartbeatEvent | null { + try { + const txt = fs.readFileSync(heartbeatStatePath(), "utf8"); + return JSON.parse(txt) as HeartbeatEvent; + } catch { + return null; + } +} + +// Tail the events file and invoke the callback for every new parsed event. +export function tailHeartbeatEvents(onEvent: (evt: HeartbeatEvent) => void) { + const file = heartbeatEventsPath(); + if (!fs.existsSync(file)) { + fs.writeFileSync(file, "", { encoding: "utf8" }); + } + + const stream = fs.createReadStream(file, { encoding: "utf8", flags: "a+" }); + const rl = readline.createInterface({ input: stream }); + rl.on("line", (line) => { + const trimmed = line.trim(); + if (!trimmed) return; + try { + const parsed = JSON.parse(trimmed) as HeartbeatEvent; + if (parsed?.type === "heartbeat") onEvent(parsed); + } catch { + // ignore malformed + } + }); + + return () => { + rl.close(); + stream.close(); + }; +} diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 8512df257..031be5740 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -14,6 +14,10 @@ import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; import { getQueueSize } from "../process/command-queue.js"; +import { + type HeartbeatEvent, + writeHeartbeatEvent, +} from "../process/heartbeat-events.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { jidToE164, normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; @@ -78,6 +82,10 @@ const formatDuration = (ms: number) => const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30; export const HEARTBEAT_TOKEN = "HEARTBEAT_OK"; + +function emitHeartbeatEvent(evt: Omit) { + writeHeartbeatEvent({ type: "heartbeat", ts: Date.now(), ...evt }); +} export const HEARTBEAT_PROMPT = "HEARTBEAT /think:high"; function elide(text?: string, limit = 400) { @@ -261,6 +269,12 @@ export async function runWebHeartbeatOnce(opts: { return; } const sendResult = await sender(to, overrideBody, { verbose }); + emitHeartbeatEvent({ + status: "sent", + to, + preview: overrideBody.slice(0, 160), + hasMedia: false, + }); heartbeatLogger.info( { to, @@ -307,6 +321,7 @@ export async function runWebHeartbeatOnce(opts: { "heartbeat skipped", ); if (verbose) console.log(success("heartbeat: ok (empty reply)")); + emitHeartbeatEvent({ status: "ok-empty", to }); return; } @@ -328,6 +343,7 @@ export async function runWebHeartbeatOnce(opts: { "heartbeat skipped", ); console.log(success("heartbeat: ok (HEARTBEAT_OK)")); + emitHeartbeatEvent({ status: "ok-token", to }); return; } @@ -351,6 +367,12 @@ export async function runWebHeartbeatOnce(opts: { } const sendResult = await sender(to, finalText, { verbose }); + emitHeartbeatEvent({ + status: "sent", + to, + preview: finalText.slice(0, 160), + hasMedia, + }); heartbeatLogger.info( { to, @@ -364,6 +386,7 @@ export async function runWebHeartbeatOnce(opts: { } catch (err) { heartbeatLogger.warn({ to, error: String(err) }, "heartbeat failed"); console.log(danger(`heartbeat: failed - ${String(err)}`)); + emitHeartbeatEvent({ status: "failed", to, reason: String(err) }); throw err; } }