feat(signal): add typing + read receipts

This commit is contained in:
Peter Steinberger
2026-01-22 02:04:51 +00:00
parent 5bf7a9d0db
commit cc74e0d188
11 changed files with 261 additions and 12 deletions

View File

@@ -7,6 +7,7 @@ Docs: https://docs.clawd.bot
### Changes
- Docs: add troubleshooting entry for gateway.mode blocking gateway start. https://docs.clawd.bot/gateway/troubleshooting
- Onboarding: remove the run setup-token auth option (paste setup-token or reuse CLI creds instead).
- Signal: add typing indicators and DM read receipts via signal-cli.
### Fixes
- Doctor: warn when gateway.mode is unset with configure/config guidance.

View File

@@ -100,6 +100,11 @@ Groups:
- Use `channels.signal.ignoreAttachments` to skip downloading media.
- Group history context uses `channels.signal.historyLimit` (or `channels.signal.accounts.*.historyLimit`), falling back to `messages.groupChat.historyLimit`. Set `0` to disable (default 50).
## Typing + read receipts
- **Typing indicators**: Clawdbot sends typing signals via `signal-cli sendTyping` and refreshes them while a reply is running.
- **Read receipts**: when `channels.signal.sendReadReceipts` is true, Clawdbot forwards read receipts for allowed DMs.
- Signal-cli does not expose read receipts for groups.
## Delivery targets (CLI/cron)
- DMs: `signal:+15551234567` (or plain E.164).
- Groups: `signal:group:<groupId>`.

View File

@@ -57,6 +57,8 @@ describe("signal event handler sender prefix", () => {
reactionAllowlist: [],
mediaMaxBytes: 1000,
ignoreAttachments: true,
sendReadReceipts: false,
readReceiptsViaDaemon: false,
fetchAttachment: async () => null,
deliverReplies: async () => undefined,
resolveSignalReactionTargets: () => [],

View File

@@ -0,0 +1,86 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const sendTypingMock = vi.fn();
const sendReadReceiptMock = vi.fn();
vi.mock("./send.js", () => ({
sendMessageSignal: vi.fn(),
sendTypingSignal: (...args: unknown[]) => sendTypingMock(...args),
sendReadReceiptSignal: (...args: unknown[]) => sendReadReceiptMock(...args),
}));
vi.mock("../auto-reply/reply/dispatch-from-config.js", () => ({
dispatchReplyFromConfig: vi.fn(
async (params: { replyOptions?: { onReplyStart?: () => void } }) => {
await params.replyOptions?.onReplyStart?.();
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } };
},
),
}));
vi.mock("../pairing/pairing-store.js", () => ({
readChannelAllowFromStore: vi.fn().mockResolvedValue([]),
upsertChannelPairingRequest: vi.fn(),
}));
describe("signal event handler typing + read receipts", () => {
beforeEach(() => {
sendTypingMock.mockReset().mockResolvedValue(true);
sendReadReceiptMock.mockReset().mockResolvedValue(true);
});
it("sends typing + read receipt for allowed DMs", async () => {
const { createSignalEventHandler } = await import("./monitor/event-handler.js");
const handler = createSignalEventHandler({
runtime: { log: () => {}, error: () => {} } as any,
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
} as any,
baseUrl: "http://localhost",
account: "+15550009999",
accountId: "default",
blockStreaming: false,
historyLimit: 0,
groupHistories: new Map(),
textLimit: 4000,
dmPolicy: "open",
allowFrom: ["*"],
groupAllowFrom: ["*"],
groupPolicy: "open",
reactionMode: "off",
reactionAllowlist: [],
mediaMaxBytes: 1024,
ignoreAttachments: true,
sendReadReceipts: true,
readReceiptsViaDaemon: false,
fetchAttachment: async () => null,
deliverReplies: async () => {},
resolveSignalReactionTargets: () => [],
isSignalReactionMessage: () => false as any,
shouldEmitSignalReactionNotification: () => false,
buildSignalReactionSystemEventText: () => "reaction",
});
await handler({
event: "receive",
data: JSON.stringify({
envelope: {
sourceNumber: "+15550001111",
sourceName: "Alice",
timestamp: 1700000000000,
dataMessage: {
message: "hi",
},
},
}),
});
expect(sendTypingMock).toHaveBeenCalledWith("signal:+15550001111", expect.any(Object));
expect(sendReadReceiptMock).toHaveBeenCalledWith(
"signal:+15550001111",
1700000000000,
expect.any(Object),
);
});
});

View File

@@ -25,6 +25,8 @@ vi.mock("../auto-reply/reply.js", () => ({
vi.mock("./send.js", () => ({
sendMessageSignal: (...args: unknown[]) => sendMock(...args),
sendTypingSignal: vi.fn().mockResolvedValue(true),
sendReadReceiptSignal: vi.fn().mockResolvedValue(true),
}));
vi.mock("../pairing/pairing-store.js", () => ({

View File

@@ -29,6 +29,8 @@ vi.mock("../auto-reply/reply.js", () => ({
vi.mock("./send.js", () => ({
sendMessageSignal: (...args: unknown[]) => sendMock(...args),
sendTypingSignal: vi.fn().mockResolvedValue(true),
sendReadReceiptSignal: vi.fn().mockResolvedValue(true),
}));
vi.mock("../pairing/pairing-store.js", () => ({

View File

@@ -279,8 +279,10 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
const reactionAllowlist = normalizeAllowList(accountInfo.config.reactionAllowlist);
const mediaMaxBytes = (opts.mediaMaxMb ?? accountInfo.config.mediaMaxMb ?? 8) * 1024 * 1024;
const ignoreAttachments = opts.ignoreAttachments ?? accountInfo.config.ignoreAttachments ?? false;
const sendReadReceipts = Boolean(opts.sendReadReceipts ?? accountInfo.config.sendReadReceipts);
const autoStart = opts.autoStart ?? accountInfo.config.autoStart ?? !accountInfo.config.httpUrl;
const readReceiptsViaDaemon = Boolean(autoStart && sendReadReceipts);
let daemonHandle: ReturnType<typeof spawnSignalDaemon> | null = null;
if (autoStart) {
@@ -295,7 +297,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
receiveMode: opts.receiveMode ?? accountInfo.config.receiveMode,
ignoreAttachments: opts.ignoreAttachments ?? accountInfo.config.ignoreAttachments,
ignoreStories: opts.ignoreStories ?? accountInfo.config.ignoreStories,
sendReadReceipts: opts.sendReadReceipts ?? accountInfo.config.sendReadReceipts,
sendReadReceipts,
runtime,
});
}
@@ -335,6 +337,8 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
reactionAllowlist,
mediaMaxBytes,
ignoreAttachments,
sendReadReceipts,
readReceiptsViaDaemon,
fetchAttachment,
deliverReplies,
resolveSignalReactionTargets,

View File

@@ -34,6 +34,8 @@ describe("signal createSignalEventHandler inbound contract", () => {
reactionAllowlist: [],
mediaMaxBytes: 1024,
ignoreAttachments: true,
sendReadReceipts: false,
readReceiptsViaDaemon: false,
fetchAttachment: async () => null,
deliverReplies: async () => {},
resolveSignalReactionTargets: () => [],

View File

@@ -23,7 +23,7 @@ import {
clearHistoryEntries,
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import {
readSessionUpdatedAt,
recordSessionMetaFromInbound,
@@ -50,7 +50,7 @@ import {
resolveSignalRecipient,
resolveSignalSender,
} from "../identity.js";
import { sendMessageSignal } from "../send.js";
import { sendMessageSignal, sendReadReceiptSignal, sendTypingSignal } from "../send.js";
import type { SignalEventHandlerDeps, SignalReceivePayload } from "./event-handler.types.js";
@@ -190,7 +190,20 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
identityName: resolveIdentityName(deps.cfg, route.agentId),
};
const dispatcher = createReplyDispatcher({
const onReplyStart = async () => {
try {
if (!ctxPayload.To) return;
await sendTypingSignal(ctxPayload.To, {
baseUrl: deps.baseUrl,
account: deps.account,
accountId: deps.accountId,
});
} catch (err) {
logVerbose(`signal typing cue failed for ${ctxPayload.To}: ${String(err)}`);
}
};
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
responsePrefix: resolveEffectiveMessagesConfig(deps.cfg, route.agentId).responsePrefix,
responsePrefixContextProvider: () => prefixContext,
humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId),
@@ -209,6 +222,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
onError: (err, info) => {
deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart,
});
const { queuedFinal } = await dispatchReplyFromConfig({
@@ -216,6 +230,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
cfg: deps.cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined,
onModelSelected: (ctx) => {
@@ -227,6 +242,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
},
},
});
markDispatchIdle();
if (!queuedFinal) {
if (entry.isGroup && historyKey && deps.historyLimit > 0) {
clearHistoryEntries({ historyMap: deps.groupHistories, historyKey });
@@ -479,6 +495,31 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const bodyText = messageText || placeholder || dataMessage.quote?.text?.trim() || "";
if (!bodyText) return;
const receiptTimestamp =
typeof envelope.timestamp === "number"
? envelope.timestamp
: typeof dataMessage.timestamp === "number"
? dataMessage.timestamp
: undefined;
if (deps.sendReadReceipts && !deps.readReceiptsViaDaemon && !isGroup && receiptTimestamp) {
try {
await sendReadReceiptSignal(`signal:${senderRecipient}`, receiptTimestamp, {
baseUrl: deps.baseUrl,
account: deps.account,
accountId: deps.accountId,
});
} catch (err) {
logVerbose(`signal read receipt failed for ${senderDisplay}: ${String(err)}`);
}
} else if (
deps.sendReadReceipts &&
!deps.readReceiptsViaDaemon &&
!isGroup &&
!receiptTimestamp
) {
logVerbose(`signal read receipt skipped (missing timestamp) for ${senderDisplay}`);
}
const senderName = envelope.sourceName ?? senderDisplay;
const messageId =
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined;

View File

@@ -76,6 +76,8 @@ export type SignalEventHandlerDeps = {
reactionAllowlist: string[];
mediaMaxBytes: number;
ignoreAttachments: boolean;
sendReadReceipts: boolean;
readReceiptsViaDaemon: boolean;
fetchAttachment: (params: {
baseUrl: string;
account?: string;

View File

@@ -22,6 +22,10 @@ export type SignalSendResult = {
timestamp?: number;
};
export type SignalRpcOpts = Pick<SignalSendOpts, "baseUrl" | "account" | "accountId" | "timeoutMs">;
export type SignalReceiptType = "read" | "viewed";
type SignalTarget =
| { type: "recipient"; recipient: string }
| { type: "group"; groupId: string }
@@ -50,6 +54,59 @@ function parseTarget(raw: string): SignalTarget {
return { type: "recipient", recipient: value };
}
type SignalTargetParams = {
recipient?: string[];
groupId?: string;
username?: string[];
};
type SignalTargetAllowlist = {
recipient?: boolean;
group?: boolean;
username?: boolean;
};
function buildTargetParams(
target: SignalTarget,
allow: SignalTargetAllowlist,
): SignalTargetParams | null {
if (target.type === "recipient") {
if (!allow.recipient) return null;
return { recipient: [target.recipient] };
}
if (target.type === "group") {
if (!allow.group) return null;
return { groupId: target.groupId };
}
if (target.type === "username") {
if (!allow.username) return null;
return { username: [target.username] };
}
return null;
}
function resolveSignalRpcContext(
opts: SignalRpcOpts,
accountInfo?: ReturnType<typeof resolveSignalAccount>,
) {
const hasBaseUrl = Boolean(opts.baseUrl?.trim());
const hasAccount = Boolean(opts.account?.trim());
const resolvedAccount =
accountInfo ||
(!hasBaseUrl || !hasAccount
? resolveSignalAccount({
cfg: loadConfig(),
accountId: opts.accountId,
})
: undefined);
const baseUrl = opts.baseUrl?.trim() || resolvedAccount?.baseUrl;
if (!baseUrl) {
throw new Error("Signal base URL is required");
}
const account = opts.account?.trim() || resolvedAccount?.config.account?.trim();
return { baseUrl, account };
}
async function resolveAttachment(
mediaUrl: string,
maxBytes: number,
@@ -74,8 +131,7 @@ export async function sendMessageSignal(
cfg,
accountId: opts.accountId,
});
const baseUrl = opts.baseUrl?.trim() || accountInfo.baseUrl;
const account = opts.account?.trim() || accountInfo.config.account?.trim();
const { baseUrl, account } = resolveSignalRpcContext(opts, accountInfo);
const target = parseTarget(to);
let message = text ?? "";
let messageFromPlaceholder = false;
@@ -129,13 +185,15 @@ export async function sendMessageSignal(
params.attachments = attachments;
}
if (target.type === "recipient") {
params.recipient = [target.recipient];
} else if (target.type === "group") {
params.groupId = target.groupId;
} else if (target.type === "username") {
params.username = [target.username];
const targetParams = buildTargetParams(target, {
recipient: true,
group: true,
username: true,
});
if (!targetParams) {
throw new Error("Signal recipient is required");
}
Object.assign(params, targetParams);
const result = await signalRpcRequest<{ timestamp?: number }>("send", params, {
baseUrl,
@@ -147,3 +205,47 @@ export async function sendMessageSignal(
timestamp,
};
}
export async function sendTypingSignal(
to: string,
opts: SignalRpcOpts & { stop?: boolean } = {},
): Promise<boolean> {
const { baseUrl, account } = resolveSignalRpcContext(opts);
const targetParams = buildTargetParams(parseTarget(to), {
recipient: true,
group: true,
});
if (!targetParams) return false;
const params: Record<string, unknown> = { ...targetParams };
if (account) params.account = account;
if (opts.stop) params.stop = true;
await signalRpcRequest("sendTyping", params, {
baseUrl,
timeoutMs: opts.timeoutMs,
});
return true;
}
export async function sendReadReceiptSignal(
to: string,
targetTimestamp: number,
opts: SignalRpcOpts & { type?: SignalReceiptType } = {},
): Promise<boolean> {
if (!Number.isFinite(targetTimestamp) || targetTimestamp <= 0) return false;
const { baseUrl, account } = resolveSignalRpcContext(opts);
const targetParams = buildTargetParams(parseTarget(to), {
recipient: true,
});
if (!targetParams) return false;
const params: Record<string, unknown> = {
...targetParams,
targetTimestamp,
type: opts.type ?? "read",
};
if (account) params.account = account;
await signalRpcRequest("sendReceipt", params, {
baseUrl,
timeoutMs: opts.timeoutMs,
});
return true;
}