fix(whatsapp): suppress typing during heartbeats
- Prevent typing indicator during heartbeat runs - Add regression tests Co-authored-by: Jake <mcinteerj@gmail.com>
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
- Cron tool passes `id` to the gateway for update/remove/run/runs (keeps `jobId` input). (#180) — thanks @adamgall
|
||||
- macOS: treat location permission as always-only to avoid iOS-only enums. (#165) — thanks @Nachx639
|
||||
- macOS: make generated gateway protocol models `Sendable` for Swift 6 strict concurrency. (#195) — thanks @andranik-sahakyan
|
||||
- WhatsApp: suppress typing indicator during heartbeat background tasks. (#190) — thanks @mcinteerj
|
||||
- Onboarding: when running from source, auto-build missing Control UI assets (`pnpm ui:build`).
|
||||
- Discord/Slack: route reaction + system notifications to the correct session (no main-session bleed).
|
||||
|
||||
|
||||
111
src/auto-reply/reply.heartbeat-typing.test.ts
Normal file
111
src/auto-reply/reply.heartbeat-typing.test.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const runEmbeddedPiAgentMock = vi.fn();
|
||||
|
||||
vi.mock("../agents/model-fallback.js", () => ({
|
||||
runWithModelFallback: async ({
|
||||
provider,
|
||||
model,
|
||||
run,
|
||||
}: {
|
||||
provider: string;
|
||||
model: string;
|
||||
run: (provider: string, model: string) => Promise<unknown>;
|
||||
}) => ({
|
||||
result: await run(provider, model),
|
||||
provider,
|
||||
model,
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../agents/pi-embedded.js", () => ({
|
||||
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
|
||||
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
|
||||
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
||||
resolveEmbeddedSessionLane: (key: string) =>
|
||||
`session:${key.trim() || "main"}`,
|
||||
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
||||
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
|
||||
}));
|
||||
|
||||
const webMocks = vi.hoisted(() => ({
|
||||
webAuthExists: vi.fn().mockResolvedValue(true),
|
||||
getWebAuthAgeMs: vi.fn().mockReturnValue(120_000),
|
||||
readWebSelfId: vi.fn().mockReturnValue({ e164: "+1999" }),
|
||||
}));
|
||||
|
||||
vi.mock("../web/session.js", () => webMocks);
|
||||
|
||||
import { getReplyFromConfig } from "./reply.js";
|
||||
|
||||
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
|
||||
const base = await fs.mkdtemp(join(tmpdir(), "clawdbot-typing-"));
|
||||
const previousHome = process.env.HOME;
|
||||
process.env.HOME = base;
|
||||
try {
|
||||
runEmbeddedPiAgentMock.mockClear();
|
||||
return await fn(base);
|
||||
} finally {
|
||||
process.env.HOME = previousHome;
|
||||
await fs.rm(base, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
function makeCfg(home: string) {
|
||||
return {
|
||||
agent: {
|
||||
model: "anthropic/claude-opus-4-5",
|
||||
workspace: join(home, "clawd"),
|
||||
},
|
||||
whatsapp: {
|
||||
allowFrom: ["*"],
|
||||
},
|
||||
session: { store: join(home, "sessions.json") },
|
||||
};
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe("getReplyFromConfig typing (heartbeat)", () => {
|
||||
it("starts typing for normal runs", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
runEmbeddedPiAgentMock.mockResolvedValueOnce({
|
||||
payloads: [{ text: "ok" }],
|
||||
meta: {},
|
||||
});
|
||||
const onReplyStart = vi.fn();
|
||||
|
||||
await getReplyFromConfig(
|
||||
{ Body: "hi", From: "+1000", To: "+2000", Surface: "whatsapp" },
|
||||
{ onReplyStart, isHeartbeat: false },
|
||||
makeCfg(home),
|
||||
);
|
||||
|
||||
expect(onReplyStart).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("does not start typing for heartbeat runs", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
runEmbeddedPiAgentMock.mockResolvedValueOnce({
|
||||
payloads: [{ text: "ok" }],
|
||||
meta: {},
|
||||
});
|
||||
const onReplyStart = vi.fn();
|
||||
|
||||
await getReplyFromConfig(
|
||||
{ Body: "hi", From: "+1000", To: "+2000", Surface: "whatsapp" },
|
||||
{ onReplyStart, isHeartbeat: true },
|
||||
makeCfg(home),
|
||||
);
|
||||
|
||||
expect(onReplyStart).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -467,7 +467,8 @@ export async function getReplyFromConfig(
|
||||
const isFirstTurnInSession = isNewSession || !systemSent;
|
||||
const isGroupChat = sessionCtx.ChatType === "group";
|
||||
const wasMentioned = ctx.WasMentioned === true;
|
||||
const shouldEagerType = !isGroupChat || wasMentioned;
|
||||
const isHeartbeat = opts?.isHeartbeat === true;
|
||||
const shouldEagerType = (!isGroupChat || wasMentioned) && !isHeartbeat;
|
||||
const shouldInjectGroupIntro = Boolean(
|
||||
isGroupChat &&
|
||||
(isFirstTurnInSession || sessionEntry?.groupActivationNeedsSystemIntro),
|
||||
|
||||
156
src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts
Normal file
156
src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts
Normal file
@@ -0,0 +1,156 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { TemplateContext } from "../templating.js";
|
||||
import type { GetReplyOptions } from "../types.js";
|
||||
import type { FollowupRun, QueueSettings } from "./queue.js";
|
||||
import type { TypingController } from "./typing.js";
|
||||
|
||||
const runEmbeddedPiAgentMock = vi.fn();
|
||||
|
||||
vi.mock("../../agents/model-fallback.js", () => ({
|
||||
runWithModelFallback: async ({
|
||||
provider,
|
||||
model,
|
||||
run,
|
||||
}: {
|
||||
provider: string;
|
||||
model: string;
|
||||
run: (provider: string, model: string) => Promise<unknown>;
|
||||
}) => ({
|
||||
result: await run(provider, model),
|
||||
provider,
|
||||
model,
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../../agents/pi-embedded.js", () => ({
|
||||
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
||||
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
|
||||
}));
|
||||
|
||||
vi.mock("./queue.js", async () => {
|
||||
const actual =
|
||||
await vi.importActual<typeof import("./queue.js")>("./queue.js");
|
||||
return {
|
||||
...actual,
|
||||
enqueueFollowupRun: vi.fn(),
|
||||
scheduleFollowupDrain: vi.fn(),
|
||||
};
|
||||
});
|
||||
|
||||
import { runReplyAgent } from "./agent-runner.js";
|
||||
|
||||
function createTyping(): TypingController {
|
||||
return {
|
||||
onReplyStart: vi.fn(async () => {}),
|
||||
startTypingLoop: vi.fn(async () => {}),
|
||||
startTypingOnText: vi.fn(async () => {}),
|
||||
refreshTypingTtl: vi.fn(),
|
||||
cleanup: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
type EmbeddedPiAgentParams = {
|
||||
onPartialReply?: (payload: { text?: string }) => Promise<void> | void;
|
||||
};
|
||||
|
||||
function createMinimalRun(params?: { opts?: GetReplyOptions }) {
|
||||
const typing = createTyping();
|
||||
const opts = params?.opts;
|
||||
const sessionCtx = {
|
||||
Surface: "whatsapp",
|
||||
MessageSid: "msg",
|
||||
} as unknown as TemplateContext;
|
||||
const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings;
|
||||
const followupRun = {
|
||||
prompt: "hello",
|
||||
summaryLine: "hello",
|
||||
enqueuedAt: Date.now(),
|
||||
run: {
|
||||
sessionId: "session",
|
||||
sessionKey: "main",
|
||||
surface: "whatsapp",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
workspaceDir: "/tmp",
|
||||
config: {},
|
||||
skillsSnapshot: {},
|
||||
provider: "anthropic",
|
||||
model: "claude",
|
||||
thinkLevel: "low",
|
||||
verboseLevel: "off",
|
||||
elevatedLevel: "off",
|
||||
bashElevated: {
|
||||
enabled: false,
|
||||
allowed: false,
|
||||
defaultLevel: "off",
|
||||
},
|
||||
timeoutMs: 1_000,
|
||||
blockReplyBreak: "message_end",
|
||||
},
|
||||
} as unknown as FollowupRun;
|
||||
|
||||
return {
|
||||
typing,
|
||||
opts,
|
||||
run: () =>
|
||||
runReplyAgent({
|
||||
commandBody: "hello",
|
||||
followupRun,
|
||||
queueKey: "main",
|
||||
resolvedQueue,
|
||||
shouldSteer: false,
|
||||
shouldFollowup: false,
|
||||
isActive: false,
|
||||
isStreaming: false,
|
||||
opts,
|
||||
typing,
|
||||
sessionCtx,
|
||||
defaultModel: "anthropic/claude-opus-4-5",
|
||||
resolvedVerboseLevel: "off",
|
||||
isNewSession: false,
|
||||
blockStreamingEnabled: false,
|
||||
resolvedBlockStreamingBreak: "message_end",
|
||||
shouldInjectGroupIntro: false,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
describe("runReplyAgent typing (heartbeat)", () => {
|
||||
it("signals typing for normal runs", async () => {
|
||||
const onPartialReply = vi.fn();
|
||||
runEmbeddedPiAgentMock.mockImplementationOnce(
|
||||
async (params: EmbeddedPiAgentParams) => {
|
||||
await params.onPartialReply?.({ text: "hi" });
|
||||
return { payloads: [{ text: "final" }], meta: {} };
|
||||
},
|
||||
);
|
||||
|
||||
const { run, typing } = createMinimalRun({
|
||||
opts: { isHeartbeat: false, onPartialReply },
|
||||
});
|
||||
await run();
|
||||
|
||||
expect(onPartialReply).toHaveBeenCalled();
|
||||
expect(typing.startTypingOnText).toHaveBeenCalledWith("hi");
|
||||
expect(typing.startTypingLoop).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("never signals typing for heartbeat runs", async () => {
|
||||
const onPartialReply = vi.fn();
|
||||
runEmbeddedPiAgentMock.mockImplementationOnce(
|
||||
async (params: EmbeddedPiAgentParams) => {
|
||||
await params.onPartialReply?.({ text: "hi" });
|
||||
return { payloads: [{ text: "final" }], meta: {} };
|
||||
},
|
||||
);
|
||||
|
||||
const { run, typing } = createMinimalRun({
|
||||
opts: { isHeartbeat: true, onPartialReply },
|
||||
});
|
||||
await run();
|
||||
|
||||
expect(onPartialReply).toHaveBeenCalled();
|
||||
expect(typing.startTypingOnText).not.toHaveBeenCalled();
|
||||
expect(typing.startTypingLoop).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -84,6 +84,8 @@ export async function runReplyAgent(params: {
|
||||
shouldInjectGroupIntro,
|
||||
} = params;
|
||||
|
||||
const isHeartbeat = opts?.isHeartbeat === true;
|
||||
|
||||
const shouldEmitToolResult = () => {
|
||||
if (!sessionKey || !storePath) {
|
||||
return resolvedVerboseLevel === "on";
|
||||
@@ -203,7 +205,7 @@ export async function runReplyAgent(params: {
|
||||
onPartialReply: opts?.onPartialReply
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, {
|
||||
mode: "message",
|
||||
});
|
||||
@@ -221,7 +223,9 @@ export async function runReplyAgent(params: {
|
||||
}
|
||||
text = stripped.text;
|
||||
}
|
||||
await typing.startTypingOnText(text);
|
||||
if (!isHeartbeat) {
|
||||
await typing.startTypingOnText(text);
|
||||
}
|
||||
await opts.onPartialReply?.({
|
||||
text,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
@@ -232,7 +236,7 @@ export async function runReplyAgent(params: {
|
||||
blockStreamingEnabled && opts?.onBlockReply
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, {
|
||||
mode: "message",
|
||||
});
|
||||
@@ -270,7 +274,9 @@ export async function runReplyAgent(params: {
|
||||
}
|
||||
pendingStreamedPayloadKeys.add(payloadKey);
|
||||
const task = (async () => {
|
||||
await typing.startTypingOnText(cleaned);
|
||||
if (!isHeartbeat) {
|
||||
await typing.startTypingOnText(cleaned);
|
||||
}
|
||||
await opts.onBlockReply?.(blockPayload);
|
||||
})()
|
||||
.then(() => {
|
||||
@@ -293,7 +299,7 @@ export async function runReplyAgent(params: {
|
||||
onToolResult: opts?.onToolResult
|
||||
? async (payload) => {
|
||||
let text = payload.text;
|
||||
if (!opts?.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) {
|
||||
const stripped = stripHeartbeatToken(text, {
|
||||
mode: "message",
|
||||
});
|
||||
@@ -311,7 +317,9 @@ export async function runReplyAgent(params: {
|
||||
}
|
||||
text = stripped.text;
|
||||
}
|
||||
await typing.startTypingOnText(text);
|
||||
if (!isHeartbeat) {
|
||||
await typing.startTypingOnText(text);
|
||||
}
|
||||
await opts.onToolResult?.({
|
||||
text,
|
||||
mediaUrls: payload.mediaUrls,
|
||||
@@ -356,7 +364,7 @@ export async function runReplyAgent(params: {
|
||||
await Promise.allSettled(pendingBlockTasks);
|
||||
}
|
||||
|
||||
const sanitizedPayloads = opts?.isHeartbeat
|
||||
const sanitizedPayloads = isHeartbeat
|
||||
? payloadArray
|
||||
: payloadArray.flatMap((payload) => {
|
||||
const text = payload.text;
|
||||
@@ -410,7 +418,7 @@ export async function runReplyAgent(params: {
|
||||
if (payload.mediaUrls && payload.mediaUrls.length > 0) return true;
|
||||
return false;
|
||||
});
|
||||
if (shouldSignalTyping) {
|
||||
if (shouldSignalTyping && !isHeartbeat) {
|
||||
await typing.startTypingLoop();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user