diff --git a/CHANGELOG.md b/CHANGELOG.md index c7aaac5a7..5794c2deb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Fixes - Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj. +- Cron: `wakeMode: "now"` waits for heartbeat completion (and retries when the main lane is busy). (#666) — thanks @roshanasingh4. - Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”). - Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage. - Auth: throttle external CLI credential syncs (Claude/Codex), reduce Keychain reads, and skip sync when cached credentials are still fresh. @@ -32,6 +33,7 @@ - Tests/Agents: add regression coverage for workspace tool path resolution and bash cwd defaults. - iOS/Android: enable stricter concurrency/lint checks; fix Swift 6 strict concurrency issues + Android lint errors (ExifInterface, obsolete SDK check). (#662) — thanks @KristijanJovanovski. - iOS/macOS: share `AsyncTimeout`, require explicit `bridgeStableID` on connect, and harden tool display defaults (avoids missing-resource label fallbacks). +- Telegram: serialize media-group processing to avoid missed albums under load. - Docs: showcase entries for ParentPay, R2 Upload, iOS TestFlight, and Oura Health. (#650) — thanks @henrino3. ## 2026.1.9 diff --git a/apps/macos/Sources/Clawdbot/GatewayEnvironment.swift b/apps/macos/Sources/Clawdbot/GatewayEnvironment.swift index 5edcfc564..b9e6e9e97 100644 --- a/apps/macos/Sources/Clawdbot/GatewayEnvironment.swift +++ b/apps/macos/Sources/Clawdbot/GatewayEnvironment.swift @@ -387,15 +387,15 @@ enum GatewayEnvironment { private static func bundledGatewayStatusMessage( gatewayVersion: String, - nodeVersion: String? - ) -> String { + nodeVersion: String?) -> String + { "\(self.bundledGatewayLabel) \(gatewayVersion) (node \(nodeVersion ?? "unknown"))" } private static func bundledGatewayIncompatibleMessage( installed: Semver, - expected: Semver - ) -> String { + expected: Semver) -> String + { "\(self.bundledGatewayLabel) \(installed.description) is incompatible with app " + "\(expected.description); rebuild the app bundle." } diff --git a/src/cron/service.test.ts b/src/cron/service.test.ts index 34762abd8..03364e9cd 100644 --- a/src/cron/service.test.ts +++ b/src/cron/service.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; import { CronService } from "./service.js"; const noopLogger = { @@ -78,6 +79,68 @@ describe("CronService", () => { await store.cleanup(); }); + it("wakeMode now waits for heartbeat completion when available", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + let now = 0; + const nowMs = () => { + now += 10; + return now; + }; + + let resolveHeartbeat: ((res: HeartbeatRunResult) => void) | null = null; + const runHeartbeatOnce = vi.fn( + async () => + await new Promise((resolve) => { + resolveHeartbeat = resolve; + }), + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + nowMs, + enqueueSystemEvent, + requestHeartbeatNow, + runHeartbeatOnce, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + const job = await cron.add({ + name: "wakeMode now waits", + enabled: true, + schedule: { kind: "at", atMs: 1 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "hello" }, + }); + + const runPromise = cron.run(job.id, "force"); + for (let i = 0; i < 10; i++) { + if (runHeartbeatOnce.mock.calls.length > 0) break; + // Let the locked() chain progress. + await Promise.resolve(); + } + + expect(runHeartbeatOnce).toHaveBeenCalledTimes(1); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); + expect(enqueueSystemEvent).toHaveBeenCalledWith("hello"); + expect(job.state.runningAtMs).toBeTypeOf("number"); + + resolveHeartbeat?.({ status: "ran", durationMs: 123 }); + await runPromise; + + expect(job.state.lastStatus).toBe("ok"); + expect(job.state.lastDurationMs).toBeGreaterThan(0); + + cron.stop(); + await store.cleanup(); + }); + it("runs an isolated job and posts summary to main", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); diff --git a/src/cron/service.ts b/src/cron/service.ts index 87046c62e..870e56675 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -1,5 +1,6 @@ import crypto from "node:crypto"; +import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; import { truncateUtf16Safe } from "../utils.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; import { computeNextRunAtMs } from "./schedule.js"; @@ -37,6 +38,9 @@ export type CronServiceDeps = { cronEnabled: boolean; enqueueSystemEvent: (text: string) => void; requestHeartbeatNow: (opts?: { reason?: string }) => void; + runHeartbeatOnce?: (opts?: { + reason?: string; + }) => Promise; runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{ status: "ok" | "error" | "skipped"; summary?: string; @@ -45,6 +49,10 @@ export type CronServiceDeps = { onEvent?: (evt: CronEvent) => void; }; +type CronServiceDepsInternal = Omit & { + nowMs: () => number; +}; + const STUCK_RUN_MS = 2 * 60 * 60 * 1000; const MAX_TIMEOUT_MS = 2 ** 31 - 1; @@ -99,8 +107,7 @@ function normalizePayloadToSystemText(payload: CronPayload) { } export class CronService { - private readonly deps: Required> & - Pick; + private readonly deps: CronServiceDepsInternal; private store: CronStoreFile | null = null; private timer: NodeJS.Timeout | null = null; private running = false; @@ -111,7 +118,6 @@ export class CronService { this.deps = { ...deps, nowMs: deps.nowMs ?? (() => Date.now()), - onEvent: deps.onEvent, }; } @@ -514,10 +520,42 @@ export class CronService { return; } this.deps.enqueueSystemEvent(text); - if (job.wakeMode === "now") { + if (job.wakeMode === "now" && this.deps.runHeartbeatOnce) { + const reason = `cron:${job.id}`; + const delay = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + const maxWaitMs = 2 * 60_000; + const waitStartedAt = this.deps.nowMs(); + + let heartbeatResult: HeartbeatRunResult; + for (;;) { + heartbeatResult = await this.deps.runHeartbeatOnce({ reason }); + if ( + heartbeatResult.status !== "skipped" || + heartbeatResult.reason !== "requests-in-flight" + ) { + break; + } + if (this.deps.nowMs() - waitStartedAt > maxWaitMs) { + heartbeatResult = { + status: "skipped", + reason: "timeout waiting for main lane to become idle", + }; + break; + } + await delay(250); + } + + if (heartbeatResult.status === "ran") { + await finish("ok", undefined, text); + } else if (heartbeatResult.status === "skipped") + await finish("skipped", heartbeatResult.reason, text); + else await finish("error", heartbeatResult.reason, text); + } else { + // wakeMode is "next-heartbeat" or runHeartbeatOnce not available this.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); + await finish("ok", undefined, text); } - await finish("ok", undefined, text); return; } diff --git a/src/gateway/server.models-voicewake.test.ts b/src/gateway/server.models-voicewake.test.ts index 048f90c18..2c034cce2 100644 --- a/src/gateway/server.models-voicewake.test.ts +++ b/src/gateway/server.models-voicewake.test.ts @@ -19,10 +19,12 @@ installGatewayTestHooks(); describe("gateway server models + voicewake", () => { const setTempHome = (homeDir: string) => { const prevHome = process.env.HOME; + const prevStateDir = process.env.CLAWDBOT_STATE_DIR; const prevUserProfile = process.env.USERPROFILE; const prevHomeDrive = process.env.HOMEDRIVE; const prevHomePath = process.env.HOMEPATH; process.env.HOME = homeDir; + process.env.CLAWDBOT_STATE_DIR = path.join(homeDir, ".clawdbot"); process.env.USERPROFILE = homeDir; if (process.platform === "win32") { const parsed = path.parse(homeDir); @@ -35,6 +37,11 @@ describe("gateway server models + voicewake", () => { } else { process.env.HOME = prevHome; } + if (prevStateDir === undefined) { + delete process.env.CLAWDBOT_STATE_DIR; + } else { + process.env.CLAWDBOT_STATE_DIR = prevStateDir; + } if (prevUserProfile === undefined) { delete process.env.USERPROFILE; } else { diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 805ae7ffe..a56f07605 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -61,7 +61,10 @@ import { startNodeBridgeServer } from "../infra/bridge/server.js"; import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; import { onHeartbeatEvent } from "../infra/heartbeat-events.js"; -import { startHeartbeatRunner } from "../infra/heartbeat-runner.js"; +import { + runHeartbeatOnce, + startHeartbeatRunner, +} from "../infra/heartbeat-runner.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { getMachineDisplayName } from "../infra/machine-name.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js"; @@ -715,6 +718,14 @@ export async function startGatewayServer( enqueueSystemEvent(text, { sessionKey: resolveMainSessionKey(cfg) }); }, requestHeartbeatNow, + runHeartbeatOnce: async (opts) => { + const runtimeConfig = loadConfig(); + return await runHeartbeatOnce({ + cfg: runtimeConfig, + reason: opts?.reason, + deps: { ...deps, runtime: defaultRuntime }, + }); + }, runIsolatedAgentJob: async ({ job, message }) => { const runtimeConfig = loadConfig(); return await runCronIsolatedAgentTurn({ diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 306a3e87d..f10a42a27 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -238,6 +238,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { }; const mediaGroupBuffer = new Map(); + let mediaGroupProcessing: Promise = Promise.resolve(); const cfg = opts.config ?? loadConfig(); const account = resolveTelegramAccount({ @@ -1228,14 +1229,24 @@ export function createTelegramBot(opts: TelegramBotOptions) { existing.messages.push({ msg, ctx }); existing.timer = setTimeout(async () => { mediaGroupBuffer.delete(mediaGroupId); - await processMediaGroup(existing); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(existing); + }) + .catch(() => undefined); + await mediaGroupProcessing; }, MEDIA_GROUP_TIMEOUT_MS); } else { const entry: MediaGroupEntry = { messages: [{ msg, ctx }], timer: setTimeout(async () => { mediaGroupBuffer.delete(mediaGroupId); - await processMediaGroup(entry); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(entry); + }) + .catch(() => undefined); + await mediaGroupProcessing; }, MEDIA_GROUP_TIMEOUT_MS), }; mediaGroupBuffer.set(mediaGroupId, entry);