Add IPC to prevent Signal session corruption from concurrent connections

When the relay is running, `warelay send` and `warelay heartbeat` now
communicate via Unix socket IPC (~/.warelay/relay.sock) to send messages
through the relay's existing WhatsApp connection.

Previously, these commands created new Baileys sockets that wrote to the
same auth state files, corrupting the Signal session ratchet and causing
the relay's subsequent sends to fail silently.

Changes:
- Add src/web/ipc.ts with Unix socket server/client
- Relay starts IPC server after connecting
- send command tries IPC first, falls back to direct
- heartbeat uses sendWithIpcFallback helper
- inbound.ts exposes sendMessage on listener object
- Messages sent via IPC are added to echo detection set
This commit is contained in:
Peter Steinberger
2025-12-02 06:31:01 +00:00
parent 2fc3a822c8
commit e86b507da7
5 changed files with 354 additions and 3 deletions

View File

@@ -9,12 +9,13 @@ import {
resolveStorePath,
saveSessionStore,
} from "../config/sessions.js";
import { danger, isVerbose, logVerbose, success } from "../globals.js";
import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js";
import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js";
import { loadWebMedia } from "./media.js";
import { sendMessageWeb } from "./outbound.js";
import { getQueueSize } from "../process/command-queue.js";
@@ -28,6 +29,26 @@ import {
} from "./reconnect.js";
import { getWebAuthAgeMs } from "./session.js";
/**
* Send a message via IPC if relay is running, otherwise fall back to direct.
* This avoids Signal session corruption from multiple Baileys connections.
*/
async function sendWithIpcFallback(
to: string,
message: string,
opts: { verbose: boolean; mediaUrl?: string },
): Promise<{ messageId: string; toJid: string }> {
const ipcResult = await sendViaIpc(to, message, opts.mediaUrl);
if (ipcResult?.success && ipcResult.messageId) {
if (opts.verbose) {
console.log(info(`Sent via relay IPC (avoiding session corruption)`));
}
return { messageId: ipcResult.messageId, toJid: `${to}@s.whatsapp.net` };
}
// Fall back to direct send
return sendMessageWeb(to, message, opts);
}
const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024;
type WebInboundMsg = Parameters<
typeof monitorWebInbox
@@ -95,7 +116,7 @@ export async function runWebHeartbeatOnce(opts: {
} = opts;
const _runtime = opts.runtime ?? defaultRuntime;
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
const sender = opts.sender ?? sendMessageWeb;
const sender = opts.sender ?? sendWithIpcFallback;
const runId = newConnectionId();
const heartbeatLogger = getChildLogger({
module: "web-heartbeat",
@@ -526,6 +547,8 @@ export async function monitorWebProvider(
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
onMessage: async (msg) => {
// Also add IPC-sent messages to echo detection
// (this is handled below in the IPC sendHandler)
handledMessages += 1;
lastMessageAt = Date.now();
const ts = msg.timestamp
@@ -677,7 +700,33 @@ export async function monitorWebProvider(
},
});
// Start IPC server so `warelay send` can use this connection
// instead of creating a new one (which would corrupt Signal session)
if ("sendMessage" in listener) {
startIpcServer(async (to, message, mediaUrl) => {
let mediaBuffer: Buffer | undefined;
let mediaType: string | undefined;
if (mediaUrl) {
const media = await loadWebMedia(mediaUrl);
mediaBuffer = media.buffer;
mediaType = media.contentType;
}
const result = await listener.sendMessage(to, message, mediaBuffer, mediaType);
// Add to echo detection so we don't process our own message
if (message) {
recentlySent.add(message);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
logInfo(`📤 IPC send to ${to}: ${message.substring(0, 50)}...`, runtime);
return result;
});
}
const closeListener = async () => {
stopIpcServer();
if (heartbeat) clearInterval(heartbeat);
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
if (watchdogTimer) clearInterval(watchdogTimer);

View File

@@ -212,6 +212,51 @@ export async function monitorWebInbox(options: {
}
},
onClose,
/**
* Send a message through this connection's socket.
* Used by IPC to avoid creating new connections.
*/
sendMessage: async (
to: string,
text: string,
mediaBuffer?: Buffer,
mediaType?: string,
): Promise<{ messageId: string }> => {
const jid = `${to.replace(/^\+/, "")}@s.whatsapp.net`;
let payload: AnyMessageContent;
if (mediaBuffer && mediaType) {
if (mediaType.startsWith("image/")) {
payload = {
image: mediaBuffer,
caption: text || undefined,
mimetype: mediaType,
};
} else if (mediaType.startsWith("audio/")) {
payload = {
audio: mediaBuffer,
ptt: true,
mimetype: mediaType,
};
} else if (mediaType.startsWith("video/")) {
payload = {
video: mediaBuffer,
caption: text || undefined,
mimetype: mediaType,
};
} else {
payload = {
document: mediaBuffer,
fileName: "file",
caption: text || undefined,
mimetype: mediaType,
};
}
} else {
payload = { text };
}
const result = await sock.sendMessage(jid, payload);
return { messageId: result?.key?.id ?? "unknown" };
},
} as const;
}

225
src/web/ipc.ts Normal file
View File

@@ -0,0 +1,225 @@
/**
* IPC server for warelay relay.
*
* When the relay is running, it starts a Unix socket server that allows
* `warelay send` and `warelay heartbeat` to send messages through the
* existing WhatsApp connection instead of creating new ones.
*
* This prevents Signal session ratchet corruption from multiple connections.
*/
import fs from "node:fs";
import net from "node:net";
import os from "node:os";
import path from "node:path";
import { getChildLogger } from "../logging.js";
const SOCKET_PATH = path.join(os.homedir(), ".warelay", "relay.sock");
export interface IpcSendRequest {
type: "send";
to: string;
message: string;
mediaUrl?: string;
}
export interface IpcSendResponse {
success: boolean;
messageId?: string;
error?: string;
}
type SendHandler = (
to: string,
message: string,
mediaUrl?: string,
) => Promise<{ messageId: string }>;
let server: net.Server | null = null;
/**
* Start the IPC server. Called by the relay when it starts.
*/
export function startIpcServer(sendHandler: SendHandler): void {
const logger = getChildLogger({ module: "ipc-server" });
// Clean up stale socket file
try {
fs.unlinkSync(SOCKET_PATH);
} catch {
// Ignore if doesn't exist
}
server = net.createServer((conn) => {
let buffer = "";
conn.on("data", async (data) => {
buffer += data.toString();
// Try to parse complete JSON messages (newline-delimited)
const lines = buffer.split("\n");
buffer = lines.pop() ?? ""; // Keep incomplete line in buffer
for (const line of lines) {
if (!line.trim()) continue;
try {
const request = JSON.parse(line) as IpcSendRequest;
if (request.type === "send") {
try {
const result = await sendHandler(
request.to,
request.message,
request.mediaUrl,
);
const response: IpcSendResponse = {
success: true,
messageId: result.messageId,
};
conn.write(JSON.stringify(response) + "\n");
} catch (err) {
const response: IpcSendResponse = {
success: false,
error: String(err),
};
conn.write(JSON.stringify(response) + "\n");
}
}
} catch (err) {
logger.warn({ error: String(err) }, "failed to parse IPC request");
const response: IpcSendResponse = {
success: false,
error: "Invalid request format",
};
conn.write(JSON.stringify(response) + "\n");
}
}
});
conn.on("error", (err) => {
logger.debug({ error: String(err) }, "IPC connection error");
});
});
server.listen(SOCKET_PATH, () => {
logger.info({ socketPath: SOCKET_PATH }, "IPC server started");
// Make socket accessible
fs.chmodSync(SOCKET_PATH, 0o600);
});
server.on("error", (err) => {
logger.error({ error: String(err) }, "IPC server error");
});
}
/**
* Stop the IPC server. Called when relay shuts down.
*/
export function stopIpcServer(): void {
if (server) {
server.close();
server = null;
}
try {
fs.unlinkSync(SOCKET_PATH);
} catch {
// Ignore
}
}
/**
* Check if the relay IPC server is running.
*/
export function isRelayRunning(): boolean {
try {
fs.accessSync(SOCKET_PATH);
return true;
} catch {
return false;
}
}
/**
* Send a message through the running relay's IPC.
* Returns null if relay is not running.
*/
export async function sendViaIpc(
to: string,
message: string,
mediaUrl?: string,
): Promise<IpcSendResponse | null> {
if (!isRelayRunning()) {
return null;
}
return new Promise((resolve) => {
const client = net.createConnection(SOCKET_PATH);
let buffer = "";
let resolved = false;
const timeout = setTimeout(() => {
if (!resolved) {
resolved = true;
client.destroy();
resolve({ success: false, error: "IPC timeout" });
}
}, 30000); // 30 second timeout
client.on("connect", () => {
const request: IpcSendRequest = {
type: "send",
to,
message,
mediaUrl,
};
client.write(JSON.stringify(request) + "\n");
});
client.on("data", (data) => {
buffer += data.toString();
const lines = buffer.split("\n");
for (const line of lines) {
if (!line.trim()) continue;
try {
const response = JSON.parse(line) as IpcSendResponse;
if (!resolved) {
resolved = true;
clearTimeout(timeout);
client.end();
resolve(response);
}
return;
} catch {
// Keep reading
}
}
});
client.on("error", (err) => {
if (!resolved) {
resolved = true;
clearTimeout(timeout);
// Socket exists but can't connect - relay might have crashed
resolve(null);
}
});
client.on("close", () => {
if (!resolved) {
resolved = true;
clearTimeout(timeout);
resolve({ success: false, error: "Connection closed" });
}
});
});
}
/**
* Get the IPC socket path for debugging/status.
*/
export function getSocketPath(): string {
return SOCKET_PATH;
}