Files
clawdbot/src/signal/monitor.ts
Peter Steinberger c379191f80 chore: migrate to oxlint and oxfmt
Co-authored-by: Christoph Nakazawa <christoph.pojer@gmail.com>
2026-01-14 15:02:19 +00:00

358 lines
12 KiB
TypeScript

import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "../auto-reply/reply/history.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import type { ClawdbotConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import type { SignalReactionNotificationMode } from "../config/types.js";
import { danger } from "../globals.js";
import { saveMediaBuffer } from "../media/store.js";
import type { RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { resolveSignalAccount } from "./accounts.js";
import { signalCheck, signalRpcRequest } from "./client.js";
import { spawnSignalDaemon } from "./daemon.js";
import { isSignalSenderAllowed, type resolveSignalSender } from "./identity.js";
import { createSignalEventHandler } from "./monitor/event-handler.js";
import { sendMessageSignal } from "./send.js";
import { runSignalSseLoop } from "./sse-reconnect.js";
/** Group descriptor that may accompany a reaction; both fields are optional and nullable. */
type SignalReactionGroupInfo = {
  groupId?: string | null;
  groupName?: string | null;
};

/**
 * Reaction payload carried by an inbound Signal event.
 *
 * Every field is optional and nullable, so a value of this type is not
 * necessarily a usable reaction; `isSignalReactionMessage` performs the
 * actual validation.
 */
type SignalReactionMessage = {
  /** Reaction emoji; must be non-blank for the reaction to be considered valid. */
  emoji?: string | null;
  /** Author of the reacted-to message; treated as a phone number and normalized to E.164 when resolving targets. */
  targetAuthor?: string | null;
  /** Author of the reacted-to message, identified by UUID. */
  targetAuthorUuid?: string | null;
  /** Timestamp of the reacted-to message; must be a positive number for the reaction to be considered valid. */
  targetSentTimestamp?: number | null;
  /** Presumably set when the reaction is being withdrawn rather than added — not read in this file. */
  isRemove?: boolean | null;
  /** Group the reaction was sent in, if any. */
  groupInfo?: SignalReactionGroupInfo | null;
};
/**
 * Attachment descriptor carried by an inbound Signal event.
 *
 * All fields are optional and nullable. `fetchAttachment` refuses to download
 * a descriptor without an `id`, and uses `size` for an early limit check.
 */
type SignalAttachment = {
  /** Identifier passed to the `getAttachment` RPC. */
  id?: string | null;
  /** Original file name, when provided. */
  filename?: string | null;
  /** MIME type hint forwarded to the media store when saving. */
  contentType?: string | null;
  /** Declared size, compared against the byte limit before downloading. */
  size?: number | null;
};
/**
 * Options accepted by `monitorSignalProvider`.
 *
 * Every option is optional. Where the resolved account configuration has a
 * matching field, the option given here takes precedence over it.
 */
export type MonitorSignalOpts = {
  // --- Environment -------------------------------------------------------
  /** Logging/exit hooks; a console-backed runtime is used when omitted. */
  runtime?: RuntimeEnv;
  /** Aborting stops the spawned daemon and makes the monitor resolve quietly. */
  abortSignal?: AbortSignal;
  /** Pre-loaded configuration; `loadConfig()` is called when omitted. */
  config?: ClawdbotConfig;

  // --- Account selection -------------------------------------------------
  /** Signal account passed to the daemon, the SSE loop and outgoing sends (trimmed before use). */
  account?: string;
  /** Identifier used to resolve the account entry from the configuration. */
  accountId?: string;

  // --- Daemon / transport ------------------------------------------------
  /** Base URL of the Signal HTTP endpoint (trimmed; blank falls back to the account's URL). */
  baseUrl?: string;
  /** Whether to spawn the daemon; defaults to `true` only when the account has no `httpUrl`. */
  autoStart?: boolean;
  /** Daemon executable; defaults to `"signal-cli"`. */
  cliPath?: string;
  /** Daemon HTTP host; defaults to `"127.0.0.1"`. */
  httpHost?: string;
  /** Daemon HTTP port; defaults to `8080`. */
  httpPort?: number;
  /** Forwarded to the daemon as-is. */
  receiveMode?: "on-start" | "manual";
  /** Forwarded to the daemon and to the event handler (defaults to `false` for the latter). */
  ignoreAttachments?: boolean;
  /** Forwarded to the daemon as-is. */
  ignoreStories?: boolean;
  /** Forwarded to the daemon as-is. */
  sendReadReceipts?: boolean;

  // --- Access control / limits --------------------------------------------
  /** Allowed direct-message senders; entries are stringified, trimmed and blank ones dropped. */
  allowFrom?: (string | number)[];
  /** Allowed group senders; falls back to the account's group list, then its `allowFrom`. */
  groupAllowFrom?: (string | number)[];
  /** Media size limit in MiB; defaults to `8`. */
  mediaMaxMb?: number;
};
/**
 * Picks the runtime to use for logging and process exit.
 *
 * @param opts Monitor options; `opts.runtime` wins when present.
 * @returns The caller-supplied runtime, or a console-backed fallback whose
 *   `exit` throws `Error("exit <code>")` instead of terminating the process.
 */
function resolveRuntime(opts: MonitorSignalOpts): RuntimeEnv {
  const supplied = opts.runtime;
  if (supplied !== undefined && supplied !== null) {
    return supplied;
  }
  return {
    log: console.log,
    error: console.error,
    exit(code: number): never {
      throw new Error(`exit ${code}`);
    },
  };
}
/**
 * Turns a raw allow-list into clean string entries.
 *
 * @param raw Entries as configured (strings or numbers); may be omitted.
 * @returns Each entry stringified and trimmed, with blank results removed.
 *   An omitted list yields an empty array.
 */
function normalizeAllowList(raw?: Array<string | number>): string[] {
  const entries = raw ?? [];
  return entries.flatMap((entry) => {
    const text = String(entry).trim();
    return text ? [text] : [];
  });
}
/** How a reaction target is identified: by phone number or by UUID. */
type SignalReactionTargetKind = "phone" | "uuid";

/**
 * Identity of the author whose message received a reaction, as produced by
 * `resolveSignalReactionTargets`.
 */
type SignalReactionTarget = {
  /** Which kind of identifier `id` holds. */
  kind: SignalReactionTargetKind;
  /** Trimmed UUID, or the phone number after `normalizeE164`. */
  id: string;
  /** Label form of the identifier: `uuid:<id>` for UUIDs, the normalized number for phones. */
  display: string;
};
/**
 * Lists the identities of the author whose message was reacted to.
 *
 * @param reaction Reaction payload to inspect.
 * @returns Zero, one or two targets: the UUID target first (when
 *   `targetAuthorUuid` is non-blank), then the phone target (when
 *   `targetAuthor` is non-blank, normalized with `normalizeE164`).
 */
function resolveSignalReactionTargets(reaction: SignalReactionMessage): SignalReactionTarget[] {
  const authorUuid = reaction.targetAuthorUuid?.trim();
  const uuidTargets: SignalReactionTarget[] = authorUuid
    ? [{ kind: "uuid", id: authorUuid, display: `uuid:${authorUuid}` }]
    : [];

  const authorPhone = reaction.targetAuthor?.trim();
  if (!authorPhone) {
    return uuidTargets;
  }

  // The normalized number doubles as the display label.
  const e164 = normalizeE164(authorPhone);
  return [...uuidTargets, { kind: "phone", id: e164, display: e164 }];
}
/**
 * Type guard deciding whether a reaction payload is complete enough to act on.
 *
 * A reaction qualifies only when it has a non-blank emoji, a positive numeric
 * `targetSentTimestamp`, and at least one non-blank target author (phone or
 * UUID).
 *
 * @param reaction Candidate payload; `null`/`undefined` never qualify.
 */
function isSignalReactionMessage(
  reaction: SignalReactionMessage | null | undefined,
): reaction is SignalReactionMessage {
  if (!reaction) {
    return false;
  }
  const { emoji, targetSentTimestamp: sentAt, targetAuthor, targetAuthorUuid } = reaction;

  const hasEmoji = Boolean(emoji?.trim());
  const hasAuthor = [targetAuthor, targetAuthorUuid].some((candidate) =>
    Boolean(candidate?.trim()),
  );

  if (!hasEmoji) return false;
  if (typeof sentAt !== "number") return false;
  if (!(sentAt > 0)) return false;
  return hasAuthor;
}
/**
 * Decides whether a reaction should be surfaced as a notification.
 *
 * Behaviour per mode (an omitted mode is treated as `"own"`):
 * - `"off"`: never notify.
 * - `"own"`: notify only when one of `targets` is the monitored account —
 *   UUID targets match the trimmed account verbatim or as `uuid:<id>`; phone
 *   targets match the account after `normalizeE164`.
 * - `"allowlist"`: notify only when `sender` passes `isSignalSenderAllowed`
 *   against a non-empty `allowlist`.
 * - any other mode: always notify.
 *
 * @param params.mode Notification mode.
 * @param params.account Monitored account identifier.
 * @param params.targets Authors of the reacted-to message.
 * @param params.sender Resolved identity of whoever reacted.
 * @param params.allowlist Normalized allow-list for `"allowlist"` mode.
 */
function shouldEmitSignalReactionNotification(params: {
  mode?: SignalReactionNotificationMode;
  account?: string | null;
  targets?: SignalReactionTarget[];
  sender?: ReturnType<typeof resolveSignalSender> | null;
  allowlist?: string[];
}) {
  const { targets, sender, allowlist } = params;

  switch (params.mode ?? "own") {
    case "off":
      return false;

    case "own": {
      const ownId = params.account?.trim();
      if (!ownId) return false;
      if (!targets || targets.length === 0) return false;

      const ownPhone = normalizeE164(ownId);
      return targets.some((target) =>
        target.kind === "uuid"
          ? ownId === target.id || ownId === `uuid:${target.id}`
          : ownPhone === target.id,
      );
    }

    case "allowlist": {
      const canCheck = Boolean(sender) && Boolean(allowlist) && allowlist.length !== 0;
      if (!canCheck) return false;
      return isSignalSenderAllowed(sender, allowlist);
    }

    default:
      return true;
  }
}
/**
 * Renders the one-line system event text describing a reaction.
 *
 * Format: `Signal reaction added: <emoji> by <actor> msg <id>`, followed by
 * ` from <target>` and ` in <group>` when those labels are non-empty.
 *
 * @param params.emojiLabel Label for the reaction emoji.
 * @param params.actorLabel Label for whoever reacted.
 * @param params.messageId Identifier of the reacted-to message.
 * @param params.targetLabel Optional label for the reacted-to message's author.
 * @param params.groupLabel Optional label for the group the reaction occurred in.
 */
function buildSignalReactionSystemEventText(params: {
  emojiLabel: string;
  actorLabel: string;
  messageId: string;
  targetLabel?: string;
  groupLabel?: string;
}) {
  const { emojiLabel, actorLabel, messageId, targetLabel, groupLabel } = params;
  const segments = [`Signal reaction added: ${emojiLabel} by ${actorLabel} msg ${messageId}`];
  if (targetLabel) {
    segments.push(`from ${targetLabel}`);
  }
  if (groupLabel) {
    segments.push(`in ${groupLabel}`);
  }
  return segments.join(" ");
}
/**
 * Polls the Signal HTTP endpoint until it answers the health check.
 *
 * Resolves as soon as `signalCheck` reports `ok`, or silently when the abort
 * signal is already aborted at the start of a polling round. Between failed
 * probes it waits 150ms.
 *
 * @param params.baseUrl Endpoint to probe.
 * @param params.abortSignal Optional signal that ends the wait without error.
 * @param params.timeoutMs Overall budget, checked before each probe.
 * @param params.runtime Receives the failure message through `error`.
 * @throws Error `signal daemon not ready (<last error>)` once the budget is
 *   spent without a successful probe.
 */
async function waitForSignalDaemonReady(params: {
  baseUrl: string;
  abortSignal?: AbortSignal;
  timeoutMs: number;
  runtime: RuntimeEnv;
}): Promise<void> {
  const { baseUrl, abortSignal, timeoutMs, runtime } = params;
  const startedAt = Date.now();
  const elapsedMs = (): number => Date.now() - startedAt;
  const pause = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => {
      setTimeout(resolve, ms);
    });

  let failure: string | null = null;
  while (elapsedMs() < timeoutMs) {
    if (abortSignal?.aborted) {
      return;
    }
    // Second argument is presumably the per-probe timeout in ms — confirm against signalCheck.
    const probe = await signalCheck(baseUrl, 1000);
    if (probe.ok) {
      return;
    }
    const statusText = probe.status ? `HTTP ${probe.status}` : "unreachable";
    failure = probe.error ?? statusText;
    await pause(150);
  }

  const reason = failure ?? "unknown error";
  runtime.error?.(danger(`daemon not ready after ${timeoutMs}ms (${reason})`));
  throw new Error(`signal daemon not ready (${reason})`);
}
/**
 * Downloads one inbound attachment through the `getAttachment` RPC and stores
 * it with `saveMediaBuffer` under the `"inbound"` category.
 *
 * The RPC is scoped by `groupId` when given, otherwise by `sender` (sent as
 * `recipient`); `account` is included only when set.
 *
 * @param params.baseUrl Endpoint for the RPC request.
 * @param params.account Optional account to include in the RPC parameters.
 * @param params.attachment Descriptor of the attachment to fetch.
 * @param params.sender Sender used as `recipient` when there is no group.
 * @param params.groupId Group the attachment was posted in; wins over `sender`.
 * @param params.maxBytes Size limit, applied to the declared size and passed to the store.
 * @returns The stored path and content type, or `null` when the attachment
 *   has no id, neither `groupId` nor `sender` is given, or the RPC returns no data.
 * @throws Error when the declared attachment size exceeds `maxBytes`.
 */
async function fetchAttachment(params: {
  baseUrl: string;
  account?: string;
  attachment: SignalAttachment;
  sender?: string;
  groupId?: string;
  maxBytes: number;
}): Promise<{ path: string; contentType?: string } | null> {
  const { attachment, maxBytes } = params;
  const attachmentId = attachment?.id;
  if (!attachmentId) {
    return null;
  }

  // Reject on the declared size before spending a round trip on the download.
  const declaredSize = attachment.size;
  if (declaredSize && declaredSize > maxBytes) {
    const limitMb = (maxBytes / (1024 * 1024)).toFixed(0);
    throw new Error(`Signal attachment ${attachmentId} exceeds ${limitMb}MB limit`);
  }

  if (!params.groupId && !params.sender) {
    return null;
  }

  const rpcParams: Record<string, unknown> = {
    id: attachmentId,
    ...(params.account ? { account: params.account } : {}),
    ...(params.groupId ? { groupId: params.groupId } : { recipient: params.sender }),
  };

  const response = await signalRpcRequest<{ data?: string }>("getAttachment", rpcParams, {
    baseUrl: params.baseUrl,
  });
  const encoded = response?.data;
  if (!encoded) {
    return null;
  }

  // The RPC payload is decoded as base64.
  const bytes = Buffer.from(encoded, "base64");
  const contentTypeHint = attachment.contentType ?? undefined;
  const stored = await saveMediaBuffer(bytes, contentTypeHint, "inbound", maxBytes);
  return { path: stored.path, contentType: stored.contentType };
}
/**
 * Sends each reply payload to a Signal target, one message at a time and in order.
 *
 * - Payloads with neither text nor media are skipped.
 * - Text-only payloads are split with `chunkText(text, textLimit)` and sent
 *   chunk by chunk.
 * - Payloads with media send one message per URL; the text rides along as the
 *   caption of the first one and later ones carry an empty caption.
 *
 * Media comes from `payload.mediaUrls` when it contains at least one URL and
 * otherwise from the single `payload.mediaUrl`. An empty `mediaUrls` array
 * therefore no longer hides a `mediaUrl` that is set.
 *
 * @param params.replies Payloads to deliver.
 * @param params.target Recipient passed to `sendMessageSignal`.
 * @param params.baseUrl Endpoint forwarded to `sendMessageSignal`.
 * @param params.account Optional account forwarded to `sendMessageSignal`.
 * @param params.accountId Optional account id forwarded to `sendMessageSignal`.
 * @param params.runtime Receives a `delivered reply to <target>` log line per delivered payload.
 * @param params.maxBytes Size limit forwarded to `sendMessageSignal`.
 * @param params.textLimit Maximum characters per text chunk.
 */
async function deliverReplies(params: {
  replies: ReplyPayload[];
  target: string;
  baseUrl: string;
  account?: string;
  accountId?: string;
  runtime: RuntimeEnv;
  maxBytes: number;
  textLimit: number;
}) {
  const { replies, target, baseUrl, account, accountId, runtime, maxBytes, textLimit } = params;
  for (const payload of replies) {
    // `??` alone would keep an empty `mediaUrls` array and drop `mediaUrl`,
    // so fall back on length rather than on nullishness.
    const mediaList = payload.mediaUrls?.length
      ? payload.mediaUrls
      : payload.mediaUrl
        ? [payload.mediaUrl]
        : [];
    const text = payload.text ?? "";
    if (!text && mediaList.length === 0) continue;
    if (mediaList.length === 0) {
      for (const chunk of chunkText(text, textLimit)) {
        await sendMessageSignal(target, chunk, {
          baseUrl,
          account,
          maxBytes,
          accountId,
        });
      }
    } else {
      // Only the first media message carries the caption.
      let first = true;
      for (const url of mediaList) {
        const caption = first ? text : "";
        first = false;
        await sendMessageSignal(target, caption, {
          baseUrl,
          account,
          mediaUrl: url,
          maxBytes,
          accountId,
        });
      }
    }
    runtime.log?.(`delivered reply to ${target}`);
  }
}
/**
 * Runs the Signal monitor for one account until the event stream ends or the
 * abort signal fires.
 *
 * Steps:
 * 1. Resolve the runtime, configuration and account; options in `opts` take
 *    precedence over the account configuration.
 * 2. When auto-start applies, spawn the daemon and wait up to 10s for its
 *    endpoint to pass the health check.
 * 3. Build the event handler and feed it events from the SSE loop; handler
 *    failures are logged through `runtime.error` and do not stop the loop.
 * 4. Always detach the abort listener and stop the daemon on the way out.
 *
 * @param opts Monitor options; every field is optional.
 * @throws Rethrows any startup or stream error unless the abort signal has
 *   fired, in which case the function resolves normally.
 */
export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promise<void> {
  const runtime = resolveRuntime(opts);
  const cfg = opts.config ?? loadConfig();
  const accountInfo = resolveSignalAccount({ cfg, accountId: opts.accountId });
  const accountCfg = accountInfo.config;
  const { abortSignal } = opts;

  // --- Message handling settings -------------------------------------------
  const configuredHistoryLimit =
    accountCfg.historyLimit ?? cfg.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT;
  const historyLimit = Math.max(0, configuredHistoryLimit);
  const groupHistories = new Map<string, HistoryEntry[]>();
  const textLimit = resolveTextChunkLimit(cfg, "signal", accountInfo.accountId);

  // --- Endpoint and account --------------------------------------------------
  const baseUrl = opts.baseUrl?.trim() || accountInfo.baseUrl;
  const account = opts.account?.trim() || accountCfg.account?.trim();

  // --- Access policy -----------------------------------------------------------
  const dmPolicy = accountCfg.dmPolicy ?? "pairing";
  const allowFrom = normalizeAllowList(opts.allowFrom ?? accountCfg.allowFrom);
  // Groups reuse the direct-message allow-list when no group list is configured.
  const inheritedGroupAllowFrom = accountCfg.allowFrom?.length ? accountCfg.allowFrom : [];
  const groupAllowFrom = normalizeAllowList(
    opts.groupAllowFrom ?? accountCfg.groupAllowFrom ?? inheritedGroupAllowFrom,
  );
  const groupPolicy = accountCfg.groupPolicy ?? "allowlist";
  const reactionMode = accountCfg.reactionNotifications ?? "own";
  const reactionAllowlist = normalizeAllowList(accountCfg.reactionAllowlist);

  // --- Media -------------------------------------------------------------------
  const mediaMaxMb = opts.mediaMaxMb ?? accountCfg.mediaMaxMb ?? 8;
  const mediaMaxBytes = mediaMaxMb * 1024 * 1024;
  const ignoreAttachments = opts.ignoreAttachments ?? accountCfg.ignoreAttachments ?? false;

  // --- Daemon ------------------------------------------------------------------
  // Without an explicit setting, the daemon is started only when no httpUrl is configured.
  const autoStart = opts.autoStart ?? accountCfg.autoStart ?? !accountCfg.httpUrl;
  const daemonHandle: ReturnType<typeof spawnSignalDaemon> | null = autoStart
    ? spawnSignalDaemon({
        cliPath: opts.cliPath ?? accountCfg.cliPath ?? "signal-cli",
        account,
        httpHost: opts.httpHost ?? accountCfg.httpHost ?? "127.0.0.1",
        httpPort: opts.httpPort ?? accountCfg.httpPort ?? 8080,
        receiveMode: opts.receiveMode ?? accountCfg.receiveMode,
        ignoreAttachments: opts.ignoreAttachments ?? accountCfg.ignoreAttachments,
        ignoreStories: opts.ignoreStories ?? accountCfg.ignoreStories,
        sendReadReceipts: opts.sendReadReceipts ?? accountCfg.sendReadReceipts,
        runtime,
      })
    : null;

  const onAbort = () => {
    daemonHandle?.stop();
  };
  abortSignal?.addEventListener("abort", onAbort, { once: true });

  try {
    if (daemonHandle) {
      await waitForSignalDaemonReady({ baseUrl, abortSignal, timeoutMs: 10_000, runtime });
    }

    const handleEvent = createSignalEventHandler({
      runtime,
      cfg,
      baseUrl,
      account,
      accountId: accountInfo.accountId,
      blockStreaming: accountCfg.blockStreaming,
      historyLimit,
      groupHistories,
      textLimit,
      dmPolicy,
      allowFrom,
      groupAllowFrom,
      groupPolicy,
      reactionMode,
      reactionAllowlist,
      mediaMaxBytes,
      ignoreAttachments,
      fetchAttachment,
      deliverReplies,
      resolveSignalReactionTargets,
      isSignalReactionMessage,
      shouldEmitSignalReactionNotification,
      buildSignalReactionSystemEventText,
    });

    await runSignalSseLoop({
      baseUrl,
      account,
      abortSignal,
      runtime,
      onEvent: (event) => {
        // Fire-and-forget: a failing handler is reported but must not break the stream.
        void handleEvent(event).catch((err) => {
          runtime.error?.(`event handler failed: ${String(err)}`);
        });
      },
    });
  } catch (err) {
    // Errors raised while shutting down on abort are expected; anything else propagates.
    if (!abortSignal?.aborted) {
      throw err;
    }
  } finally {
    abortSignal?.removeEventListener("abort", onAbort);
    daemonHandle?.stop();
  }
}