fix: bound signal/imessage transport readiness waits

Co-authored-by: Szpadel <1857251+Szpadel@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-01-16 20:33:04 +00:00
parent 0cd24137e8
commit aaa310c047
9 changed files with 172 additions and 26 deletions

View File

@@ -54,6 +54,10 @@ vi.mock("./client.js", () => ({
}),
}));
vi.mock("./probe.js", () => ({
probeIMessage: vi.fn(async () => ({ ok: true })),
}));
const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
async function waitForSubscribe() {

View File

@@ -54,6 +54,10 @@ vi.mock("./client.js", () => ({
}),
}));
vi.mock("./probe.js", () => ({
probeIMessage: vi.fn(async () => ({ ok: true })),
}));
const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
async function waitForSubscribe() {

View File

@@ -30,6 +30,7 @@ import {
} from "../../config/group-policy.js";
import { resolveStorePath, updateLastRoute } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { waitForTransportReady } from "../../infra/transport-ready.js";
import { mediaKindFromMime } from "../../media/constants.js";
import { buildPairingReply } from "../../pairing/pairing-messages.js";
import {
@@ -40,6 +41,7 @@ import { resolveAgentRoute } from "../../routing/resolve-route.js";
import { truncateUtf16Safe } from "../../utils.js";
import { resolveIMessageAccount } from "../accounts.js";
import { createIMessageRpcClient } from "../client.js";
import { probeIMessage } from "../probe.js";
import { sendMessageIMessage } from "../send.js";
import {
formatIMessageChatTarget,
@@ -76,6 +78,8 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
const dmPolicy = imessageCfg.dmPolicy ?? "pairing";
const includeAttachments = opts.includeAttachments ?? imessageCfg.includeAttachments ?? false;
const mediaMaxBytes = (opts.mediaMaxMb ?? imessageCfg.mediaMaxMb ?? 16) * 1024 * 1024;
const cliPath = opts.cliPath ?? imessageCfg.cliPath ?? "imsg";
const dbPath = opts.dbPath ?? imessageCfg.dbPath;
const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "imessage" });
const inboundDebouncer = createInboundDebouncer<{ message: IMessagePayload }>({
@@ -453,9 +457,26 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
await inboundDebouncer.enqueue({ message });
};
await waitForTransportReady({
label: "imsg rpc",
timeoutMs: 30_000,
logAfterMs: 10_000,
logIntervalMs: 10_000,
pollIntervalMs: 500,
abortSignal: opts.abortSignal,
runtime,
check: async () => {
const probe = await probeIMessage(2000, { cliPath, dbPath, runtime });
if (probe.ok) return { ok: true };
return { ok: false, error: probe.error ?? "unreachable" };
},
});
if (opts.abortSignal?.aborted) return;
const client = await createIMessageRpcClient({
cliPath: opts.cliPath ?? imessageCfg.cliPath,
dbPath: opts.dbPath ?? imessageCfg.dbPath,
cliPath,
dbPath,
runtime,
onNotification: (msg) => {
if (msg.method === "message") {

View File

@@ -1,5 +1,6 @@
import { detectBinary } from "../commands/onboard-helpers.js";
import { loadConfig } from "../config/config.js";
import type { RuntimeEnv } from "../runtime.js";
import { createIMessageRpcClient } from "./client.js";
export type IMessageProbe = {
@@ -7,10 +8,19 @@ export type IMessageProbe = {
error?: string | null;
};
export async function probeIMessage(timeoutMs = 2000): Promise<IMessageProbe> {
const cfg = loadConfig();
const cliPath = cfg.channels?.imessage?.cliPath?.trim() || "imsg";
const dbPath = cfg.channels?.imessage?.dbPath?.trim();
export type IMessageProbeOptions = {
cliPath?: string;
dbPath?: string;
runtime?: RuntimeEnv;
};
export async function probeIMessage(
timeoutMs = 2000,
opts: IMessageProbeOptions = {},
): Promise<IMessageProbe> {
const cfg = opts.cliPath || opts.dbPath ? undefined : loadConfig();
const cliPath = opts.cliPath?.trim() || cfg?.channels?.imessage?.cliPath?.trim() || "imsg";
const dbPath = opts.dbPath?.trim() || cfg?.channels?.imessage?.dbPath?.trim();
const detected = await detectBinary(cliPath);
if (!detected) {
return { ok: false, error: `imsg not found (${cliPath})` };
@@ -19,6 +29,7 @@ export async function probeIMessage(timeoutMs = 2000): Promise<IMessageProbe> {
const client = await createIMessageRpcClient({
cliPath,
dbPath,
runtime: opts.runtime,
});
try {
await client.request("chats.list", { limit: 1 }, { timeoutMs });

View File

@@ -0,0 +1,40 @@
import { describe, expect, it, vi } from "vitest";
import { waitForTransportReady } from "./transport-ready.js";
describe("waitForTransportReady", () => {
it("returns when the check succeeds and logs after the delay", async () => {
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() };
let attempts = 0;
await waitForTransportReady({
label: "test transport",
timeoutMs: 500,
logAfterMs: 100,
logIntervalMs: 100,
pollIntervalMs: 50,
runtime,
check: async () => {
attempts += 1;
if (attempts > 3) return { ok: true };
return { ok: false, error: "not ready" };
},
});
expect(runtime.error).toHaveBeenCalled();
});
it("throws after the timeout", async () => {
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() };
await expect(
waitForTransportReady({
label: "test transport",
timeoutMs: 200,
logAfterMs: 0,
logIntervalMs: 100,
pollIntervalMs: 50,
runtime,
check: async () => ({ ok: false, error: "still down" }),
}),
).rejects.toThrow("test transport not ready");
expect(runtime.error).toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,59 @@
import { danger } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
import { sleepWithAbort } from "./backoff.js";
export type TransportReadyResult = {
ok: boolean;
error?: string | null;
};
export type WaitForTransportReadyParams = {
label: string;
timeoutMs: number;
logAfterMs?: number;
logIntervalMs?: number;
pollIntervalMs?: number;
abortSignal?: AbortSignal;
runtime: RuntimeEnv;
check: () => Promise<TransportReadyResult>;
};
export async function waitForTransportReady(params: WaitForTransportReadyParams): Promise<void> {
const started = Date.now();
const timeoutMs = Math.max(0, params.timeoutMs);
const deadline = started + timeoutMs;
const logAfterMs = Math.max(0, params.logAfterMs ?? timeoutMs);
const logIntervalMs = Math.max(1_000, params.logIntervalMs ?? 30_000);
const pollIntervalMs = Math.max(50, params.pollIntervalMs ?? 150);
let nextLogAt = started + logAfterMs;
let lastError: string | null = null;
while (true) {
if (params.abortSignal?.aborted) return;
const res = await params.check();
if (res.ok) return;
lastError = res.error ?? null;
const now = Date.now();
if (now >= deadline) break;
if (now >= nextLogAt) {
const elapsedMs = now - started;
params.runtime.error?.(
danger(`${params.label} not ready after ${elapsedMs}ms (${lastError ?? "unknown error"})`),
);
nextLogAt = now + logIntervalMs;
}
try {
await sleepWithAbort(pollIntervalMs, params.abortSignal);
} catch (err) {
if (params.abortSignal?.aborted) return;
throw err;
}
}
params.runtime.error?.(
danger(`${params.label} not ready after ${timeoutMs}ms (${lastError ?? "unknown error"})`),
);
throw new Error(`${params.label} not ready (${lastError ?? "unknown error"})`);
}

View File

@@ -4,10 +4,10 @@ import type { ReplyPayload } from "../auto-reply/types.js";
import type { ClawdbotConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import type { SignalReactionNotificationMode } from "../config/types.js";
import { danger } from "../globals.js";
import { saveMediaBuffer } from "../media/store.js";
import type { RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { waitForTransportReady } from "../infra/transport-ready.js";
import { resolveSignalAccount } from "./accounts.js";
import { signalCheck, signalRpcRequest } from "./client.js";
import { spawnSignalDaemon } from "./daemon.js";
@@ -145,23 +145,27 @@ async function waitForSignalDaemonReady(params: {
baseUrl: string;
abortSignal?: AbortSignal;
timeoutMs: number;
logAfterMs: number;
logIntervalMs?: number;
runtime: RuntimeEnv;
}): Promise<void> {
const started = Date.now();
let lastError: string | null = null;
while (Date.now() - started < params.timeoutMs) {
if (params.abortSignal?.aborted) return;
const res = await signalCheck(params.baseUrl, 1000);
if (res.ok) return;
lastError = res.error ?? (res.status ? `HTTP ${res.status}` : "unreachable");
await new Promise((r) => setTimeout(r, 150));
}
params.runtime.error?.(
danger(`daemon not ready after ${params.timeoutMs}ms (${lastError ?? "unknown error"})`),
);
throw new Error(`signal daemon not ready (${lastError ?? "unknown error"})`);
await waitForTransportReady({
label: "signal daemon",
timeoutMs: params.timeoutMs,
logAfterMs: params.logAfterMs,
logIntervalMs: params.logIntervalMs,
pollIntervalMs: 150,
abortSignal: params.abortSignal,
runtime: params.runtime,
check: async () => {
const res = await signalCheck(params.baseUrl, 1000);
if (res.ok) return { ok: true };
return {
ok: false,
error: res.error ?? (res.status ? `HTTP ${res.status}` : "unreachable"),
};
},
});
}
async function fetchAttachment(params: {
@@ -305,7 +309,9 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
await waitForSignalDaemonReady({
baseUrl,
abortSignal: opts.abortSignal,
timeoutMs: 10_000,
timeoutMs: 30_000,
logAfterMs: 10_000,
logIntervalMs: 10_000,
runtime,
});
}