From aaa310c047ff6edbbb68d7663c6b790c431029a0 Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Fri, 16 Jan 2026 20:33:04 +0000
Subject: [PATCH] fix: bound signal/imessage transport readiness waits
Co-authored-by: Szpadel <1857251+Szpadel@users.noreply.github.com>
---
CHANGELOG.md | 5 +-
README.md | 2 +-
...essages-without-mention-by-default.test.ts | 4 ++
...last-route-chat-id-direct-messages.test.ts | 4 ++
src/imessage/monitor/monitor-provider.ts | 25 +++++++-
src/imessage/probe.ts | 19 ++++--
src/infra/transport-ready.test.ts | 40 +++++++++++++
src/infra/transport-ready.ts | 59 +++++++++++++++++++
src/signal/monitor.ts | 40 +++++++------
9 files changed, 172 insertions(+), 26 deletions(-)
create mode 100644 src/infra/transport-ready.test.ts
create mode 100644 src/infra/transport-ready.ts
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2cb464394..8429b5aba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -52,8 +52,9 @@
### Fixes
- WhatsApp: default response prefix only for self-chat, using identity name when set.
- - Auth: merge main auth profiles into per-agent stores for sub-agents and document inheritance. (#1013) — thanks @marcmarg.
- - Agents: avoid JSON Schema `format` collisions in tool params by renaming snapshot format fields. (#1013) — thanks @marcmarg.
+- Signal/iMessage: bound transport readiness waits to 30s with periodic logging. (#1014) — thanks @Szpadel.
+- Auth: merge main auth profiles into per-agent stores for sub-agents and document inheritance. (#1013) — thanks @marcmarg.
+- Agents: avoid JSON Schema `format` collisions in tool params by renaming snapshot format fields. (#1013) — thanks @marcmarg.
- Fix: make `clawdbot update` auto-update global installs when installed via a package manager.
- Fix: list model picker entries as provider/model pairs for explicit selection. (#970) — thanks @mcinteerj.
- Fix: align OpenAI image-gen defaults with DALL-E 3 standard quality and document output formats. (#880) — thanks @mkbehr.
diff --git a/README.md b/README.md
index b1f814ad8..7f1033c8b 100644
--- a/README.md
+++ b/README.md
@@ -493,5 +493,5 @@ Thanks to all clawtributors:
-
+
diff --git a/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts b/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts
index 7479c5b9a..5039212e1 100644
--- a/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts
+++ b/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts
@@ -54,6 +54,10 @@ vi.mock("./client.js", () => ({
}),
}));
+vi.mock("./probe.js", () => ({
+ probeIMessage: vi.fn(async () => ({ ok: true })),
+}));
+
const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
async function waitForSubscribe() {
diff --git a/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts b/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts
index 49e739f1f..37bb277dd 100644
--- a/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts
+++ b/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts
@@ -54,6 +54,10 @@ vi.mock("./client.js", () => ({
}),
}));
+vi.mock("./probe.js", () => ({
+ probeIMessage: vi.fn(async () => ({ ok: true })),
+}));
+
const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
async function waitForSubscribe() {
diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts
index 2918b5fc8..67e5d55e9 100644
--- a/src/imessage/monitor/monitor-provider.ts
+++ b/src/imessage/monitor/monitor-provider.ts
@@ -30,6 +30,7 @@ import {
} from "../../config/group-policy.js";
import { resolveStorePath, updateLastRoute } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
+import { waitForTransportReady } from "../../infra/transport-ready.js";
import { mediaKindFromMime } from "../../media/constants.js";
import { buildPairingReply } from "../../pairing/pairing-messages.js";
import {
@@ -40,6 +41,7 @@ import { resolveAgentRoute } from "../../routing/resolve-route.js";
import { truncateUtf16Safe } from "../../utils.js";
import { resolveIMessageAccount } from "../accounts.js";
import { createIMessageRpcClient } from "../client.js";
+import { probeIMessage } from "../probe.js";
import { sendMessageIMessage } from "../send.js";
import {
formatIMessageChatTarget,
@@ -76,6 +78,8 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
const dmPolicy = imessageCfg.dmPolicy ?? "pairing";
const includeAttachments = opts.includeAttachments ?? imessageCfg.includeAttachments ?? false;
const mediaMaxBytes = (opts.mediaMaxMb ?? imessageCfg.mediaMaxMb ?? 16) * 1024 * 1024;
+ const cliPath = opts.cliPath ?? imessageCfg.cliPath ?? "imsg";
+ const dbPath = opts.dbPath ?? imessageCfg.dbPath;
const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "imessage" });
const inboundDebouncer = createInboundDebouncer<{ message: IMessagePayload }>({
@@ -453,9 +457,26 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
await inboundDebouncer.enqueue({ message });
};
+ await waitForTransportReady({
+ label: "imsg rpc",
+ timeoutMs: 30_000,
+ logAfterMs: 10_000,
+ logIntervalMs: 10_000,
+ pollIntervalMs: 500,
+ abortSignal: opts.abortSignal,
+ runtime,
+ check: async () => {
+ const probe = await probeIMessage(2000, { cliPath, dbPath, runtime });
+ if (probe.ok) return { ok: true };
+ return { ok: false, error: probe.error ?? "unreachable" };
+ },
+ });
+
+ if (opts.abortSignal?.aborted) return;
+
const client = await createIMessageRpcClient({
- cliPath: opts.cliPath ?? imessageCfg.cliPath,
- dbPath: opts.dbPath ?? imessageCfg.dbPath,
+ cliPath,
+ dbPath,
runtime,
onNotification: (msg) => {
if (msg.method === "message") {
diff --git a/src/imessage/probe.ts b/src/imessage/probe.ts
index 13ac52f69..d59f20512 100644
--- a/src/imessage/probe.ts
+++ b/src/imessage/probe.ts
@@ -1,5 +1,6 @@
import { detectBinary } from "../commands/onboard-helpers.js";
import { loadConfig } from "../config/config.js";
+import type { RuntimeEnv } from "../runtime.js";
import { createIMessageRpcClient } from "./client.js";
export type IMessageProbe = {
@@ -7,10 +8,19 @@ export type IMessageProbe = {
error?: string | null;
};
-export async function probeIMessage(timeoutMs = 2000): Promise {
- const cfg = loadConfig();
- const cliPath = cfg.channels?.imessage?.cliPath?.trim() || "imsg";
- const dbPath = cfg.channels?.imessage?.dbPath?.trim();
+export type IMessageProbeOptions = {
+ cliPath?: string;
+ dbPath?: string;
+ runtime?: RuntimeEnv;
+};
+
+export async function probeIMessage(
+ timeoutMs = 2000,
+ opts: IMessageProbeOptions = {},
+): Promise {
+ const cfg = opts.cliPath || opts.dbPath ? undefined : loadConfig();
+ const cliPath = opts.cliPath?.trim() || cfg?.channels?.imessage?.cliPath?.trim() || "imsg";
+ const dbPath = opts.dbPath?.trim() || cfg?.channels?.imessage?.dbPath?.trim();
const detected = await detectBinary(cliPath);
if (!detected) {
return { ok: false, error: `imsg not found (${cliPath})` };
@@ -19,6 +29,7 @@ export async function probeIMessage(timeoutMs = 2000): Promise {
const client = await createIMessageRpcClient({
cliPath,
dbPath,
+ runtime: opts.runtime,
});
try {
await client.request("chats.list", { limit: 1 }, { timeoutMs });
diff --git a/src/infra/transport-ready.test.ts b/src/infra/transport-ready.test.ts
new file mode 100644
index 000000000..7a072c53a
--- /dev/null
+++ b/src/infra/transport-ready.test.ts
@@ -0,0 +1,40 @@
+import { describe, expect, it, vi } from "vitest";
+
+import { waitForTransportReady } from "./transport-ready.js";
+
+describe("waitForTransportReady", () => {
+ it("returns when the check succeeds and logs after the delay", async () => {
+ const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() };
+ let attempts = 0;
+ await waitForTransportReady({
+ label: "test transport",
+ timeoutMs: 500,
+ logAfterMs: 100,
+ logIntervalMs: 100,
+ pollIntervalMs: 50,
+ runtime,
+ check: async () => {
+ attempts += 1;
+ if (attempts > 3) return { ok: true };
+ return { ok: false, error: "not ready" };
+ },
+ });
+ expect(runtime.error).toHaveBeenCalled();
+ });
+
+ it("throws after the timeout", async () => {
+ const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() };
+ await expect(
+ waitForTransportReady({
+ label: "test transport",
+ timeoutMs: 200,
+ logAfterMs: 0,
+ logIntervalMs: 100,
+ pollIntervalMs: 50,
+ runtime,
+ check: async () => ({ ok: false, error: "still down" }),
+ }),
+ ).rejects.toThrow("test transport not ready");
+ expect(runtime.error).toHaveBeenCalled();
+ });
+});
diff --git a/src/infra/transport-ready.ts b/src/infra/transport-ready.ts
new file mode 100644
index 000000000..ec514d2e5
--- /dev/null
+++ b/src/infra/transport-ready.ts
@@ -0,0 +1,59 @@
+import { danger } from "../globals.js";
+import type { RuntimeEnv } from "../runtime.js";
+import { sleepWithAbort } from "./backoff.js";
+
+export type TransportReadyResult = {
+ ok: boolean;
+ error?: string | null;
+};
+
+export type WaitForTransportReadyParams = {
+ label: string;
+ timeoutMs: number;
+ logAfterMs?: number;
+ logIntervalMs?: number;
+ pollIntervalMs?: number;
+ abortSignal?: AbortSignal;
+ runtime: RuntimeEnv;
+ check: () => Promise;
+};
+
+export async function waitForTransportReady(params: WaitForTransportReadyParams): Promise {
+ const started = Date.now();
+ const timeoutMs = Math.max(0, params.timeoutMs);
+ const deadline = started + timeoutMs;
+ const logAfterMs = Math.max(0, params.logAfterMs ?? timeoutMs);
+ const logIntervalMs = Math.max(1_000, params.logIntervalMs ?? 30_000);
+ const pollIntervalMs = Math.max(50, params.pollIntervalMs ?? 150);
+ let nextLogAt = started + logAfterMs;
+ let lastError: string | null = null;
+
+ while (true) {
+ if (params.abortSignal?.aborted) return;
+ const res = await params.check();
+ if (res.ok) return;
+ lastError = res.error ?? null;
+
+ const now = Date.now();
+ if (now >= deadline) break;
+ if (now >= nextLogAt) {
+ const elapsedMs = now - started;
+ params.runtime.error?.(
+ danger(`${params.label} not ready after ${elapsedMs}ms (${lastError ?? "unknown error"})`),
+ );
+ nextLogAt = now + logIntervalMs;
+ }
+
+ try {
+ await sleepWithAbort(pollIntervalMs, params.abortSignal);
+ } catch (err) {
+ if (params.abortSignal?.aborted) return;
+ throw err;
+ }
+ }
+
+ params.runtime.error?.(
+ danger(`${params.label} not ready after ${timeoutMs}ms (${lastError ?? "unknown error"})`),
+ );
+ throw new Error(`${params.label} not ready (${lastError ?? "unknown error"})`);
+}
diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts
index 10e748674..98b950e9b 100644
--- a/src/signal/monitor.ts
+++ b/src/signal/monitor.ts
@@ -4,10 +4,10 @@ import type { ReplyPayload } from "../auto-reply/types.js";
import type { ClawdbotConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import type { SignalReactionNotificationMode } from "../config/types.js";
-import { danger } from "../globals.js";
import { saveMediaBuffer } from "../media/store.js";
import type { RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
+import { waitForTransportReady } from "../infra/transport-ready.js";
import { resolveSignalAccount } from "./accounts.js";
import { signalCheck, signalRpcRequest } from "./client.js";
import { spawnSignalDaemon } from "./daemon.js";
@@ -145,23 +145,27 @@ async function waitForSignalDaemonReady(params: {
baseUrl: string;
abortSignal?: AbortSignal;
timeoutMs: number;
+ logAfterMs: number;
+ logIntervalMs?: number;
runtime: RuntimeEnv;
}): Promise {
- const started = Date.now();
- let lastError: string | null = null;
-
- while (Date.now() - started < params.timeoutMs) {
- if (params.abortSignal?.aborted) return;
- const res = await signalCheck(params.baseUrl, 1000);
- if (res.ok) return;
- lastError = res.error ?? (res.status ? `HTTP ${res.status}` : "unreachable");
- await new Promise((r) => setTimeout(r, 150));
- }
-
- params.runtime.error?.(
- danger(`daemon not ready after ${params.timeoutMs}ms (${lastError ?? "unknown error"})`),
- );
- throw new Error(`signal daemon not ready (${lastError ?? "unknown error"})`);
+ await waitForTransportReady({
+ label: "signal daemon",
+ timeoutMs: params.timeoutMs,
+ logAfterMs: params.logAfterMs,
+ logIntervalMs: params.logIntervalMs,
+ pollIntervalMs: 150,
+ abortSignal: params.abortSignal,
+ runtime: params.runtime,
+ check: async () => {
+ const res = await signalCheck(params.baseUrl, 1000);
+ if (res.ok) return { ok: true };
+ return {
+ ok: false,
+ error: res.error ?? (res.status ? `HTTP ${res.status}` : "unreachable"),
+ };
+ },
+ });
}
async function fetchAttachment(params: {
@@ -305,7 +309,9 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
await waitForSignalDaemonReady({
baseUrl,
abortSignal: opts.abortSignal,
- timeoutMs: 10_000,
+ timeoutMs: 30_000,
+ logAfterMs: 10_000,
+ logIntervalMs: 10_000,
runtime,
});
}