From c0976ec09996a7eb21a49edc85c2ef33428fcbb4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 2 Jan 2026 01:04:59 +0100 Subject: [PATCH] fix(gateway): stream chat events for agent runs --- CHANGELOG.md | 1 + src/auto-reply/reply.ts | 4 + src/commands/agent.ts | 61 ++++- src/cron/isolated-agent.ts | 53 ++++- src/gateway/server.test.ts | 74 +++++- src/gateway/server.ts | 422 +++++++++++++++++++++++++++------ src/infra/agent-events.test.ts | 17 +- src/infra/agent-events.ts | 32 +++ 8 files changed, 578 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b0897c57b..cfc2de014 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ ### Fixes - Chat UI: keep the chat scrolled to the latest message after switching sessions. +- WebChat: stream live updates for sessions even when runs start outside the chat UI. - Gateway CLI: read `CLAWDIS_GATEWAY_PASSWORD` from environment in `callGateway()` — allows `doctor`/`health` commands to auth without explicit `--password` flag. - Auto-reply: suppress stray `HEARTBEAT_OK` acks so they never get delivered as messages. - Discord: include recent guild context when replying to mentions and add `discord.historyLimit` to tune how many messages are captured. diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 112c2605c..737ff6e61 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -37,6 +37,7 @@ import { saveSessionStore, } from "../config/sessions.js"; import { logVerbose } from "../globals.js"; +import { registerAgentRunContext } from "../infra/agent-events.js"; import { buildProviderSummary } from "../infra/provider-summary.js"; import { triggerClawdisRestart } from "../infra/restart.js"; import { @@ -1196,6 +1197,9 @@ export async function getReplyFromConfig( await startTypingLoop(); } const runId = crypto.randomUUID(); + if (sessionKey) { + registerAgentRunContext(runId, { sessionKey }); + } let runResult: Awaited>; try { runResult = await runEmbeddedPiAgent({ diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 018d3ec42..335cc4a7d 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -36,7 +36,10 @@ import { type SessionEntry, saveSessionStore, } from "../config/sessions.js"; -import { emitAgentEvent } from "../infra/agent-events.js"; +import { + emitAgentEvent, + registerAgentRunContext, +} from "../infra/agent-events.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { resolveTelegramToken } from "../telegram/token.js"; import { normalizeE164 } from "../utils.js"; @@ -204,6 +207,10 @@ export async function agentCommand( } = sessionResolution; let sessionEntry = resolvedSessionEntry; + if (sessionKey) { + registerAgentRunContext(sessionId, { sessionKey }); + } + const resolvedThinkLevel = thinkOnce ?? thinkOverride ?? @@ -413,12 +420,15 @@ export async function agentCommand( const payloads = result.payloads ?? []; const deliver = opts.deliver === true; const bestEffortDeliver = opts.bestEffortDeliver === true; - const deliveryProvider = (opts.provider ?? "whatsapp").toLowerCase(); + const deliveryProviderRaw = (opts.provider ?? "whatsapp").toLowerCase(); + const deliveryProvider = + deliveryProviderRaw === "imsg" ? "imessage" : deliveryProviderRaw; const whatsappTarget = opts.to ? normalizeE164(opts.to) : allowFrom[0]; const telegramTarget = opts.to?.trim() || undefined; const discordTarget = opts.to?.trim() || undefined; const signalTarget = opts.to?.trim() || undefined; + const imessageTarget = opts.to?.trim() || undefined; const logDeliveryError = (err: unknown) => { const deliveryTarget = @@ -428,8 +438,10 @@ export async function agentCommand( ? whatsappTarget : deliveryProvider === "discord" ? discordTarget - : deliveryProvider === "signal" - ? signalTarget + : deliveryProvider === "signal" + ? signalTarget + : deliveryProvider === "imessage" + ? imessageTarget : undefined; const message = `Delivery failed (${deliveryProvider}${deliveryTarget ? ` to ${deliveryTarget}` : ""}): ${String(err)}`; runtime.error?.(message); @@ -463,6 +475,13 @@ export async function agentCommand( if (!bestEffortDeliver) throw err; logDeliveryError(err); } + if (deliveryProvider === "imessage" && !imessageTarget) { + const err = new Error( + "Delivering to iMessage requires --to ", + ); + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } if (deliveryProvider === "webchat") { const err = new Error( "Delivering to WebChat is not supported via `clawdis agent`; use WhatsApp/Telegram or run with --deliver=false.", @@ -475,6 +494,7 @@ export async function agentCommand( deliveryProvider !== "telegram" && deliveryProvider !== "discord" && deliveryProvider !== "signal" && + deliveryProvider !== "imessage" && deliveryProvider !== "webchat" ) { const err = new Error(`Unknown provider: ${deliveryProvider}`); @@ -621,5 +641,38 @@ export async function agentCommand( logDeliveryError(err); } } + + if (deliveryProvider === "imessage" && imessageTarget) { + try { + if (media.length === 0) { + for (const chunk of chunkText(text, 4000)) { + await deps.sendMessageIMessage(imessageTarget, chunk, { + maxBytes: cfg.imessage?.mediaMaxMb + ? cfg.imessage.mediaMaxMb * 1024 * 1024 + : cfg.agent?.mediaMaxMb + ? cfg.agent.mediaMaxMb * 1024 * 1024 + : undefined, + }); + } + } else { + let first = true; + for (const url of media) { + const caption = first ? text : ""; + first = false; + await deps.sendMessageIMessage(imessageTarget, caption, { + mediaUrl: url, + maxBytes: cfg.imessage?.mediaMaxMb + ? cfg.imessage.mediaMaxMb * 1024 * 1024 + : cfg.agent?.mediaMaxMb + ? cfg.agent.mediaMaxMb * 1024 * 1024 + : undefined, + }); + } + } + } catch (err) { + if (!bestEffortDeliver) throw err; + logDeliveryError(err); + } + } } } diff --git a/src/cron/isolated-agent.ts b/src/cron/isolated-agent.ts index 8e21f9c7d..bd74efc9a 100644 --- a/src/cron/isolated-agent.ts +++ b/src/cron/isolated-agent.ts @@ -24,6 +24,7 @@ import { type SessionEntry, saveSessionStore, } from "../config/sessions.js"; +import { registerAgentRunContext } from "../infra/agent-events.js"; import { resolveTelegramToken } from "../telegram/token.js"; import { normalizeE164 } from "../utils.js"; import type { CronJob } from "./types.js"; @@ -54,7 +55,13 @@ function pickSummaryFromPayloads( function resolveDeliveryTarget( cfg: ClawdisConfig, jobPayload: { - channel?: "last" | "whatsapp" | "telegram" | "discord" | "signal"; + channel?: + | "last" + | "whatsapp" + | "telegram" + | "discord" + | "signal" + | "imessage"; to?: string; }, ) { @@ -81,7 +88,8 @@ function resolveDeliveryTarget( requestedChannel === "whatsapp" || requestedChannel === "telegram" || requestedChannel === "discord" || - requestedChannel === "signal" + requestedChannel === "signal" || + requestedChannel === "imessage" ) { return requestedChannel; } @@ -244,6 +252,9 @@ export async function runCronIsolatedAgentTurn(params: { const sessionFile = resolveSessionTranscriptPath( cronSession.sessionEntry.sessionId, ); + registerAgentRunContext(cronSession.sessionEntry.sessionId, { + sessionKey: params.sessionKey, + }); runResult = await runEmbeddedPiAgent({ sessionId: cronSession.sessionEntry.sessionId, sessionKey: params.sessionKey, @@ -457,6 +468,44 @@ export async function runCronIsolatedAgentTurn(params: { return { status: "error", summary, error: String(err) }; return { status: "ok", summary }; } + } else if (resolvedDelivery.channel === "imessage") { + if (!resolvedDelivery.to) { + if (!bestEffortDeliver) + return { + status: "error", + summary, + error: "Cron delivery to iMessage requires a recipient.", + }; + return { + status: "skipped", + summary: "Delivery skipped (no iMessage recipient).", + }; + } + const to = resolvedDelivery.to; + try { + for (const payload of payloads) { + const mediaList = + payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); + if (mediaList.length === 0) { + for (const chunk of chunkText(payload.text ?? "", 4000)) { + await params.deps.sendMessageIMessage(to, chunk); + } + } else { + let first = true; + for (const url of mediaList) { + const caption = first ? (payload.text ?? "") : ""; + first = false; + await params.deps.sendMessageIMessage(to, caption, { + mediaUrl: url, + }); + } + } + } + } catch (err) { + if (!bestEffortDeliver) + return { status: "error", summary, error: String(err) }; + return { status: "ok", summary }; + } } } diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index f6ec100bc..0f9abb656 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -12,7 +12,11 @@ import { STATE_DIR_CLAWDIS, writeConfigFile, } from "../config/config.js"; -import { emitAgentEvent } from "../infra/agent-events.js"; +import { + emitAgentEvent, + registerAgentRunContext, + resetAgentRunContextForTest, +} from "../infra/agent-events.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js"; @@ -277,6 +281,7 @@ beforeEach(async () => { testCanvasHostPort = undefined; cronIsolatedRun.mockClear(); drainSystemEvents(); + resetAgentRunContextForTest(); __resetModelCatalogCacheForTest(); piSdkMock.enabled = false; piSdkMock.discoverCalls = 0; @@ -3621,6 +3626,73 @@ describe("gateway server", () => { await server.close(); }); + test("agent events stream to webchat clients when run context is registered", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws, { + client: { + name: "webchat", + version: "1.0.0", + platform: "test", + mode: "webchat", + }, + }); + + registerAgentRunContext("run-auto-1", { sessionKey: "main" }); + + const finalChatP = onceMessage<{ + type: "event"; + event: string; + payload?: unknown; + }>( + ws, + (o) => { + if (o.type !== "event" || o.event !== "chat") return false; + const payload = o.payload as { state?: unknown; runId?: unknown } | undefined; + return payload?.state === "final" && payload.runId === "run-auto-1"; + }, + 8000, + ); + + emitAgentEvent({ + runId: "run-auto-1", + stream: "assistant", + data: { text: "hi from agent" }, + }); + emitAgentEvent({ + runId: "run-auto-1", + stream: "job", + data: { state: "done" }, + }); + + const evt = await finalChatP; + const payload = + evt.payload && typeof evt.payload === "object" + ? (evt.payload as Record) + : {}; + expect(payload.sessionKey).toBe("main"); + expect(payload.runId).toBe("run-auto-1"); + + ws.close(); + await server.close(); + }); + test("bridge chat.abort cancels while saving the session store", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 9fd781f16..42481e960 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -74,8 +74,18 @@ import { sendMessageDiscord, } from "../discord/index.js"; import { type DiscordProbe, probeDiscord } from "../discord/probe.js"; +import { + monitorIMessageProvider, + sendMessageIMessage, +} from "../imessage/index.js"; +import { probeIMessage, type IMessageProbe } from "../imessage/probe.js"; import { isVerbose } from "../globals.js"; -import { onAgentEvent } from "../infra/agent-events.js"; +import { + clearAgentRunContext, + getAgentRunContext, + onAgentEvent, + registerAgentRunContext, +} from "../infra/agent-events.js"; import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js"; import { startNodeBridgeServer } from "../infra/bridge/server.js"; import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js"; @@ -287,11 +297,13 @@ const logWhatsApp = logProviders.child("whatsapp"); const logTelegram = logProviders.child("telegram"); const logDiscord = logProviders.child("discord"); const logSignal = logProviders.child("signal"); +const logIMessage = logProviders.child("imessage"); const canvasRuntime = runtimeForLogger(logCanvas); const whatsappRuntimeEnv = runtimeForLogger(logWhatsApp); const telegramRuntimeEnv = runtimeForLogger(logTelegram); const discordRuntimeEnv = runtimeForLogger(logDiscord); const signalRuntimeEnv = runtimeForLogger(logSignal); +const imessageRuntimeEnv = runtimeForLogger(logIMessage); function resolveBonjourCliPath(): string | undefined { const envPath = process.env.CLAWDIS_CLI_PATH?.trim(); @@ -1345,7 +1357,13 @@ export async function startGatewayServer( wakeMode: "now" | "next-heartbeat"; sessionKey: string; deliver: boolean; - channel: "last" | "whatsapp" | "telegram" | "discord" | "signal"; + channel: + | "last" + | "whatsapp" + | "telegram" + | "discord" + | "signal" + | "imessage"; to?: string; thinking?: string; timeoutSeconds?: number; @@ -1371,15 +1389,19 @@ export async function startGatewayServer( channelRaw === "telegram" || channelRaw === "discord" || channelRaw === "signal" || + channelRaw === "imessage" || channelRaw === "last" ? channelRaw + : channelRaw === "imsg" + ? "imessage" : channelRaw === undefined ? "last" : null; if (channel === null) { return { ok: false, - error: "channel must be last|whatsapp|telegram|discord|signal", + error: + "channel must be last|whatsapp|telegram|discord|signal|imessage", }; } const toRaw = payload.to; @@ -1430,7 +1452,13 @@ export async function startGatewayServer( wakeMode: "now" | "next-heartbeat"; sessionKey: string; deliver: boolean; - channel: "last" | "whatsapp" | "telegram" | "discord" | "signal"; + channel: + | "last" + | "whatsapp" + | "telegram" + | "discord" + | "signal" + | "imessage"; to?: string; thinking?: string; timeoutSeconds?: number; @@ -1714,10 +1742,12 @@ export async function startGatewayServer( let telegramAbort: AbortController | null = null; let discordAbort: AbortController | null = null; let signalAbort: AbortController | null = null; + let imessageAbort: AbortController | null = null; let whatsappTask: Promise | null = null; let telegramTask: Promise | null = null; let discordTask: Promise | null = null; let signalTask: Promise | null = null; + let imessageTask: Promise | null = null; let whatsappRuntime: WebProviderStatus = { running: false, connected: false, @@ -1765,12 +1795,27 @@ export async function startGatewayServer( lastError: null, baseUrl: null, }; + let imessageRuntime: { + running: boolean; + lastStartAt?: number | null; + lastStopAt?: number | null; + lastError?: string | null; + cliPath?: string | null; + dbPath?: string | null; + } = { + running: false, + lastStartAt: null, + lastStopAt: null, + lastError: null, + cliPath: null, + dbPath: null, + }; const clients = new Set(); let seq = 0; // Track per-run sequence to detect out-of-order/lost agent events. const agentRunSeq = new Map(); const dedupe = new Map(); - // Map agent sessionId -> pending chat runs for WebChat clients. + // Map agent runId -> pending chat runs for WebChat clients. const chatRunSessions = new Map< string, Array<{ sessionKey: string; clientRunId: string }> @@ -1812,6 +1857,21 @@ export async function startGatewayServer( if (!queue.length) chatRunSessions.delete(sessionId); return entry; }; + const resolveSessionKeyForRun = (runId: string) => { + const cached = getAgentRunContext(runId)?.sessionKey; + if (cached) return cached; + const cfg = loadConfig(); + const storePath = resolveStorePath(cfg.session?.store); + const store = loadSessionStore(storePath); + const found = Object.entries(store).find( + ([, entry]) => entry?.sessionId === runId, + ); + const sessionKey = found?.[0]; + if (sessionKey) { + registerAgentRunContext(runId, { sessionKey }); + } + return sessionKey; + }; const chatRunBuffers = new Map(); const chatDeltaSentAt = new Map(); const chatAbortControllers = new Map< @@ -2221,11 +2281,92 @@ export async function startGatewayServer( }; }; + const startIMessageProvider = async () => { + if (imessageTask) return; + const cfg = loadConfig(); + if (!cfg.imessage) { + imessageRuntime = { + ...imessageRuntime, + running: false, + lastError: "not configured", + }; + logIMessage.info("skipping provider start (imessage not configured)"); + return; + } + if (cfg.imessage?.enabled === false) { + imessageRuntime = { + ...imessageRuntime, + running: false, + lastError: "disabled", + }; + logIMessage.info("skipping provider start (imessage.enabled=false)"); + return; + } + const cliPath = cfg.imessage?.cliPath?.trim() || "imsg"; + const dbPath = cfg.imessage?.dbPath?.trim(); + logIMessage.info( + `starting provider (${cliPath}${dbPath ? ` db=${dbPath}` : ""})`, + ); + imessageAbort = new AbortController(); + imessageRuntime = { + ...imessageRuntime, + running: true, + lastStartAt: Date.now(), + lastError: null, + cliPath, + dbPath: dbPath ?? null, + }; + const task = monitorIMessageProvider({ + cliPath, + dbPath, + allowFrom: cfg.imessage?.allowFrom, + includeAttachments: cfg.imessage?.includeAttachments, + mediaMaxMb: cfg.imessage?.mediaMaxMb, + runtime: imessageRuntimeEnv, + abortSignal: imessageAbort.signal, + }) + .catch((err) => { + imessageRuntime = { + ...imessageRuntime, + lastError: formatError(err), + }; + logIMessage.error(`provider exited: ${formatError(err)}`); + }) + .finally(() => { + imessageAbort = null; + imessageTask = null; + imessageRuntime = { + ...imessageRuntime, + running: false, + lastStopAt: Date.now(), + }; + }); + imessageTask = task; + }; + + const stopIMessageProvider = async () => { + if (!imessageAbort && !imessageTask) return; + imessageAbort?.abort(); + try { + await imessageTask; + } catch { + // ignore + } + imessageAbort = null; + imessageTask = null; + imessageRuntime = { + ...imessageRuntime, + running: false, + lastStopAt: Date.now(), + }; + }; + const startProviders = async () => { await startWhatsAppProvider(); await startDiscordProvider(); await startTelegramProvider(); await startSignalProvider(); + await startIMessageProvider(); }; const broadcast = ( @@ -3269,7 +3410,8 @@ export async function startGatewayServer( const provider = channel === "whatsapp" || channel === "telegram" || - channel === "signal" + channel === "signal" || + channel === "imessage" ? channel : undefined; const to = @@ -3586,81 +3728,152 @@ export async function startGatewayServer( broadcast("agent", evt); const chatLink = peekChatRun(evt.runId); - if (chatLink) { - // Map agent bus events to chat events for WS WebChat clients. - // Use clientRunId so the webchat can correlate with its pending promise. - const { sessionKey, clientRunId } = chatLink; - bridgeSendToSession(sessionKey, "agent", evt); - if (evt.stream === "assistant" && typeof evt.data?.text === "string") { - const base = { - runId: clientRunId, - sessionKey, - seq: evt.seq, - }; - chatRunBuffers.set(clientRunId, evt.data.text); - const now = Date.now(); - const last = chatDeltaSentAt.get(clientRunId) ?? 0; - // Throttle UI delta events so slow clients don't accumulate unbounded buffers. - if (now - last >= 150) { - chatDeltaSentAt.set(clientRunId, now); - const payload = { - ...base, - state: "delta" as const, - message: { - role: "assistant", - content: [{ type: "text", text: evt.data.text }], - timestamp: now, - }, + const sessionKey = + chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId); + const jobState = + evt.stream === "job" && typeof evt.data?.state === "string" + ? evt.data.state + : null; + + if (sessionKey) { + if (chatLink) { + // Map agent bus events to chat events for WS WebChat clients. + // Use clientRunId so the webchat can correlate with its pending promise. + const { clientRunId } = chatLink; + bridgeSendToSession(sessionKey, "agent", evt); + if (evt.stream === "assistant" && typeof evt.data?.text === "string") { + const base = { + runId: clientRunId, + sessionKey, + seq: evt.seq, }; - broadcast("chat", payload, { dropIfSlow: true }); - bridgeSendToSession(sessionKey, "chat", payload); + chatRunBuffers.set(clientRunId, evt.data.text); + const now = Date.now(); + const last = chatDeltaSentAt.get(clientRunId) ?? 0; + // Throttle UI delta events so slow clients don't accumulate unbounded buffers. + if (now - last >= 150) { + chatDeltaSentAt.set(clientRunId, now); + const payload = { + ...base, + state: "delta" as const, + message: { + role: "assistant", + content: [{ type: "text", text: evt.data.text }], + timestamp: now, + }, + }; + broadcast("chat", payload, { dropIfSlow: true }); + bridgeSendToSession(sessionKey, "chat", payload); + } + } else if (jobState === "done" || jobState === "error") { + const finished = shiftChatRun(evt.runId); + if (!finished) { + if (jobState) clearAgentRunContext(evt.runId); + return; + } + const { sessionKey: finishedSessionKey, clientRunId: finishedRunId } = + finished; + const base = { + runId: finishedRunId, + sessionKey: finishedSessionKey, + seq: evt.seq, + }; + const text = chatRunBuffers.get(finishedRunId)?.trim() ?? ""; + chatRunBuffers.delete(finishedRunId); + chatDeltaSentAt.delete(finishedRunId); + if (jobState === "done") { + const payload = { + ...base, + state: "final", + message: text + ? { + role: "assistant", + content: [{ type: "text", text }], + timestamp: Date.now(), + } + : undefined, + }; + broadcast("chat", payload); + bridgeSendToSession(finishedSessionKey, "chat", payload); + } else { + const payload = { + ...base, + state: "error", + errorMessage: evt.data.error + ? formatForLog(evt.data.error) + : undefined, + }; + broadcast("chat", payload); + bridgeSendToSession(finishedSessionKey, "chat", payload); + } } - } else if ( - evt.stream === "job" && - typeof evt.data?.state === "string" && - (evt.data.state === "done" || evt.data.state === "error") - ) { - const finished = shiftChatRun(evt.runId); - if (!finished) { - return; - } - const { sessionKey: finishedSessionKey, clientRunId: finishedRunId } = - finished; - const base = { - runId: finishedRunId, - sessionKey: finishedSessionKey, - seq: evt.seq, - }; - const text = chatRunBuffers.get(finishedRunId)?.trim() ?? ""; - chatRunBuffers.delete(finishedRunId); - chatDeltaSentAt.delete(finishedRunId); - if (evt.data.state === "done") { - const payload = { - ...base, - state: "final", - message: text - ? { - role: "assistant", - content: [{ type: "text", text }], - timestamp: Date.now(), - } - : undefined, + } else { + const clientRunId = evt.runId; + bridgeSendToSession(sessionKey, "agent", evt); + if (evt.stream === "assistant" && typeof evt.data?.text === "string") { + const base = { + runId: clientRunId, + sessionKey, + seq: evt.seq, }; - broadcast("chat", payload); - bridgeSendToSession(finishedSessionKey, "chat", payload); - } else { - const payload = { - ...base, - state: "error", - errorMessage: evt.data.error - ? formatForLog(evt.data.error) - : undefined, + chatRunBuffers.set(clientRunId, evt.data.text); + const now = Date.now(); + const last = chatDeltaSentAt.get(clientRunId) ?? 0; + if (now - last >= 150) { + chatDeltaSentAt.set(clientRunId, now); + const payload = { + ...base, + state: "delta" as const, + message: { + role: "assistant", + content: [{ type: "text", text: evt.data.text }], + timestamp: now, + }, + }; + broadcast("chat", payload, { dropIfSlow: true }); + bridgeSendToSession(sessionKey, "chat", payload); + } + } else if (jobState === "done" || jobState === "error") { + const base = { + runId: clientRunId, + sessionKey, + seq: evt.seq, }; - broadcast("chat", payload); - bridgeSendToSession(finishedSessionKey, "chat", payload); + const text = chatRunBuffers.get(clientRunId)?.trim() ?? ""; + chatRunBuffers.delete(clientRunId); + chatDeltaSentAt.delete(clientRunId); + if (jobState === "done") { + const payload = { + ...base, + state: "final", + message: text + ? { + role: "assistant", + content: [{ type: "text", text }], + timestamp: Date.now(), + } + : undefined, + }; + broadcast("chat", payload); + bridgeSendToSession(sessionKey, "chat", payload); + } else { + const payload = { + ...base, + state: "error", + errorMessage: evt.data.error + ? formatForLog(evt.data.error) + : undefined, + }; + broadcast("chat", payload); + bridgeSendToSession(sessionKey, "chat", payload); + } } } } + + if (jobState === "done" || jobState === "error") { + clearAgentRunContext(evt.runId); + } }); const heartbeatUnsub = onHeartbeatEvent((evt) => { @@ -4116,6 +4329,16 @@ export async function startGatewayServer( signalLastProbeAt = Date.now(); } + const imessageCfg = cfg.imessage; + const imessageEnabled = imessageCfg?.enabled !== false; + const imessageConfigured = Boolean(imessageCfg) && imessageEnabled; + let imessageProbe: IMessageProbe | undefined; + let imessageLastProbeAt: number | null = null; + if (probe && imessageConfigured) { + imessageProbe = await probeIMessage(timeoutMs); + imessageLastProbeAt = Date.now(); + } + const linked = await webAuthExists(); const authAgeMs = getWebAuthAgeMs(); const self = readWebSelfId(); @@ -4169,6 +4392,17 @@ export async function startGatewayServer( probe: signalProbe, lastProbeAt: signalLastProbeAt, }, + imessage: { + configured: imessageConfigured, + running: imessageRuntime.running, + lastStartAt: imessageRuntime.lastStartAt ?? null, + lastStopAt: imessageRuntime.lastStopAt ?? null, + lastError: imessageRuntime.lastError ?? null, + cliPath: imessageRuntime.cliPath ?? null, + dbPath: imessageRuntime.dbPath ?? null, + probe: imessageProbe, + lastProbeAt: imessageLastProbeAt, + }, }, undefined, ); @@ -6022,7 +6256,9 @@ export async function startGatewayServer( } const to = params.to.trim(); const message = params.message.trim(); - const provider = (params.provider ?? "whatsapp").toLowerCase(); + const providerRaw = (params.provider ?? "whatsapp").toLowerCase(); + const provider = + providerRaw === "imsg" ? "imessage" : providerRaw; try { if (provider === "telegram") { const cfg = loadConfig(); @@ -6083,6 +6319,27 @@ export async function startGatewayServer( payload, }); respond(true, payload, undefined, { provider }); + } else if (provider === "imessage") { + const cfg = loadConfig(); + const result = await sendMessageIMessage(to, message, { + mediaUrl: params.mediaUrl, + cliPath: cfg.imessage?.cliPath, + dbPath: cfg.imessage?.dbPath, + maxBytes: cfg.imessage?.mediaMaxMb + ? cfg.imessage.mediaMaxMb * 1024 * 1024 + : undefined, + }); + const payload = { + runId: idem, + messageId: result.messageId, + provider, + }; + dedupe.set(`send:${idem}`, { + ts: Date.now(), + ok: true, + payload, + }); + respond(true, payload, undefined, { provider }); } else { const result = await sendMessageWhatsApp(to, message, { mediaUrl: params.mediaUrl, @@ -6197,9 +6454,13 @@ export async function startGatewayServer( const requestedChannelRaw = typeof params.channel === "string" ? params.channel.trim() : ""; - const requestedChannel = requestedChannelRaw + const requestedChannelNormalized = requestedChannelRaw ? requestedChannelRaw.toLowerCase() : "last"; + const requestedChannel = + requestedChannelNormalized === "imsg" + ? "imessage" + : requestedChannelNormalized; const lastChannel = sessionEntry?.lastChannel; const lastTo = @@ -6220,6 +6481,7 @@ export async function startGatewayServer( requestedChannel === "telegram" || requestedChannel === "discord" || requestedChannel === "signal" || + requestedChannel === "imessage" || requestedChannel === "webchat" ) { return requestedChannel; @@ -6239,7 +6501,8 @@ export async function startGatewayServer( resolvedChannel === "whatsapp" || resolvedChannel === "telegram" || resolvedChannel === "discord" || - resolvedChannel === "signal" + resolvedChannel === "signal" || + resolvedChannel === "imessage" ) { return lastTo || undefined; } @@ -6485,6 +6748,7 @@ export async function startGatewayServer( await stopTelegramProvider(); await stopDiscordProvider(); await stopSignalProvider(); + await stopIMessageProvider(); cron.stop(); heartbeatRunner.stop(); broadcast("shutdown", { @@ -6522,7 +6786,9 @@ export async function startGatewayServer( await stopBrowserControlServerIfStarted().catch(() => {}); } await Promise.allSettled( - [whatsappTask, telegramTask, signalTask].filter(Boolean) as Array< + [whatsappTask, telegramTask, signalTask, imessageTask].filter( + Boolean, + ) as Array< Promise >, ); diff --git a/src/infra/agent-events.test.ts b/src/infra/agent-events.test.ts index 50c432c98..b35433d75 100644 --- a/src/infra/agent-events.test.ts +++ b/src/infra/agent-events.test.ts @@ -1,7 +1,22 @@ import { describe, expect, test } from "vitest"; -import { emitAgentEvent, onAgentEvent } from "./agent-events.js"; +import { + emitAgentEvent, + onAgentEvent, + registerAgentRunContext, + getAgentRunContext, + clearAgentRunContext, + resetAgentRunContextForTest, +} from "./agent-events.js"; describe("agent-events sequencing", () => { + test("stores and clears run context", async () => { + resetAgentRunContextForTest(); + registerAgentRunContext("run-1", { sessionKey: "main" }); + expect(getAgentRunContext("run-1")?.sessionKey).toBe("main"); + clearAgentRunContext("run-1"); + expect(getAgentRunContext("run-1")).toBeUndefined(); + }); + test("maintains monotonic seq per runId", async () => { const seen: Record = {}; const stop = onAgentEvent((evt) => { diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 4c581b46d..136d9491c 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -13,9 +13,41 @@ export type AgentEventPayload = { data: Record; }; +export type AgentRunContext = { + sessionKey?: string; +}; + // Keep per-run counters so streams stay strictly monotonic per runId. const seqByRun = new Map(); const listeners = new Set<(evt: AgentEventPayload) => void>(); +const runContextById = new Map(); + +export function registerAgentRunContext( + runId: string, + context: AgentRunContext, +) { + if (!runId) return; + const existing = runContextById.get(runId); + if (!existing) { + runContextById.set(runId, { ...context }); + return; + } + if (context.sessionKey && existing.sessionKey !== context.sessionKey) { + existing.sessionKey = context.sessionKey; + } +} + +export function getAgentRunContext(runId: string) { + return runContextById.get(runId); +} + +export function clearAgentRunContext(runId: string) { + runContextById.delete(runId); +} + +export function resetAgentRunContextForTest() { + runContextById.clear(); +} export function emitAgentEvent(event: Omit) { const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1;