Heartbeat: normalize array replies

This commit is contained in:
Peter Steinberger
2025-12-03 00:40:19 +00:00
parent a7fdc7b992
commit 9da5b9f4bb
4 changed files with 47 additions and 20 deletions

View File

@@ -310,11 +310,23 @@ export async function getReplyFromConfig(
return result; return result;
} }
if (reply && reply.mode === "command" && reply.command?.length) { const isHeartbeat = opts?.isHeartbeat === true;
if (reply && reply.mode === "command") {
const commandArgs =
isHeartbeat && reply.heartbeatCommand?.length
? reply.heartbeatCommand
: reply.command;
if (!commandArgs?.length) {
cleanupTyping();
return undefined;
}
await onReplyStart(); await onReplyStart();
const commandReply = { const commandReply = {
...reply, ...reply,
command: reply.command, command: commandArgs,
mode: "command" as const, mode: "command" as const,
}; };
try { try {

View File

@@ -1,5 +1,6 @@
export type GetReplyOptions = { export type GetReplyOptions = {
onReplyStart?: () => Promise<void> | void; onReplyStart?: () => Promise<void> | void;
isHeartbeat?: boolean;
}; };
export type ReplyPayload = { export type ReplyPayload = {

View File

@@ -191,14 +191,18 @@ export async function runWebHeartbeatOnce(opts: {
To: to, To: to,
MessageSid: sessionId ?? sessionSnapshot.entry?.sessionId, MessageSid: sessionId ?? sessionSnapshot.entry?.sessionId,
}, },
undefined, { isHeartbeat: true },
cfg, cfg,
); );
const replyPayload = Array.isArray(replyResult)
? replyResult[0]
: replyResult;
if ( if (
!replyResult || !replyPayload ||
(!replyResult.text && (!replyPayload.text &&
!replyResult.mediaUrl && !replyPayload.mediaUrl &&
!replyResult.mediaUrls?.length) !replyPayload.mediaUrls?.length)
) { ) {
heartbeatLogger.info( heartbeatLogger.info(
{ {
@@ -213,9 +217,9 @@ export async function runWebHeartbeatOnce(opts: {
} }
const hasMedia = Boolean( const hasMedia = Boolean(
replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0, replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0,
); );
const stripped = stripHeartbeatToken(replyResult.text); const stripped = stripHeartbeatToken(replyPayload.text);
if (stripped.shouldSkip && !hasMedia) { if (stripped.shouldSkip && !hasMedia) {
// Don't let heartbeats keep sessions alive: restore previous updatedAt so idle expiry still works. // Don't let heartbeats keep sessions alive: restore previous updatedAt so idle expiry still works.
const sessionCfg = cfg.inbound?.reply?.session; const sessionCfg = cfg.inbound?.reply?.session;
@@ -227,7 +231,7 @@ export async function runWebHeartbeatOnce(opts: {
} }
heartbeatLogger.info( heartbeatLogger.info(
{ to, reason: "heartbeat-token", rawLength: replyResult.text?.length }, { to, reason: "heartbeat-token", rawLength: replyPayload.text?.length },
"heartbeat skipped", "heartbeat skipped",
); );
console.log(success("heartbeat: ok (HEARTBEAT_OK)")); console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
@@ -241,7 +245,7 @@ export async function runWebHeartbeatOnce(opts: {
); );
} }
const finalText = stripped.text || replyResult.text || ""; const finalText = stripped.text || replyPayload.text || "";
if (dryRun) { if (dryRun) {
heartbeatLogger.info( heartbeatLogger.info(
{ to, reason: "dry-run", chars: finalText.length }, { to, reason: "dry-run", chars: finalText.length },
@@ -963,14 +967,19 @@ export async function monitorWebProvider(
}, },
{ {
onReplyStart: lastInboundMsg.sendComposing, onReplyStart: lastInboundMsg.sendComposing,
isHeartbeat: true,
}, },
); );
const replyPayload = Array.isArray(replyResult)
? replyResult[0]
: replyResult;
if ( if (
!replyResult || !replyPayload ||
(!replyResult.text && (!replyPayload.text &&
!replyResult.mediaUrl && !replyPayload.mediaUrl &&
!replyResult.mediaUrls?.length) !replyPayload.mediaUrls?.length)
) { ) {
heartbeatLogger.info( heartbeatLogger.info(
{ {
@@ -984,9 +993,9 @@ export async function monitorWebProvider(
return; return;
} }
const stripped = stripHeartbeatToken(replyResult.text); const stripped = stripHeartbeatToken(replyPayload.text);
const hasMedia = Boolean( const hasMedia = Boolean(
replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0, replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0,
); );
if (stripped.shouldSkip && !hasMedia) { if (stripped.shouldSkip && !hasMedia) {
heartbeatLogger.info( heartbeatLogger.info(
@@ -994,7 +1003,7 @@ export async function monitorWebProvider(
connectionId, connectionId,
durationMs: Date.now() - tickStart, durationMs: Date.now() - tickStart,
reason: "heartbeat-token", reason: "heartbeat-token",
rawLength: replyResult.text?.length ?? 0, rawLength: replyPayload.text?.length ?? 0,
}, },
"reply heartbeat skipped", "reply heartbeat skipped",
); );
@@ -1014,7 +1023,7 @@ export async function monitorWebProvider(
} }
const cleanedReply: ReplyPayload = { const cleanedReply: ReplyPayload = {
...replyResult, ...replyPayload,
text: finalText, text: finalText,
}; };

View File

@@ -100,7 +100,12 @@ describe("web inbound media saves with extension", () => {
}; };
realSock.ev.emit("messages.upsert", upsert); realSock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setTimeout(resolve, 5));
// Allow a brief window for the async handler to fire on slower runners.
for (let i = 0; i < 10; i++) {
if (onMessage.mock.calls.length > 0) break;
await new Promise((resolve) => setTimeout(resolve, 5));
}
expect(onMessage).toHaveBeenCalledTimes(1); expect(onMessage).toHaveBeenCalledTimes(1);
const msg = onMessage.mock.calls[0][0]; const msg = onMessage.mock.calls[0][0];