diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cb464394..8429b5aba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,8 +52,9 @@ ### Fixes - WhatsApp: default response prefix only for self-chat, using identity name when set. - - Auth: merge main auth profiles into per-agent stores for sub-agents and document inheritance. (#1013) — thanks @marcmarg. - - Agents: avoid JSON Schema `format` collisions in tool params by renaming snapshot format fields. (#1013) — thanks @marcmarg. +- Signal/iMessage: bound transport readiness waits to 30s with periodic logging. (#1014) — thanks @Szpadel. +- Auth: merge main auth profiles into per-agent stores for sub-agents and document inheritance. (#1013) — thanks @marcmarg. +- Agents: avoid JSON Schema `format` collisions in tool params by renaming snapshot format fields. (#1013) — thanks @marcmarg. - Fix: make `clawdbot update` auto-update global installs when installed via a package manager. - Fix: list model picker entries as provider/model pairs for explicit selection. (#970) — thanks @mcinteerj. - Fix: align OpenAI image-gen defaults with DALL-E 3 standard quality and document output formats. (#880) — thanks @mkbehr. diff --git a/README.md b/README.md index b1f814ad8..7f1033c8b 100644 --- a/README.md +++ b/README.md @@ -493,5 +493,5 @@ Thanks to all clawtributors: MSch Mustafa Tag Eldeen ndraiman nexty5870 prathamdby reeltimeapps RLTCmpe Rolf Fredheim Rony Kelner Samrat Jha siraht snopoke The Admiral Ubuntu voidserf wstock YuriNachos Zach Knickerbocker Alphonse-arianee Azade carlulsoe ddyo Erik latitudeki5223 Manuel Maly Mourad Boustani pcty-nextgen-ios-builder Quentin Randy Torres ronak-guliani - thesash William Stock + thesash William Stock Szpadel

diff --git a/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts b/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts index 7479c5b9a..5039212e1 100644 --- a/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts +++ b/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts @@ -54,6 +54,10 @@ vi.mock("./client.js", () => ({ }), })); +vi.mock("./probe.js", () => ({ + probeIMessage: vi.fn(async () => ({ ok: true })), +})); + const flush = () => new Promise((resolve) => setTimeout(resolve, 0)); async function waitForSubscribe() { diff --git a/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts b/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts index 49e739f1f..37bb277dd 100644 --- a/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts +++ b/src/imessage/monitor.updates-last-route-chat-id-direct-messages.test.ts @@ -54,6 +54,10 @@ vi.mock("./client.js", () => ({ }), })); +vi.mock("./probe.js", () => ({ + probeIMessage: vi.fn(async () => ({ ok: true })), +})); + const flush = () => new Promise((resolve) => setTimeout(resolve, 0)); async function waitForSubscribe() { diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index 2918b5fc8..67e5d55e9 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -30,6 +30,7 @@ import { } from "../../config/group-policy.js"; import { resolveStorePath, updateLastRoute } from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; +import { waitForTransportReady } from "../../infra/transport-ready.js"; import { mediaKindFromMime } from "../../media/constants.js"; import { buildPairingReply } from "../../pairing/pairing-messages.js"; import { @@ -40,6 +41,7 @@ import { resolveAgentRoute } from "../../routing/resolve-route.js"; import { truncateUtf16Safe } from "../../utils.js"; import { resolveIMessageAccount } from "../accounts.js"; import { createIMessageRpcClient } from "../client.js"; +import { probeIMessage } from "../probe.js"; import { sendMessageIMessage } from "../send.js"; import { formatIMessageChatTarget, @@ -76,6 +78,8 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P const dmPolicy = imessageCfg.dmPolicy ?? "pairing"; const includeAttachments = opts.includeAttachments ?? imessageCfg.includeAttachments ?? false; const mediaMaxBytes = (opts.mediaMaxMb ?? imessageCfg.mediaMaxMb ?? 16) * 1024 * 1024; + const cliPath = opts.cliPath ?? imessageCfg.cliPath ?? "imsg"; + const dbPath = opts.dbPath ?? imessageCfg.dbPath; const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "imessage" }); const inboundDebouncer = createInboundDebouncer<{ message: IMessagePayload }>({ @@ -453,9 +457,26 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P await inboundDebouncer.enqueue({ message }); }; + await waitForTransportReady({ + label: "imsg rpc", + timeoutMs: 30_000, + logAfterMs: 10_000, + logIntervalMs: 10_000, + pollIntervalMs: 500, + abortSignal: opts.abortSignal, + runtime, + check: async () => { + const probe = await probeIMessage(2000, { cliPath, dbPath, runtime }); + if (probe.ok) return { ok: true }; + return { ok: false, error: probe.error ?? "unreachable" }; + }, + }); + + if (opts.abortSignal?.aborted) return; + const client = await createIMessageRpcClient({ - cliPath: opts.cliPath ?? imessageCfg.cliPath, - dbPath: opts.dbPath ?? imessageCfg.dbPath, + cliPath, + dbPath, runtime, onNotification: (msg) => { if (msg.method === "message") { diff --git a/src/imessage/probe.ts b/src/imessage/probe.ts index 13ac52f69..d59f20512 100644 --- a/src/imessage/probe.ts +++ b/src/imessage/probe.ts @@ -1,5 +1,6 @@ import { detectBinary } from "../commands/onboard-helpers.js"; import { loadConfig } from "../config/config.js"; +import type { RuntimeEnv } from "../runtime.js"; import { createIMessageRpcClient } from "./client.js"; export type IMessageProbe = { @@ -7,10 +8,19 @@ export type IMessageProbe = { error?: string | null; }; -export async function probeIMessage(timeoutMs = 2000): Promise { - const cfg = loadConfig(); - const cliPath = cfg.channels?.imessage?.cliPath?.trim() || "imsg"; - const dbPath = cfg.channels?.imessage?.dbPath?.trim(); +export type IMessageProbeOptions = { + cliPath?: string; + dbPath?: string; + runtime?: RuntimeEnv; +}; + +export async function probeIMessage( + timeoutMs = 2000, + opts: IMessageProbeOptions = {}, +): Promise { + const cfg = opts.cliPath || opts.dbPath ? undefined : loadConfig(); + const cliPath = opts.cliPath?.trim() || cfg?.channels?.imessage?.cliPath?.trim() || "imsg"; + const dbPath = opts.dbPath?.trim() || cfg?.channels?.imessage?.dbPath?.trim(); const detected = await detectBinary(cliPath); if (!detected) { return { ok: false, error: `imsg not found (${cliPath})` }; @@ -19,6 +29,7 @@ export async function probeIMessage(timeoutMs = 2000): Promise { const client = await createIMessageRpcClient({ cliPath, dbPath, + runtime: opts.runtime, }); try { await client.request("chats.list", { limit: 1 }, { timeoutMs }); diff --git a/src/infra/transport-ready.test.ts b/src/infra/transport-ready.test.ts new file mode 100644 index 000000000..7a072c53a --- /dev/null +++ b/src/infra/transport-ready.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it, vi } from "vitest"; + +import { waitForTransportReady } from "./transport-ready.js"; + +describe("waitForTransportReady", () => { + it("returns when the check succeeds and logs after the delay", async () => { + const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() }; + let attempts = 0; + await waitForTransportReady({ + label: "test transport", + timeoutMs: 500, + logAfterMs: 100, + logIntervalMs: 100, + pollIntervalMs: 50, + runtime, + check: async () => { + attempts += 1; + if (attempts > 3) return { ok: true }; + return { ok: false, error: "not ready" }; + }, + }); + expect(runtime.error).toHaveBeenCalled(); + }); + + it("throws after the timeout", async () => { + const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() }; + await expect( + waitForTransportReady({ + label: "test transport", + timeoutMs: 200, + logAfterMs: 0, + logIntervalMs: 100, + pollIntervalMs: 50, + runtime, + check: async () => ({ ok: false, error: "still down" }), + }), + ).rejects.toThrow("test transport not ready"); + expect(runtime.error).toHaveBeenCalled(); + }); +}); diff --git a/src/infra/transport-ready.ts b/src/infra/transport-ready.ts new file mode 100644 index 000000000..ec514d2e5 --- /dev/null +++ b/src/infra/transport-ready.ts @@ -0,0 +1,59 @@ +import { danger } from "../globals.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { sleepWithAbort } from "./backoff.js"; + +export type TransportReadyResult = { + ok: boolean; + error?: string | null; +}; + +export type WaitForTransportReadyParams = { + label: string; + timeoutMs: number; + logAfterMs?: number; + logIntervalMs?: number; + pollIntervalMs?: number; + abortSignal?: AbortSignal; + runtime: RuntimeEnv; + check: () => Promise; +}; + +export async function waitForTransportReady(params: WaitForTransportReadyParams): Promise { + const started = Date.now(); + const timeoutMs = Math.max(0, params.timeoutMs); + const deadline = started + timeoutMs; + const logAfterMs = Math.max(0, params.logAfterMs ?? timeoutMs); + const logIntervalMs = Math.max(1_000, params.logIntervalMs ?? 30_000); + const pollIntervalMs = Math.max(50, params.pollIntervalMs ?? 150); + let nextLogAt = started + logAfterMs; + let lastError: string | null = null; + + while (true) { + if (params.abortSignal?.aborted) return; + const res = await params.check(); + if (res.ok) return; + lastError = res.error ?? null; + + const now = Date.now(); + if (now >= deadline) break; + if (now >= nextLogAt) { + const elapsedMs = now - started; + params.runtime.error?.( + danger(`${params.label} not ready after ${elapsedMs}ms (${lastError ?? "unknown error"})`), + ); + nextLogAt = now + logIntervalMs; + } + + try { + await sleepWithAbort(pollIntervalMs, params.abortSignal); + } catch (err) { + if (params.abortSignal?.aborted) return; + throw err; + } + } + + params.runtime.error?.( + danger(`${params.label} not ready after ${timeoutMs}ms (${lastError ?? "unknown error"})`), + ); + throw new Error(`${params.label} not ready (${lastError ?? "unknown error"})`); +} diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index 10e748674..98b950e9b 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -4,10 +4,10 @@ import type { ReplyPayload } from "../auto-reply/types.js"; import type { ClawdbotConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import type { SignalReactionNotificationMode } from "../config/types.js"; -import { danger } from "../globals.js"; import { saveMediaBuffer } from "../media/store.js"; import type { RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; +import { waitForTransportReady } from "../infra/transport-ready.js"; import { resolveSignalAccount } from "./accounts.js"; import { signalCheck, signalRpcRequest } from "./client.js"; import { spawnSignalDaemon } from "./daemon.js"; @@ -145,23 +145,27 @@ async function waitForSignalDaemonReady(params: { baseUrl: string; abortSignal?: AbortSignal; timeoutMs: number; + logAfterMs: number; + logIntervalMs?: number; runtime: RuntimeEnv; }): Promise { - const started = Date.now(); - let lastError: string | null = null; - - while (Date.now() - started < params.timeoutMs) { - if (params.abortSignal?.aborted) return; - const res = await signalCheck(params.baseUrl, 1000); - if (res.ok) return; - lastError = res.error ?? (res.status ? `HTTP ${res.status}` : "unreachable"); - await new Promise((r) => setTimeout(r, 150)); - } - - params.runtime.error?.( - danger(`daemon not ready after ${params.timeoutMs}ms (${lastError ?? "unknown error"})`), - ); - throw new Error(`signal daemon not ready (${lastError ?? "unknown error"})`); + await waitForTransportReady({ + label: "signal daemon", + timeoutMs: params.timeoutMs, + logAfterMs: params.logAfterMs, + logIntervalMs: params.logIntervalMs, + pollIntervalMs: 150, + abortSignal: params.abortSignal, + runtime: params.runtime, + check: async () => { + const res = await signalCheck(params.baseUrl, 1000); + if (res.ok) return { ok: true }; + return { + ok: false, + error: res.error ?? (res.status ? `HTTP ${res.status}` : "unreachable"), + }; + }, + }); } async function fetchAttachment(params: { @@ -305,7 +309,9 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi await waitForSignalDaemonReady({ baseUrl, abortSignal: opts.abortSignal, - timeoutMs: 10_000, + timeoutMs: 30_000, + logAfterMs: 10_000, + logIntervalMs: 10_000, runtime, }); }