Heartbeat: harden targeting and support lid mapping
This commit is contained in:
@@ -9,8 +9,10 @@ import type { WarelayConfig } from "../config/config.js";
|
||||
import { resolveStorePath } from "../config/sessions.js";
|
||||
import { resetLogger, setLoggerOverride } from "../logging.js";
|
||||
import {
|
||||
HEARTBEAT_PROMPT,
|
||||
HEARTBEAT_TOKEN,
|
||||
monitorWebProvider,
|
||||
resolveHeartbeatRecipients,
|
||||
resolveReplyHeartbeatMinutes,
|
||||
runWebHeartbeatOnce,
|
||||
stripHeartbeatToken,
|
||||
@@ -75,6 +77,80 @@ describe("heartbeat helpers", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveHeartbeatRecipients", () => {
|
||||
const makeStore = async (entries: Record<string, { updatedAt: number }>) => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-heartbeat-"));
|
||||
const storePath = path.join(dir, "sessions.json");
|
||||
await fs.writeFile(storePath, JSON.stringify(entries));
|
||||
return {
|
||||
storePath,
|
||||
cleanup: async () => fs.rm(dir, { recursive: true, force: true }),
|
||||
};
|
||||
};
|
||||
|
||||
it("returns the sole session recipient", async () => {
|
||||
const now = Date.now();
|
||||
const store = await makeStore({ "+1000": { updatedAt: now } });
|
||||
const cfg: WarelayConfig = {
|
||||
inbound: {
|
||||
allowFrom: ["+1999"],
|
||||
reply: { mode: "command", session: { store: store.storePath } },
|
||||
},
|
||||
};
|
||||
const result = resolveHeartbeatRecipients(cfg);
|
||||
expect(result.source).toBe("session-single");
|
||||
expect(result.recipients).toEqual(["+1000"]);
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("surfaces ambiguity when multiple sessions exist", async () => {
|
||||
const now = Date.now();
|
||||
const store = await makeStore({
|
||||
"+1000": { updatedAt: now },
|
||||
"+2000": { updatedAt: now - 10 },
|
||||
});
|
||||
const cfg: WarelayConfig = {
|
||||
inbound: {
|
||||
allowFrom: ["+1999"],
|
||||
reply: { mode: "command", session: { store: store.storePath } },
|
||||
},
|
||||
};
|
||||
const result = resolveHeartbeatRecipients(cfg);
|
||||
expect(result.source).toBe("session-ambiguous");
|
||||
expect(result.recipients).toEqual(["+1000", "+2000"]);
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("filters wildcard allowFrom when no sessions exist", async () => {
|
||||
const store = await makeStore({});
|
||||
const cfg: WarelayConfig = {
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
reply: { mode: "command", session: { store: store.storePath } },
|
||||
},
|
||||
};
|
||||
const result = resolveHeartbeatRecipients(cfg);
|
||||
expect(result.recipients).toHaveLength(0);
|
||||
expect(result.source).toBe("allowFrom");
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("merges sessions and allowFrom when --all is set", async () => {
|
||||
const now = Date.now();
|
||||
const store = await makeStore({ "+1000": { updatedAt: now } });
|
||||
const cfg: WarelayConfig = {
|
||||
inbound: {
|
||||
allowFrom: ["+1999"],
|
||||
reply: { mode: "command", session: { store: store.storePath } },
|
||||
},
|
||||
};
|
||||
const result = resolveHeartbeatRecipients(cfg, { all: true });
|
||||
expect(result.source).toBe("all");
|
||||
expect(result.recipients.sort()).toEqual(["+1000", "+1999"].sort());
|
||||
await store.cleanup();
|
||||
});
|
||||
});
|
||||
|
||||
describe("runWebHeartbeatOnce", () => {
|
||||
it("skips when heartbeat token returned", async () => {
|
||||
const sender: typeof sendMessageWeb = vi.fn();
|
||||
@@ -179,6 +255,56 @@ describe("runWebHeartbeatOnce", () => {
|
||||
expect(after["+1555"].updatedAt).toBe(originalUpdated);
|
||||
expect(sender).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("heartbeat reuses existing session id when last inbound is present", async () => {
|
||||
const tmpDir = await fs.mkdtemp(
|
||||
path.join(os.tmpdir(), "warelay-heartbeat-session-"),
|
||||
);
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const sessionId = "sess-keep";
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
"+4367": { sessionId, updatedAt: Date.now(), systemSent: false },
|
||||
}),
|
||||
);
|
||||
|
||||
setLoadConfigMock(() => ({
|
||||
inbound: {
|
||||
allowFrom: ["+4367"],
|
||||
reply: {
|
||||
mode: "command",
|
||||
heartbeatMinutes: 0.001,
|
||||
session: { store: storePath, idleMinutes: 60 },
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
const replyResolver = vi.fn().mockResolvedValue({ text: HEARTBEAT_TOKEN });
|
||||
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never;
|
||||
const cfg: WarelayConfig = {
|
||||
inbound: {
|
||||
allowFrom: ["+4367"],
|
||||
reply: {
|
||||
mode: "command",
|
||||
session: { store: storePath, idleMinutes: 60 },
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
await runWebHeartbeatOnce({
|
||||
cfg,
|
||||
to: "+4367",
|
||||
verbose: false,
|
||||
replyResolver,
|
||||
runtime,
|
||||
});
|
||||
|
||||
const heartbeatCall = replyResolver.mock.calls.find(
|
||||
(call) => call[0]?.Body === HEARTBEAT_PROMPT,
|
||||
);
|
||||
expect(heartbeatCall?.[0]?.MessageSid).toBe(sessionId);
|
||||
});
|
||||
});
|
||||
|
||||
describe("web auto-reply", () => {
|
||||
|
||||
@@ -75,13 +75,14 @@ export function stripHeartbeatToken(raw?: string) {
|
||||
}
|
||||
|
||||
export async function runWebHeartbeatOnce(opts: {
|
||||
cfg?: ReturnType<typeof loadConfig>;
|
||||
to: string;
|
||||
verbose?: boolean;
|
||||
replyResolver?: typeof getReplyFromConfig;
|
||||
runtime?: RuntimeEnv;
|
||||
sender?: typeof sendMessageWeb;
|
||||
}) {
|
||||
const { to, verbose = false } = opts;
|
||||
const { cfg: cfgOverride, to, verbose = false } = opts;
|
||||
const _runtime = opts.runtime ?? defaultRuntime;
|
||||
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
|
||||
const sender = opts.sender ?? sendMessageWeb;
|
||||
@@ -92,7 +93,7 @@ export async function runWebHeartbeatOnce(opts: {
|
||||
to,
|
||||
});
|
||||
|
||||
const cfg = loadConfig();
|
||||
const cfg = cfgOverride ?? loadConfig();
|
||||
const sessionSnapshot = getSessionSnapshot(cfg, to, true);
|
||||
if (verbose) {
|
||||
heartbeatLogger.info(
|
||||
@@ -184,10 +185,12 @@ function getFallbackRecipient(cfg: ReturnType<typeof loadConfig>) {
|
||||
const store = loadSessionStore(storePath);
|
||||
const candidates = Object.entries(store).filter(([key]) => key !== "global");
|
||||
if (candidates.length === 0) {
|
||||
return (
|
||||
(Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom[0]) ||
|
||||
null
|
||||
);
|
||||
const allowFrom =
|
||||
Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom.length > 0
|
||||
? cfg.inbound.allowFrom.filter((v) => v !== "*")
|
||||
: [];
|
||||
if (allowFrom.length === 0) return null;
|
||||
return allowFrom[0] ? normalizeE164(allowFrom[0]) : null;
|
||||
}
|
||||
const mostRecent = candidates.sort(
|
||||
(a, b) => (b[1]?.updatedAt ?? 0) - (a[1]?.updatedAt ?? 0),
|
||||
@@ -195,6 +198,54 @@ function getFallbackRecipient(cfg: ReturnType<typeof loadConfig>) {
|
||||
return mostRecent ? normalizeE164(mostRecent[0]) : null;
|
||||
}
|
||||
|
||||
function getSessionRecipients(cfg: ReturnType<typeof loadConfig>) {
|
||||
const sessionCfg = cfg.inbound?.reply?.session;
|
||||
const scope = sessionCfg?.scope ?? "per-sender";
|
||||
if (scope === "global") return [];
|
||||
const storePath = resolveStorePath(sessionCfg?.store);
|
||||
const store = loadSessionStore(storePath);
|
||||
return Object.entries(store)
|
||||
.filter(([key]) => key !== "global" && key !== "unknown")
|
||||
.map(([key, entry]) => ({
|
||||
to: normalizeE164(key),
|
||||
updatedAt: entry?.updatedAt ?? 0,
|
||||
}))
|
||||
.filter(({ to }) => Boolean(to))
|
||||
.sort((a, b) => b.updatedAt - a.updatedAt);
|
||||
}
|
||||
|
||||
export function resolveHeartbeatRecipients(
|
||||
cfg: ReturnType<typeof loadConfig>,
|
||||
opts: { to?: string; all?: boolean } = {},
|
||||
) {
|
||||
if (opts.to) return { recipients: [normalizeE164(opts.to)], source: "flag" };
|
||||
|
||||
const sessionRecipients = getSessionRecipients(cfg);
|
||||
const allowFrom =
|
||||
Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom.length > 0
|
||||
? cfg.inbound.allowFrom.filter((v) => v !== "*").map(normalizeE164)
|
||||
: [];
|
||||
|
||||
const unique = (list: string[]) => [...new Set(list.filter(Boolean))];
|
||||
|
||||
if (opts.all) {
|
||||
const all = unique([...sessionRecipients.map((s) => s.to), ...allowFrom]);
|
||||
return { recipients: all, source: "all" as const };
|
||||
}
|
||||
|
||||
if (sessionRecipients.length === 1) {
|
||||
return { recipients: [sessionRecipients[0].to], source: "session-single" };
|
||||
}
|
||||
if (sessionRecipients.length > 1) {
|
||||
return {
|
||||
recipients: sessionRecipients.map((s) => s.to),
|
||||
source: "session-ambiguous" as const,
|
||||
};
|
||||
}
|
||||
|
||||
return { recipients: allowFrom, source: "allowFrom" as const };
|
||||
}
|
||||
|
||||
function getSessionSnapshot(
|
||||
cfg: ReturnType<typeof loadConfig>,
|
||||
from: string,
|
||||
@@ -551,8 +602,8 @@ export async function monitorWebProvider(
|
||||
}
|
||||
|
||||
try {
|
||||
const snapshot = getSessionSnapshot(cfg, lastInboundMsg.from);
|
||||
if (isVerbose()) {
|
||||
const snapshot = getSessionSnapshot(cfg, lastInboundMsg.from);
|
||||
heartbeatLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
@@ -570,7 +621,7 @@ export async function monitorWebProvider(
|
||||
Body: HEARTBEAT_PROMPT,
|
||||
From: lastInboundMsg.from,
|
||||
To: lastInboundMsg.to,
|
||||
MessageSid: undefined,
|
||||
MessageSid: snapshot.entry?.sessionId,
|
||||
MediaPath: undefined,
|
||||
MediaUrl: undefined,
|
||||
MediaType: undefined,
|
||||
|
||||
Reference in New Issue
Block a user