fix: expose heartbeat controls and harden mac CLI
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -9,6 +9,7 @@ apps/macos/.build/
|
||||
bin/clawdis-mac
|
||||
apps/macos/.build-local/
|
||||
apps/macos/.swiftpm/
|
||||
Core/
|
||||
apps/ios/*.xcodeproj/
|
||||
apps/ios/*.xcworkspace/
|
||||
apps/ios/.swiftpm/
|
||||
|
||||
@@ -111,8 +111,8 @@ final class ControlChannel: ObservableObject {
|
||||
}
|
||||
|
||||
func lastHeartbeat() async throws -> ControlHeartbeatEvent? {
|
||||
// Heartbeat removed in new protocol
|
||||
nil
|
||||
let data = try await self.request(method: "last-heartbeat")
|
||||
return try JSONDecoder().decode(ControlHeartbeatEvent?.self, from: data)
|
||||
}
|
||||
|
||||
func request(
|
||||
@@ -219,6 +219,13 @@ final class ControlChannel: ObservableObject {
|
||||
AgentEventStore.shared.append(agent)
|
||||
self.routeWorkActivity(from: agent)
|
||||
}
|
||||
case let .event(evt) where evt.event == "heartbeat":
|
||||
if let payload = evt.payload,
|
||||
let heartbeat = try? GatewayPayloadDecoding.decode(payload, as: ControlHeartbeatEvent.self),
|
||||
let data = try? JSONEncoder().encode(heartbeat)
|
||||
{
|
||||
NotificationCenter.default.post(name: .controlHeartbeat, object: data)
|
||||
}
|
||||
case let .event(evt) where evt.event == "shutdown":
|
||||
self.state = .degraded("gateway shutdown")
|
||||
case .snapshot:
|
||||
|
||||
@@ -491,6 +491,7 @@ struct ClawdisCLI {
|
||||
shutdown(fd, SHUT_WR)
|
||||
|
||||
var data = Data()
|
||||
let decoder = JSONDecoder()
|
||||
var buffer = [UInt8](repeating: 0, count: 8192)
|
||||
let bufSize = buffer.count
|
||||
while true {
|
||||
@@ -501,14 +502,20 @@ struct ClawdisCLI {
|
||||
until: deadline,
|
||||
timeoutSeconds: timeoutSeconds)
|
||||
let n = buffer.withUnsafeMutableBytes { read(fd, $0.baseAddress!, bufSize) }
|
||||
if n > 0 { data.append(buffer, count: n); continue }
|
||||
if n > 0 {
|
||||
data.append(buffer, count: n)
|
||||
if let resp = try? decoder.decode(Response.self, from: data) {
|
||||
return resp
|
||||
}
|
||||
continue
|
||||
}
|
||||
if n == 0 { break }
|
||||
if n == -1, errno == EINTR { continue }
|
||||
if n == -1, errno == EAGAIN { continue }
|
||||
throw POSIXError(POSIXErrorCode(rawValue: errno) ?? .EIO)
|
||||
}
|
||||
guard !data.isEmpty else { throw POSIXError(.ECONNRESET) }
|
||||
return try JSONDecoder().decode(Response.self, from: data)
|
||||
return try decoder.decode(Response.self, from: data)
|
||||
}
|
||||
|
||||
private static func rpcTimeoutSeconds(for request: Request) -> TimeInterval {
|
||||
|
||||
@@ -39,7 +39,7 @@ import Testing
|
||||
#expect(cfg.token == "t")
|
||||
}
|
||||
|
||||
@Test func remoteWithoutTunnelIsUnavailable() async throws {
|
||||
@Test func remoteWithoutTunnelRecoversByEnsuringTunnel() async throws {
|
||||
let mode = ModeBox(.remote)
|
||||
let store = GatewayEndpointStore(deps: .init(
|
||||
mode: { mode.get() },
|
||||
@@ -48,12 +48,9 @@ import Testing
|
||||
remotePortIfRunning: { nil },
|
||||
ensureRemoteTunnel: { 18789 }))
|
||||
|
||||
do {
|
||||
_ = try await store.requireConfig()
|
||||
Issue.record("expected requireConfig to throw")
|
||||
} catch {
|
||||
#expect(error.localizedDescription.contains("no active control tunnel"))
|
||||
}
|
||||
let cfg = try await store.requireConfig()
|
||||
#expect(cfg.url.absoluteString == "ws://127.0.0.1:18789")
|
||||
#expect(cfg.token == nil)
|
||||
}
|
||||
|
||||
@Test func ensureRemoteTunnelPublishesReadyState() async throws {
|
||||
|
||||
@@ -82,6 +82,24 @@ This should be modeled after `WebChatManager`/`WebChatWindowController` but targ
|
||||
Related:
|
||||
- For “invoke the agent again from UI” flows, prefer the macOS deep link scheme (`clawdis://agent?...`) so *any* UI surface (Canvas, WebChat, native views) can trigger a new agent run. See `docs/clawdis-mac.md`.
|
||||
|
||||
## Triggering agent runs from Canvas (deep links)
|
||||
|
||||
Canvas can trigger new agent runs via the macOS app deep-link scheme:
|
||||
- `clawdis://agent?...`
|
||||
|
||||
This is intentionally separate from `clawdis-canvas://…` (which is only for serving local Canvas files into the `WKWebView`).
|
||||
|
||||
Suggested patterns:
|
||||
- HTML: render links/buttons that navigate to `clawdis://agent?message=...`.
|
||||
- JS: set `window.location.href = 'clawdis://agent?...'` for “run this now” actions.
|
||||
|
||||
Implementation note (important):
|
||||
- In `WKWebView`, intercept `clawdis://…` navigations in `WKNavigationDelegate` and forward them to the app, e.g. by calling `DeepLinkHandler.shared.handle(url:)` and returning `.cancel` for the navigation.
|
||||
|
||||
Safety:
|
||||
- `clawdis://agent` is disabled by default and must be enabled in **Clawdis → Settings → Debug** (“Allow URL scheme (agent)”).
|
||||
- Without a `key` query param, the app will prompt for confirmation before invoking the agent.
|
||||
|
||||
## Security / guardrails
|
||||
|
||||
Recommended defaults:
|
||||
|
||||
@@ -8,6 +8,7 @@ import { WebSocket } from "ws";
|
||||
import { agentCommand } from "../commands/agent.js";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
import { GatewayLockError } from "../infra/gateway-lock.js";
|
||||
import { emitHeartbeatEvent } from "../infra/heartbeat-events.js";
|
||||
import { PROTOCOL_VERSION } from "./protocol/index.js";
|
||||
import { startGatewayServer } from "./server.js";
|
||||
|
||||
@@ -172,6 +173,86 @@ async function connectOk(
|
||||
}
|
||||
|
||||
describe("gateway server", () => {
|
||||
test("broadcasts heartbeat events and serves last-heartbeat", async () => {
|
||||
type HeartbeatPayload = {
|
||||
ts: number;
|
||||
status: string;
|
||||
to?: string;
|
||||
preview?: string;
|
||||
durationMs?: number;
|
||||
hasMedia?: boolean;
|
||||
reason?: string;
|
||||
};
|
||||
type EventFrame = {
|
||||
type: "event";
|
||||
event: string;
|
||||
payload?: HeartbeatPayload | null;
|
||||
};
|
||||
type ResFrame = {
|
||||
type: "res";
|
||||
id: string;
|
||||
ok: boolean;
|
||||
payload?: unknown;
|
||||
};
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: "hello",
|
||||
minProtocol: 1,
|
||||
maxProtocol: 1,
|
||||
client: { name: "test", version: "1", platform: "test", mode: "test" },
|
||||
caps: [],
|
||||
}),
|
||||
);
|
||||
await onceMessage(ws, (o) => o.type === "hello-ok");
|
||||
|
||||
const waitHeartbeat = onceMessage<EventFrame>(
|
||||
ws,
|
||||
(o) => o.type === "event" && o.event === "heartbeat",
|
||||
);
|
||||
emitHeartbeatEvent({ status: "sent", to: "+123", preview: "ping" });
|
||||
const evt = await waitHeartbeat;
|
||||
expect(evt.payload?.status).toBe("sent");
|
||||
expect(typeof evt.payload?.ts).toBe("number");
|
||||
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: "req",
|
||||
id: "hb-last",
|
||||
method: "last-heartbeat",
|
||||
}),
|
||||
);
|
||||
const last = await onceMessage<ResFrame>(
|
||||
ws,
|
||||
(o) => o.type === "res" && o.id === "hb-last",
|
||||
);
|
||||
expect(last.ok).toBe(true);
|
||||
const lastPayload = last.payload as HeartbeatPayload | null | undefined;
|
||||
expect(lastPayload?.status).toBe("sent");
|
||||
expect(lastPayload?.ts).toBe(evt.payload?.ts);
|
||||
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: "req",
|
||||
id: "hb-toggle-off",
|
||||
method: "set-heartbeats",
|
||||
params: { enabled: false },
|
||||
}),
|
||||
);
|
||||
const toggle = await onceMessage<ResFrame>(
|
||||
ws,
|
||||
(o) => o.type === "res" && o.id === "hb-toggle-off",
|
||||
);
|
||||
expect(toggle.ok).toBe(true);
|
||||
expect((toggle.payload as { enabled?: boolean } | undefined)?.enabled).toBe(
|
||||
false,
|
||||
);
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("agent falls back to allowFrom when lastTo is stale", async () => {
|
||||
testAllowFrom = ["+436769770569"];
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
|
||||
|
||||
@@ -22,6 +22,10 @@ import {
|
||||
import { isVerbose } from "../globals.js";
|
||||
import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { GatewayLockError } from "../infra/gateway-lock.js";
|
||||
import {
|
||||
getLastHeartbeatEvent,
|
||||
onHeartbeatEvent,
|
||||
} from "../infra/heartbeat-events.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import {
|
||||
listSystemPresence,
|
||||
@@ -35,6 +39,7 @@ import { defaultRuntime } from "../runtime.js";
|
||||
import { monitorTelegramProvider } from "../telegram/monitor.js";
|
||||
import { sendMessageTelegram } from "../telegram/send.js";
|
||||
import { normalizeE164 } from "../utils.js";
|
||||
import { setHeartbeatsEnabled } from "../web/auto-reply.js";
|
||||
import { sendMessageWhatsApp } from "../web/outbound.js";
|
||||
import { ensureWebChatServerFromConfig } from "../webchat/server.js";
|
||||
import { buildMessageWithAttachments } from "./chat-attachments.js";
|
||||
@@ -65,6 +70,8 @@ type Client = {
|
||||
const METHODS = [
|
||||
"health",
|
||||
"status",
|
||||
"last-heartbeat",
|
||||
"set-heartbeats",
|
||||
"system-presence",
|
||||
"system-event",
|
||||
"send",
|
||||
@@ -74,7 +81,15 @@ const METHODS = [
|
||||
"chat.send",
|
||||
];
|
||||
|
||||
const EVENTS = ["agent", "chat", "presence", "tick", "shutdown", "health"];
|
||||
const EVENTS = [
|
||||
"agent",
|
||||
"chat",
|
||||
"presence",
|
||||
"tick",
|
||||
"shutdown",
|
||||
"health",
|
||||
"heartbeat",
|
||||
];
|
||||
|
||||
export type GatewayServer = {
|
||||
close: () => Promise<void>;
|
||||
@@ -494,6 +509,10 @@ export async function startGatewayServer(
|
||||
}
|
||||
});
|
||||
|
||||
const heartbeatUnsub = onHeartbeatEvent((evt) => {
|
||||
broadcast("heartbeat", evt, { dropIfSlow: true });
|
||||
});
|
||||
|
||||
wss.on("connection", (socket) => {
|
||||
let client: Client | null = null;
|
||||
let closed = false;
|
||||
@@ -974,6 +993,28 @@ export async function startGatewayServer(
|
||||
respond(true, status, undefined);
|
||||
break;
|
||||
}
|
||||
case "last-heartbeat": {
|
||||
respond(true, getLastHeartbeatEvent(), undefined);
|
||||
break;
|
||||
}
|
||||
case "set-heartbeats": {
|
||||
const params = (req.params ?? {}) as Record<string, unknown>;
|
||||
const enabled = params.enabled;
|
||||
if (typeof enabled !== "boolean") {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
"invalid set-heartbeats params: enabled (boolean) required",
|
||||
),
|
||||
);
|
||||
break;
|
||||
}
|
||||
setHeartbeatsEnabled(enabled);
|
||||
respond(true, { ok: true, enabled }, undefined);
|
||||
break;
|
||||
}
|
||||
case "system-presence": {
|
||||
const presence = listSystemPresence();
|
||||
respond(true, presence, undefined);
|
||||
@@ -1399,6 +1440,13 @@ export async function startGatewayServer(
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
if (heartbeatUnsub) {
|
||||
try {
|
||||
heartbeatUnsub();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
chatRunSessions.clear();
|
||||
chatRunBuffers.clear();
|
||||
for (const c of clients) {
|
||||
|
||||
Reference in New Issue
Block a user