gateway: drop ipc and simplify cli
This commit is contained in:
@@ -19,7 +19,6 @@ import { getQueueSize } from "../process/command-queue.js";
|
||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { jidToE164, normalizeE164 } from "../utils.js";
|
||||
import { monitorWebInbox } from "./inbound.js";
|
||||
import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js";
|
||||
import { loadWebMedia } from "./media.js";
|
||||
import { sendMessageWhatsApp } from "./outbound.js";
|
||||
import {
|
||||
@@ -41,23 +40,13 @@ export function setHeartbeatsEnabled(enabled: boolean) {
|
||||
heartbeatsEnabled = enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message via IPC if gateway is running, otherwise fall back to direct.
|
||||
* This avoids Signal session corruption from multiple Baileys connections.
|
||||
*/
|
||||
// Send via the active gateway-backed listener. The monitor already owns the single
|
||||
// Baileys session, so use its send API directly.
|
||||
async function sendWithIpcFallback(
|
||||
to: string,
|
||||
message: string,
|
||||
opts: { verbose: boolean; mediaUrl?: string },
|
||||
): Promise<{ messageId: string; toJid: string }> {
|
||||
const ipcResult = await sendViaIpc(to, message, opts.mediaUrl);
|
||||
if (ipcResult?.success && ipcResult.messageId) {
|
||||
if (opts.verbose) {
|
||||
console.log(info(`Sent via gateway IPC (avoiding session corruption)`));
|
||||
}
|
||||
return { messageId: ipcResult.messageId, toJid: `${to}@s.whatsapp.net` };
|
||||
}
|
||||
// Fall back to direct send
|
||||
return sendMessageWhatsApp(to, message, opts);
|
||||
}
|
||||
|
||||
@@ -1027,47 +1016,7 @@ export async function monitorWebProvider(
|
||||
`WhatsApp gateway connected${selfE164 ? ` as ${selfE164}` : ""}.`,
|
||||
);
|
||||
|
||||
// Start IPC server so `clawdis send` can use this connection
|
||||
// instead of creating a new one (which would corrupt Signal session)
|
||||
if ("sendMessage" in listener && "sendComposingTo" in listener) {
|
||||
startIpcServer(async (to, message, mediaUrl) => {
|
||||
let mediaBuffer: Buffer | undefined;
|
||||
let mediaType: string | undefined;
|
||||
if (mediaUrl) {
|
||||
const media = await loadWebMedia(mediaUrl);
|
||||
mediaBuffer = media.buffer;
|
||||
mediaType = media.contentType;
|
||||
}
|
||||
const result = await listener.sendMessage(
|
||||
to,
|
||||
message,
|
||||
mediaBuffer,
|
||||
mediaType,
|
||||
);
|
||||
// Add to echo detection so we don't process our own message
|
||||
if (message) {
|
||||
recentlySent.add(message);
|
||||
if (recentlySent.size > MAX_RECENT_MESSAGES) {
|
||||
const firstKey = recentlySent.values().next().value;
|
||||
if (firstKey) recentlySent.delete(firstKey);
|
||||
}
|
||||
}
|
||||
logInfo(
|
||||
`📤 IPC send to ${to}: ${message.substring(0, 50)}...`,
|
||||
runtime,
|
||||
);
|
||||
// Show typing indicator after send so user knows more may be coming
|
||||
try {
|
||||
await listener.sendComposingTo(to);
|
||||
} catch {
|
||||
// Ignore typing indicator errors - not critical
|
||||
}
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
const closeListener = async () => {
|
||||
stopIpcServer();
|
||||
if (heartbeat) clearInterval(heartbeat);
|
||||
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
|
||||
if (watchdogTimer) clearInterval(watchdogTimer);
|
||||
|
||||
@@ -1,65 +0,0 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("../logging.js", () => ({
|
||||
getChildLogger: () => ({
|
||||
info: vi.fn(),
|
||||
error: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
}),
|
||||
}));
|
||||
|
||||
const originalHome = process.env.HOME;
|
||||
|
||||
afterEach(() => {
|
||||
process.env.HOME = originalHome;
|
||||
vi.resetModules();
|
||||
});
|
||||
|
||||
describe("ipc hardening", () => {
|
||||
it("creates private socket dir and socket with tight perms", async () => {
|
||||
const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "clawdis-home-"));
|
||||
const clawdisDir = path.join(tmpHome, ".clawdis");
|
||||
fs.mkdirSync(clawdisDir, { recursive: true });
|
||||
process.env.HOME = tmpHome;
|
||||
vi.resetModules();
|
||||
|
||||
const ipc = await import("./ipc.js");
|
||||
|
||||
const sendHandler = vi.fn().mockResolvedValue({ messageId: "msg1" });
|
||||
ipc.startIpcServer(sendHandler);
|
||||
|
||||
const dirStat = fs.lstatSync(path.join(tmpHome, ".clawdis", "ipc"));
|
||||
expect(dirStat.mode & 0o777).toBe(0o700);
|
||||
|
||||
expect(ipc.isRelayRunning()).toBe(true);
|
||||
|
||||
const socketStat = fs.lstatSync(ipc.getSocketPath());
|
||||
expect(socketStat.isSocket()).toBe(true);
|
||||
if (typeof process.getuid === "function") {
|
||||
expect(socketStat.uid).toBe(process.getuid());
|
||||
}
|
||||
|
||||
ipc.stopIpcServer();
|
||||
expect(ipc.isRelayRunning()).toBe(false);
|
||||
});
|
||||
|
||||
it("refuses to start when IPC dir is a symlink", async () => {
|
||||
const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "clawdis-home-"));
|
||||
const clawdisDir = path.join(tmpHome, ".clawdis");
|
||||
fs.mkdirSync(clawdisDir, { recursive: true });
|
||||
fs.symlinkSync("/tmp", path.join(clawdisDir, "ipc"));
|
||||
|
||||
process.env.HOME = tmpHome;
|
||||
vi.resetModules();
|
||||
|
||||
const ipc = await import("./ipc.js");
|
||||
const sendHandler = vi.fn().mockResolvedValue({ messageId: "msg1" });
|
||||
|
||||
expect(() => ipc.startIpcServer(sendHandler)).toThrow(/symlink/i);
|
||||
});
|
||||
});
|
||||
277
src/web/ipc.ts
277
src/web/ipc.ts
@@ -1,277 +0,0 @@
|
||||
/**
|
||||
* IPC server for clawdis gateway.
|
||||
*
|
||||
* When the gateway is running, it starts a Unix socket server that allows
|
||||
* `clawdis send` and `clawdis heartbeat` to send messages through the
|
||||
* existing WhatsApp connection instead of creating new ones.
|
||||
*
|
||||
* This prevents Signal session ratchet corruption from multiple connections.
|
||||
*/
|
||||
|
||||
import fs from "node:fs";
|
||||
import net from "node:net";
|
||||
import path from "node:path";
|
||||
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { CONFIG_DIR } from "../utils.js";
|
||||
|
||||
const SOCKET_DIR = path.join(CONFIG_DIR, "ipc");
|
||||
const SOCKET_PATH = path.join(SOCKET_DIR, "gateway.sock");
|
||||
|
||||
export interface IpcSendRequest {
|
||||
type: "send";
|
||||
to: string;
|
||||
message: string;
|
||||
mediaUrl?: string;
|
||||
}
|
||||
|
||||
export interface IpcSendResponse {
|
||||
success: boolean;
|
||||
messageId?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
type SendHandler = (
|
||||
to: string,
|
||||
message: string,
|
||||
mediaUrl?: string,
|
||||
) => Promise<{ messageId: string }>;
|
||||
|
||||
let server: net.Server | null = null;
|
||||
|
||||
/**
|
||||
* Start the IPC server. Called by the gateway when it starts.
|
||||
*/
|
||||
export function startIpcServer(sendHandler: SendHandler): void {
|
||||
const logger = getChildLogger({ module: "ipc-server" });
|
||||
|
||||
ensureSocketDir();
|
||||
try {
|
||||
assertSafeSocketPath(SOCKET_PATH);
|
||||
} catch (err) {
|
||||
logger.error({ error: String(err) }, "Refusing to start IPC server");
|
||||
throw err;
|
||||
}
|
||||
|
||||
// Clean up stale socket file (only if safe to do so)
|
||||
try {
|
||||
fs.unlinkSync(SOCKET_PATH);
|
||||
} catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code !== "ENOENT") {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
server = net.createServer((conn) => {
|
||||
let buffer = "";
|
||||
|
||||
conn.on("data", async (data) => {
|
||||
buffer += data.toString();
|
||||
|
||||
// Try to parse complete JSON messages (newline-delimited)
|
||||
const lines = buffer.split("\n");
|
||||
buffer = lines.pop() ?? ""; // Keep incomplete line in buffer
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
|
||||
try {
|
||||
const request = JSON.parse(line) as IpcSendRequest;
|
||||
|
||||
if (request.type === "send") {
|
||||
try {
|
||||
const result = await sendHandler(
|
||||
request.to,
|
||||
request.message,
|
||||
request.mediaUrl,
|
||||
);
|
||||
const response: IpcSendResponse = {
|
||||
success: true,
|
||||
messageId: result.messageId,
|
||||
};
|
||||
conn.write(`${JSON.stringify(response)}\n`);
|
||||
} catch (err) {
|
||||
const response: IpcSendResponse = {
|
||||
success: false,
|
||||
error: String(err),
|
||||
};
|
||||
conn.write(`${JSON.stringify(response)}\n`);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn({ error: String(err) }, "failed to parse IPC request");
|
||||
const response: IpcSendResponse = {
|
||||
success: false,
|
||||
error: "Invalid request format",
|
||||
};
|
||||
conn.write(`${JSON.stringify(response)}\n`);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
conn.on("error", (err) => {
|
||||
logger.debug({ error: String(err) }, "IPC connection error");
|
||||
});
|
||||
});
|
||||
|
||||
server.listen(SOCKET_PATH, () => {
|
||||
logger.info({ socketPath: SOCKET_PATH }, "IPC server started");
|
||||
// Make socket accessible
|
||||
fs.chmodSync(SOCKET_PATH, 0o600);
|
||||
});
|
||||
|
||||
server.on("error", (err) => {
|
||||
logger.error({ error: String(err) }, "IPC server error");
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the IPC server. Called when gateway shuts down.
|
||||
*/
|
||||
export function stopIpcServer(): void {
|
||||
if (server) {
|
||||
server.close();
|
||||
server = null;
|
||||
}
|
||||
try {
|
||||
fs.unlinkSync(SOCKET_PATH);
|
||||
} catch {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the gateway IPC server is running.
|
||||
*/
|
||||
export function isRelayRunning(): boolean {
|
||||
try {
|
||||
assertSafeSocketPath(SOCKET_PATH);
|
||||
fs.accessSync(SOCKET_PATH);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message through the running gateway's IPC.
|
||||
* Returns null if gateway is not running.
|
||||
*/
|
||||
export async function sendViaIpc(
|
||||
to: string,
|
||||
message: string,
|
||||
mediaUrl?: string,
|
||||
): Promise<IpcSendResponse | null> {
|
||||
if (!isRelayRunning()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const client = net.createConnection(SOCKET_PATH);
|
||||
let buffer = "";
|
||||
let resolved = false;
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
client.destroy();
|
||||
resolve({ success: false, error: "IPC timeout" });
|
||||
}
|
||||
}, 30000); // 30 second timeout
|
||||
|
||||
client.on("connect", () => {
|
||||
const request: IpcSendRequest = {
|
||||
type: "send",
|
||||
to,
|
||||
message,
|
||||
mediaUrl,
|
||||
};
|
||||
client.write(`${JSON.stringify(request)}\n`);
|
||||
});
|
||||
|
||||
client.on("data", (data) => {
|
||||
buffer += data.toString();
|
||||
const lines = buffer.split("\n");
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
try {
|
||||
const response = JSON.parse(line) as IpcSendResponse;
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
clearTimeout(timeout);
|
||||
client.end();
|
||||
resolve(response);
|
||||
}
|
||||
return;
|
||||
} catch {
|
||||
// Keep reading
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
client.on("error", (_err) => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
clearTimeout(timeout);
|
||||
// Socket exists but can't connect - gateway might have crashed
|
||||
resolve(null);
|
||||
}
|
||||
});
|
||||
|
||||
client.on("close", () => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
clearTimeout(timeout);
|
||||
resolve({ success: false, error: "Connection closed" });
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the IPC socket path for debugging/status.
|
||||
*/
|
||||
export function getSocketPath(): string {
|
||||
return SOCKET_PATH;
|
||||
}
|
||||
|
||||
function ensureSocketDir(): void {
|
||||
try {
|
||||
const stat = fs.lstatSync(SOCKET_DIR);
|
||||
if (stat.isSymbolicLink()) {
|
||||
throw new Error(`IPC dir is a symlink: ${SOCKET_DIR}`);
|
||||
}
|
||||
if (!stat.isDirectory()) {
|
||||
throw new Error(`IPC dir is not a directory: ${SOCKET_DIR}`);
|
||||
}
|
||||
// Enforce private permissions
|
||||
fs.chmodSync(SOCKET_DIR, 0o700);
|
||||
if (typeof process.getuid === "function" && stat.uid !== process.getuid()) {
|
||||
throw new Error(`IPC dir owned by different user: ${SOCKET_DIR}`);
|
||||
}
|
||||
} catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
fs.mkdirSync(SOCKET_DIR, { recursive: true, mode: 0o700 });
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
function assertSafeSocketPath(socketPath: string): void {
|
||||
try {
|
||||
const stat = fs.lstatSync(socketPath);
|
||||
if (stat.isSymbolicLink()) {
|
||||
throw new Error(`Refusing IPC socket symlink: ${socketPath}`);
|
||||
}
|
||||
if (typeof process.getuid === "function" && stat.uid !== process.getuid()) {
|
||||
throw new Error(`IPC socket owned by different user: ${socketPath}`);
|
||||
}
|
||||
} catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
return; // Missing is fine; creation will happen next.
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user