feat(web): add group chat mention support

This commit is contained in:
Peter Steinberger
2025-12-03 12:35:18 +00:00
parent 273f2b61d0
commit 6afe6f4ecb
10 changed files with 395 additions and 32 deletions

View File

@@ -43,6 +43,12 @@ export type WebConfig = {
reconnect?: WebReconnectConfig;
};
export type GroupChatConfig = {
requireMention?: boolean;
mentionPatterns?: string[];
historyLimit?: number;
};
export type WarelayConfig = {
logging?: LoggingConfig;
inbound?: {
@@ -55,6 +61,7 @@ export type WarelayConfig = {
command: string[];
timeoutSeconds?: number;
};
groupChat?: GroupChatConfig;
reply?: {
mode: ReplyMode;
text?: string;
@@ -172,6 +179,13 @@ const WarelaySchema = z.object({
messagePrefix: z.string().optional(),
responsePrefix: z.string().optional(),
timestampPrefix: z.union([z.boolean(), z.string()]).optional(),
groupChat: z
.object({
requireMention: z.boolean().optional(),
mentionPatterns: z.array(z.string()).optional(),
historyLimit: z.number().int().positive().optional(),
})
.optional(),
transcribeAudio: z
.object({
command: z.array(z.string()),

View File

@@ -16,4 +16,10 @@ describe("sessions", () => {
it("global scope returns global", () => {
expect(deriveSessionKey("global", { From: "+1" })).toBe("global");
});
it("keeps group chats distinct", () => {
expect(
deriveSessionKey("per-sender", { From: "12345-678@g.us" }),
).toBe("group:12345-678@g.us");
});
});

View File

@@ -59,5 +59,12 @@ export async function saveSessionStore(
export function deriveSessionKey(scope: SessionScope, ctx: MsgContext) {
if (scope === "global") return "global";
const from = ctx.From ? normalizeE164(ctx.From) : "";
// Preserve group conversations as distinct buckets
if (typeof ctx.From === "string" && ctx.From.includes("@g.us")) {
return `group:${ctx.From}`;
}
if (typeof ctx.From === "string" && ctx.From.startsWith("group:")) {
return ctx.From;
}
return from || "unknown";
}

View File

@@ -1006,6 +1006,70 @@ describe("web auto-reply", () => {
fetchMock.mockRestore();
});
it("requires mention in group chats and injects history when replying", async () => {
const sendMedia = vi.fn();
const reply = vi.fn().mockResolvedValue(undefined);
const sendComposing = vi.fn();
const resolver = vi.fn().mockResolvedValue({ text: "ok" });
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "hello group",
from: "123@g.us",
conversationId: "123@g.us",
chatId: "123@g.us",
chatType: "group",
to: "+2",
id: "g1",
senderE164: "+111",
senderName: "Alice",
selfE164: "+999",
sendComposing,
reply,
sendMedia,
});
expect(resolver).not.toHaveBeenCalled();
await capturedOnMessage?.({
body: "@bot ping",
from: "123@g.us",
conversationId: "123@g.us",
chatId: "123@g.us",
chatType: "group",
to: "+2",
id: "g2",
senderE164: "+222",
senderName: "Bob",
mentionedJids: ["999@s.whatsapp.net"],
selfE164: "+999",
selfJid: "999@s.whatsapp.net",
sendComposing,
reply,
sendMedia,
});
expect(resolver).toHaveBeenCalledTimes(1);
const payload = resolver.mock.calls[0][0];
expect(payload.Body).toContain("Chat messages since your last reply");
expect(payload.Body).toContain("Alice: hello group");
expect(payload.Body).toContain("@bot ping");
});
it("emits heartbeat logs with connection metadata", async () => {
vi.useFakeTimers();
const logPath = `/tmp/warelay-heartbeat-${crypto.randomUUID()}.log`;

View File

@@ -15,7 +15,7 @@ import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { getQueueSize } from "../process/command-queue.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { jidToE164, normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js";
import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js";
import { loadWebMedia } from "./media.js";
@@ -31,6 +31,7 @@ import {
import { getWebAuthAgeMs } from "./session.js";
const WEB_TEXT_LIMIT = 4000;
const DEFAULT_GROUP_HISTORY_LIMIT = 50;
/**
* Send a message via IPC if relay is running, otherwise fall back to direct.
@@ -74,6 +75,43 @@ const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30;
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
export const HEARTBEAT_PROMPT = "HEARTBEAT /think:high";
type MentionConfig = {
requireMention: boolean;
mentionRegexes: RegExp[];
};
function buildMentionConfig(cfg: ReturnType<typeof loadConfig>): MentionConfig {
const gc = cfg.inbound?.groupChat;
const requireMention = gc?.requireMention !== false; // default true
const mentionRegexes =
gc?.mentionPatterns?.map((p) => {
try {
return new RegExp(p, "i");
} catch {
return null;
}
}).filter((r): r is RegExp => Boolean(r)) ?? [];
return { requireMention, mentionRegexes };
}
function isBotMentioned(
msg: WebInboundMsg,
mentionCfg: MentionConfig,
): boolean {
if (msg.mentionedJids?.length) {
const normalizedMentions = msg.mentionedJids
.map((jid) => jidToE164(jid) ?? jid)
.filter(Boolean);
if (msg.selfE164 && normalizedMentions.includes(msg.selfE164)) return true;
if (msg.selfJid && msg.selfE164) {
// Some mentions use the bare JID; match on E.164 to be safe.
const bareSelf = msg.selfJid.replace(/:\\d+/, "");
if (normalizedMentions.includes(bareSelf)) return true;
}
}
return mentionCfg.mentionRegexes.some((re) => re.test(msg.body));
}
export function resolveReplyHeartbeatMinutes(
cfg: ReturnType<typeof loadConfig>,
overrideMinutes?: number,
@@ -525,6 +563,13 @@ export async function monitorWebProvider(
tuning.replyHeartbeatMinutes,
);
const reconnectPolicy = resolveReconnectPolicy(cfg, tuning.reconnect);
const mentionConfig = buildMentionConfig(cfg);
const groupHistoryLimit =
cfg.inbound?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT;
const groupHistories = new Map<
string,
Array<{ sender: string; body: string; timestamp?: number }>
>();
const sleep =
tuning.sleep ??
((ms: number, signal?: AbortSignal) =>
@@ -574,8 +619,7 @@ export async function monitorWebProvider(
const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages
const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
// Batch inbound messages while command queue is busy, then send one
// combined prompt with per-message timestamps (inbound-only behavior).
// Batch inbound messages per conversation while command queue is busy.
type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout };
const pendingBatches = new Map<string, PendingBatch>();
@@ -600,22 +644,43 @@ export async function monitorWebProvider(
messagePrefix = hasAllowFrom ? "" : "[warelay]";
}
const prefixStr = messagePrefix ? `${messagePrefix} ` : "";
return `${formatTimestamp(msg.timestamp)}${prefixStr}${msg.body}`;
const senderLabel =
msg.chatType === "group"
? `${msg.senderName ?? msg.senderE164 ?? "Someone"}: `
: "";
return `${formatTimestamp(msg.timestamp)}${prefixStr}${senderLabel}${msg.body}`;
};
const processBatch = async (from: string) => {
const batch = pendingBatches.get(from);
const processBatch = async (conversationId: string) => {
const batch = pendingBatches.get(conversationId);
if (!batch || batch.messages.length === 0) return;
if (getQueueSize() > 0) {
// Wait until command queue is free to run the combined prompt.
batch.timer = setTimeout(() => void processBatch(from), 150);
batch.timer = setTimeout(() => void processBatch(conversationId), 150);
return;
}
pendingBatches.delete(from);
pendingBatches.delete(conversationId);
const messages = batch.messages;
const latest = messages[messages.length - 1];
const combinedBody = messages.map(buildLine).join("\n");
let combinedBody = messages.map(buildLine).join("\n");
if (latest.chatType === "group") {
const history = groupHistories.get(conversationId) ?? [];
const historyWithoutCurrent =
history.length > 0 ? history.slice(0, -1) : [];
if (historyWithoutCurrent.length > 0) {
const historyText = historyWithoutCurrent
.map(
(m) =>
`${m.sender}: ${m.body}${m.timestamp ? ` [${new Date(m.timestamp).toISOString()}]` : ""}`,
)
.join("\\n");
combinedBody = `[Chat messages since your last reply - for context]\\n${historyText}\\n\\n[Current message - respond to this]\\n${buildLine(latest)}`;
}
// Clear stored history after using it
groupHistories.set(conversationId, []);
}
// Echo detection uses combined body so we don't respond twice.
if (recentlySent.has(combinedBody)) {
@@ -629,7 +694,7 @@ export async function monitorWebProvider(
{
connectionId,
correlationId,
from,
from: latest.chatType === "group" ? conversationId : latest.from,
to: latest.to,
body: combinedBody,
mediaType: latest.mediaType ?? null,
@@ -642,7 +707,11 @@ export async function monitorWebProvider(
const tsDisplay = latest.timestamp
? new Date(latest.timestamp).toISOString()
: new Date().toISOString();
console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`);
const fromDisplay =
latest.chatType === "group" ? conversationId : latest.from;
console.log(
`\n[${tsDisplay}] ${fromDisplay} -> ${latest.to}: ${combinedBody}`,
);
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
@@ -724,21 +793,22 @@ export async function monitorWebProvider(
);
}
}
};
};
const enqueueBatch = async (msg: WebInboundMsg) => {
const bucket = pendingBatches.get(msg.from) ?? { messages: [] };
bucket.messages.push(msg);
pendingBatches.set(msg.from, bucket);
const enqueueBatch = async (msg: WebInboundMsg) => {
const key = msg.conversationId ?? msg.from;
const bucket = pendingBatches.get(key) ?? { messages: [] };
bucket.messages.push(msg);
pendingBatches.set(key, bucket);
// Process immediately when queue is free; otherwise wait until it drains.
if (getQueueSize() === 0) {
await processBatch(msg.from);
} else {
bucket.timer =
bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150);
}
};
// Process immediately when queue is free; otherwise wait until it drains.
if (getQueueSize() === 0) {
await processBatch(key);
} else {
bucket.timer =
bucket.timer ?? setTimeout(() => void processBatch(key), 150);
}
};
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
@@ -746,6 +816,7 @@ export async function monitorWebProvider(
handledMessages += 1;
lastMessageAt = Date.now();
lastInboundMsg = msg;
const conversationId = msg.conversationId ?? msg.from;
// Same-phone mode logging retained
if (msg.from === msg.to) {
@@ -762,6 +833,27 @@ export async function monitorWebProvider(
return;
}
if (msg.chatType === "group") {
const history =
groupHistories.get(conversationId) ??
([] as Array<{ sender: string; body: string; timestamp?: number }>);
history.push({
sender: msg.senderName ?? msg.senderE164 ?? "Unknown",
body: msg.body,
timestamp: msg.timestamp,
});
while (history.length > groupHistoryLimit) history.shift();
groupHistories.set(conversationId, history);
const wasMentioned = isBotMentioned(msg, mentionConfig);
if (mentionConfig.requireMention && !wasMentioned) {
logVerbose(
`Group message stored for context (no mention detected) in ${conversationId}`,
);
return;
}
}
return enqueueBatch(msg);
},
});
@@ -884,6 +976,14 @@ export async function monitorWebProvider(
return;
}
if (!replyHeartbeatMinutes) return;
if (lastInboundMsg?.chatType === "group") {
heartbeatLogger.info(
{ connectionId, reason: "last-inbound-group" },
"reply heartbeat skipped",
);
console.log(success("heartbeat: skipped (group chat)"));
return;
}
const tickStart = Date.now();
if (!lastInboundMsg) {
const fallbackTo = getFallbackRecipient(cfg);

View File

@@ -6,6 +6,7 @@ import type {
import {
DisconnectReason,
downloadMediaMessage,
isJidGroup,
} from "@whiskeysockets/baileys";
import { loadConfig } from "../config/config.js";
@@ -27,11 +28,20 @@ export type WebListenerCloseReason = {
export type WebInboundMessage = {
id?: string;
from: string;
from: string; // conversation id: E.164 for direct chats, group JID for groups
conversationId: string; // alias for clarity (same as from)
to: string;
body: string;
pushName?: string;
timestamp?: number;
chatType: "direct" | "group";
chatId: string;
senderJid?: string;
senderE164?: string;
senderName?: string;
mentionedJids?: string[];
selfJid?: string | null;
selfE164?: string | null;
sendComposing: () => Promise<void>;
reply: (text: string) => Promise<void>;
sendMedia: (payload: AnyMessageContent) => Promise<void>;
@@ -93,7 +103,11 @@ export async function monitorWebInbox(options: {
logVerbose(`Failed to mark message ${id} read: ${String(err)}`);
}
}
const from = jidToE164(remoteJid);
const group = isJidGroup(remoteJid);
const participantJid = msg.key?.participant ?? undefined;
const senderE164 = participantJid ? jidToE164(participantJid) : null;
const from = group ? remoteJid : jidToE164(remoteJid);
// Skip if we still can't resolve an id to key conversation
if (!from) continue;
// Filter unauthorized senders early to prevent wasted processing
@@ -103,12 +117,12 @@ export async function monitorWebInbox(options: {
const isSamePhone = from === selfE164;
if (!isSamePhone && Array.isArray(allowFrom) && allowFrom.length > 0) {
if (
!allowFrom.includes("*") &&
!allowFrom.map(normalizeE164).includes(from)
) {
const candidate =
group && senderE164 ? normalizeE164(senderE164) : from;
const allowedList = allowFrom.map(normalizeE164);
if (!allowFrom.includes("*") && !allowedList.includes(candidate)) {
logVerbose(
`Blocked unauthorized sender ${from} (not in allowFrom list)`,
`Blocked unauthorized sender ${candidate} (not in allowFrom list)`,
);
continue; // Skip processing entirely
}
@@ -151,6 +165,11 @@ export async function monitorWebInbox(options: {
const timestamp = msg.messageTimestamp
? Number(msg.messageTimestamp) * 1000
: undefined;
const mentionedJids =
msg.message?.extendedTextMessage?.contextInfo?.mentionedJid ??
msg.message?.extendedTextMessage?.contextInfo?.quotedMessage
?.extendedTextMessage?.contextInfo?.mentionedJid;
const senderName = msg.pushName ?? undefined;
inboundLogger.info(
{
from,
@@ -166,10 +185,19 @@ export async function monitorWebInbox(options: {
await options.onMessage({
id,
from,
conversationId: from,
to: selfE164 ?? "me",
body,
pushName: msg.pushName ?? undefined,
pushName: senderName,
timestamp,
chatType: group ? "group" : "direct",
chatId: remoteJid,
senderJid: participantJid,
senderE164: senderE164 ?? undefined,
senderName,
mentionedJids: mentionedJids ?? undefined,
selfJid,
selfE164,
sendComposing,
reply,
sendMedia,

View File

@@ -230,6 +230,46 @@ describe("web monitor inbox", () => {
await listener.close();
});
it("passes through group messages with participant metadata", async () => {
const onMessage = vi.fn();
const listener = await monitorWebInbox({ verbose: false, onMessage });
const sock = await createWaSocket();
const upsert = {
type: "notify",
messages: [
{
key: {
id: "grp2",
fromMe: false,
remoteJid: "99999@g.us",
participant: "777@s.whatsapp.net",
},
pushName: "Alice",
message: {
extendedTextMessage: {
text: "@bot ping",
contextInfo: { mentionedJid: ["123@s.whatsapp.net"] },
},
},
messageTimestamp: 1_700_000_000,
},
],
};
sock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setImmediate(resolve));
expect(onMessage).toHaveBeenCalledWith(
expect.objectContaining({
chatType: "group",
conversationId: "99999@g.us",
senderE164: "+777",
mentionedJids: ["123@s.whatsapp.net"],
}),
);
await listener.close();
});
it("blocks messages from unauthorized senders not in allowFrom", async () => {
// Test for auto-recovery fix: early allowFrom filtering prevents Bad MAC errors
// from unauthorized senders corrupting sessions
@@ -281,6 +321,52 @@ describe("web monitor inbox", () => {
await listener.close();
});
it("applies allowFrom to group participants", async () => {
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["+1234"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
const onMessage = vi.fn();
const listener = await monitorWebInbox({ verbose: false, onMessage });
const sock = await createWaSocket();
const upsert = {
type: "notify",
messages: [
{
key: {
id: "grp3",
fromMe: false,
remoteJid: "11111@g.us",
participant: "999@s.whatsapp.net",
},
message: { conversation: "unauthorized group message" },
},
],
};
sock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setImmediate(resolve));
expect(onMessage).not.toHaveBeenCalled();
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
await listener.close();
});
it("allows messages from senders in allowFrom list", async () => {
mockLoadConfig.mockReturnValue({
inbound: {