feat: unify gateway heartbeat

This commit is contained in:
Peter Steinberger
2025-12-26 02:35:21 +01:00
parent 8f9d7405ed
commit 0d8e0ddc4f
19 changed files with 744 additions and 953 deletions

View File

@@ -15,19 +15,11 @@ import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ClawdisConfig } from "../config/config.js";
import { resetLogger, setLoggerOverride } from "../logging.js";
import * as commandQueue from "../process/command-queue.js";
import {
HEARTBEAT_PROMPT,
HEARTBEAT_TOKEN,
monitorWebProvider,
resolveHeartbeatRecipients,
resolveReplyHeartbeatIntervalMs,
runWebHeartbeatOnce,
SILENT_REPLY_TOKEN,
stripHeartbeatToken,
} from "./auto-reply.js";
import type { sendMessageWhatsApp } from "./outbound.js";
import { requestReplyHeartbeatNow } from "./reply-heartbeat-wake.js";
import {
resetBaileysMocks,
resetLoadConfigMock,
@@ -107,146 +99,6 @@ const makeSessionStore = async (
};
};
describe("heartbeat helpers", () => {
it("strips heartbeat token and skips when only token", () => {
expect(stripHeartbeatToken(undefined)).toEqual({
shouldSkip: true,
text: "",
});
expect(stripHeartbeatToken(" ")).toEqual({
shouldSkip: true,
text: "",
});
expect(stripHeartbeatToken(HEARTBEAT_TOKEN)).toEqual({
shouldSkip: true,
text: "",
});
});
it("keeps content and removes token when mixed", () => {
expect(stripHeartbeatToken(`ALERT ${HEARTBEAT_TOKEN}`)).toEqual({
shouldSkip: false,
text: "ALERT",
});
expect(stripHeartbeatToken(`hello`)).toEqual({
shouldSkip: false,
text: "hello",
});
});
it("strips repeated OK tails after heartbeat token", () => {
expect(stripHeartbeatToken("HEARTBEAT_OK_OK_OK")).toEqual({
shouldSkip: true,
text: "",
});
expect(stripHeartbeatToken("HEARTBEAT_OK_OK")).toEqual({
shouldSkip: true,
text: "",
});
expect(stripHeartbeatToken("HEARTBEAT_OK _OK")).toEqual({
shouldSkip: true,
text: "",
});
expect(stripHeartbeatToken("HEARTBEAT_OK OK")).toEqual({
shouldSkip: true,
text: "",
});
expect(stripHeartbeatToken("ALERT HEARTBEAT_OK_OK")).toEqual({
shouldSkip: false,
text: "ALERT",
});
});
it("resolves reply heartbeat interval from config and overrides", () => {
const cfgBase: ClawdisConfig = {};
expect(resolveReplyHeartbeatIntervalMs(cfgBase)).toBeNull();
expect(
resolveReplyHeartbeatIntervalMs({
agent: { heartbeat: { every: "5m" } },
}),
).toBe(5 * 60_000);
expect(
resolveReplyHeartbeatIntervalMs({
agent: { heartbeat: { every: "0m" } },
}),
).toBeNull();
expect(resolveReplyHeartbeatIntervalMs(cfgBase, "7m")).toBe(7 * 60_000);
expect(
resolveReplyHeartbeatIntervalMs({
agent: { heartbeat: { every: "5" } },
}),
).toBe(5 * 60_000);
});
});
describe("resolveHeartbeatRecipients", () => {
it("returns the sole session recipient", async () => {
const now = Date.now();
const store = await makeSessionStore({
main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" },
});
const cfg: ClawdisConfig = {
routing: {
allowFrom: ["+1999"],
},
session: { store: store.storePath },
};
const result = resolveHeartbeatRecipients(cfg);
expect(result.source).toBe("session-single");
expect(result.recipients).toEqual(["+1000"]);
await store.cleanup();
});
it("surfaces ambiguity when multiple sessions exist", async () => {
const now = Date.now();
const store = await makeSessionStore({
main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" },
alt: { updatedAt: now - 10, lastChannel: "whatsapp", lastTo: "+2000" },
});
const cfg: ClawdisConfig = {
routing: {
allowFrom: ["+1999"],
},
session: { store: store.storePath },
};
const result = resolveHeartbeatRecipients(cfg);
expect(result.source).toBe("session-ambiguous");
expect(result.recipients).toEqual(["+1000", "+2000"]);
await store.cleanup();
});
it("filters wildcard allowFrom when no sessions exist", async () => {
const store = await makeSessionStore({});
const cfg: ClawdisConfig = {
routing: {
allowFrom: ["*"],
},
session: { store: store.storePath },
};
const result = resolveHeartbeatRecipients(cfg);
expect(result.recipients).toHaveLength(0);
expect(result.source).toBe("allowFrom");
await store.cleanup();
});
it("merges sessions and allowFrom when --all is set", async () => {
const now = Date.now();
const store = await makeSessionStore({
main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" },
});
const cfg: ClawdisConfig = {
routing: {
allowFrom: ["+1999"],
},
session: { store: store.storePath },
};
const result = resolveHeartbeatRecipients(cfg, { all: true });
expect(result.source).toBe("all");
expect(result.recipients.sort()).toEqual(["+1000", "+1999"].sort());
await store.cleanup();
});
});
describe("partial reply gating", () => {
it("does not send partial replies for WhatsApp surface", async () => {
const reply = vi.fn().mockResolvedValue(undefined);
@@ -387,249 +239,6 @@ describe("partial reply gating", () => {
});
});
describe("runWebHeartbeatOnce", () => {
it("skips when heartbeat token returned", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWhatsApp = vi.fn();
const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
await runWebHeartbeatOnce({
cfg: {
routing: {
allowFrom: ["+1555"],
},
session: { store: store.storePath },
},
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
});
expect(resolver).toHaveBeenCalled();
expect(sender).not.toHaveBeenCalled();
await store.cleanup();
});
it("sends when alert text present", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWhatsApp = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
await runWebHeartbeatOnce({
cfg: {
routing: {
allowFrom: ["+1555"],
},
session: { store: store.storePath },
},
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false });
await store.cleanup();
});
it("falls back to most recent session when no to is provided", async () => {
const store = await makeSessionStore();
const storePath = store.storePath;
const sender: typeof sendMessageWhatsApp = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
const now = Date.now();
const sessionEntries = {
"+1222": { sessionId: "s1", updatedAt: now - 1000 },
"+1333": { sessionId: "s2", updatedAt: now },
};
await fs.writeFile(storePath, JSON.stringify(sessionEntries));
await runWebHeartbeatOnce({
cfg: {
routing: {
allowFrom: ["+1999"],
},
session: { store: storePath },
},
to: "+1999",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1999", "ALERT", { verbose: false });
await store.cleanup();
});
it("does not refresh updatedAt when heartbeat is skipped", async () => {
const tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-heartbeat-"),
);
const storePath = path.join(tmpDir, "sessions.json");
const now = Date.now();
const originalUpdated = now - 30 * 60 * 1000;
const store = {
"+1555": { sessionId: "sess1", updatedAt: originalUpdated },
};
await fs.writeFile(storePath, JSON.stringify(store));
const sender: typeof sendMessageWhatsApp = vi.fn();
const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
setLoadConfigMock({
routing: {
allowFrom: ["+1555"],
},
session: {
store: storePath,
idleMinutes: 60,
heartbeatIdleMinutes: 10,
},
});
await runWebHeartbeatOnce({
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
});
const after = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(after["+1555"].updatedAt).toBe(originalUpdated);
expect(sender).not.toHaveBeenCalled();
});
it("heartbeat reuses existing session id when last inbound is present", async () => {
const tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-heartbeat-session-"),
);
const storePath = path.join(tmpDir, "sessions.json");
const sessionId = "sess-keep";
await fs.writeFile(
storePath,
JSON.stringify({
main: { sessionId, updatedAt: Date.now(), systemSent: false },
}),
);
setLoadConfigMock(() => ({
routing: {
allowFrom: ["+4367"],
},
session: { store: storePath, idleMinutes: 60 },
}));
const replyResolver = vi.fn().mockResolvedValue({ text: HEARTBEAT_TOKEN });
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never;
const cfg: ClawdisConfig = {
routing: {
allowFrom: ["+4367"],
},
session: { store: storePath, idleMinutes: 60 },
};
await runWebHeartbeatOnce({
cfg,
to: "+4367",
verbose: false,
replyResolver,
runtime,
});
const heartbeatCall = replyResolver.mock.calls.find(
(call) => call[0]?.Body === HEARTBEAT_PROMPT,
);
expect(heartbeatCall?.[0]?.MessageSid).toBe(sessionId);
});
it("heartbeat honors session-id override and seeds store", async () => {
const tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-heartbeat-override-"),
);
const storePath = path.join(tmpDir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify({}));
const sessionId = "override-123";
setLoadConfigMock(() => ({
routing: {
allowFrom: ["+1999"],
},
session: { store: storePath, idleMinutes: 60 },
}));
const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
const cfg: ClawdisConfig = {
routing: {
allowFrom: ["+1999"],
},
session: { store: storePath, idleMinutes: 60 },
};
await runWebHeartbeatOnce({
cfg,
to: "+1999",
verbose: false,
replyResolver: resolver,
sessionId,
});
const heartbeatCall = resolver.mock.calls.find(
(call) => call[0]?.Body === HEARTBEAT_PROMPT,
);
expect(heartbeatCall?.[0]?.MessageSid).toBe(sessionId);
const raw = await fs.readFile(storePath, "utf-8");
const stored = raw ? JSON.parse(raw) : {};
expect(stored.main?.sessionId).toBe(sessionId);
expect(stored.main?.updatedAt).toBeDefined();
});
it("sends overrideBody directly and skips resolver", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWhatsApp = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn();
await runWebHeartbeatOnce({
cfg: {
routing: {
allowFrom: ["+1555"],
},
session: { store: store.storePath },
},
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
overrideBody: "manual ping",
});
expect(sender).toHaveBeenCalledWith("+1555", "manual ping", {
verbose: false,
});
expect(resolver).not.toHaveBeenCalled();
await store.cleanup();
});
it("dry-run overrideBody prints and skips send", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWhatsApp = vi.fn();
const resolver = vi.fn();
await runWebHeartbeatOnce({
cfg: {
routing: {
allowFrom: ["+1555"],
},
session: { store: store.storePath },
},
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
overrideBody: "dry",
dryRun: true,
});
expect(sender).not.toHaveBeenCalled();
expect(resolver).not.toHaveBeenCalled();
await store.cleanup();
});
});
describe("web auto-reply", () => {
beforeEach(() => {
vi.clearAllMocks();
@@ -746,153 +355,6 @@ describe("web auto-reply", () => {
},
);
it("skips reply heartbeat when requests are running", async () => {
const tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-heartbeat-queue-"),
);
const storePath = path.join(tmpDir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify({}));
const queueSpy = vi.spyOn(commandQueue, "getQueueSize").mockReturnValue(2);
const replyResolver = vi.fn();
const listenerFactory = vi.fn(async () => {
const onClose = new Promise<void>(() => {
// stay open until aborted
});
return { close: vi.fn(), onClose };
});
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never;
setLoadConfigMock(() => ({
routing: {
allowFrom: ["+1555"],
},
session: { store: storePath },
}));
const controller = new AbortController();
const run = monitorWebProvider(
false,
listenerFactory,
true,
replyResolver,
runtime,
controller.signal,
{ replyHeartbeatEvery: "1m", replyHeartbeatNow: true },
);
try {
await Promise.resolve();
controller.abort();
await run;
expect(replyResolver).not.toHaveBeenCalled();
} finally {
queueSpy.mockRestore();
}
});
it("falls back to main recipient when last inbound is a group chat", async () => {
const now = Date.now();
const store = await makeSessionStore({
main: {
sessionId: "sid-main",
updatedAt: now,
lastChannel: "whatsapp",
lastTo: "+1555",
},
});
const replyResolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = vi.fn(
async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
const onClose = new Promise<void>(() => {
// stay open until aborted
});
return { close: vi.fn(), onClose };
},
);
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never;
setLoadConfigMock(() => ({
routing: {
allowFrom: ["+1555"],
groupChat: { requireMention: true, mentionPatterns: ["@clawd"] },
},
session: { store: store.storePath },
}));
const controller = new AbortController();
const run = monitorWebProvider(
false,
listenerFactory,
true,
replyResolver,
runtime,
controller.signal,
{ replyHeartbeatEvery: "10000m" },
);
try {
await Promise.resolve();
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "hello group",
from: "123@g.us",
to: "+1555",
id: "g1",
sendComposing: vi.fn(),
reply: vi.fn(),
sendMedia: vi.fn(),
chatType: "group",
conversationId: "123@g.us",
chatId: "123@g.us",
});
// No mention => no auto-reply for the group message.
await new Promise((resolve) => setTimeout(resolve, 10));
expect(
replyResolver.mock.calls.some(
(call) => call[0]?.Body !== HEARTBEAT_PROMPT,
),
).toBe(false);
requestReplyHeartbeatNow({ coalesceMs: 0 });
let heartbeatCall = replyResolver.mock.calls.find(
(call) =>
call[0]?.Body === HEARTBEAT_PROMPT &&
call[0]?.MessageSid === "sid-main",
);
const deadline = Date.now() + 1000;
while (!heartbeatCall && Date.now() < deadline) {
await new Promise((resolve) => setTimeout(resolve, 10));
heartbeatCall = replyResolver.mock.calls.find(
(call) =>
call[0]?.Body === HEARTBEAT_PROMPT &&
call[0]?.MessageSid === "sid-main",
);
}
controller.abort();
await run;
expect(heartbeatCall).toBeDefined();
expect(heartbeatCall?.[0]?.From).toBe("+1555");
expect(heartbeatCall?.[0]?.To).toBe("+1555");
expect(heartbeatCall?.[0]?.MessageSid).toBe("sid-main");
} finally {
controller.abort();
await store.cleanup();
}
});
it("processes inbound messages without batching and preserves timestamps", async () => {
const originalTz = process.env.TZ;
process.env.TZ = "Europe/Vienna";