fix(gateway): stream chat events for agent runs

This commit is contained in:
Peter Steinberger
2026-01-02 01:04:59 +01:00
parent 7f3113b8d4
commit c0976ec099
8 changed files with 578 additions and 86 deletions

View File

@@ -32,6 +32,7 @@
### Fixes ### Fixes
- Chat UI: keep the chat scrolled to the latest message after switching sessions. - Chat UI: keep the chat scrolled to the latest message after switching sessions.
- WebChat: stream live updates for sessions even when runs start outside the chat UI.
- Gateway CLI: read `CLAWDIS_GATEWAY_PASSWORD` from environment in `callGateway()` — allows `doctor`/`health` commands to auth without explicit `--password` flag. - Gateway CLI: read `CLAWDIS_GATEWAY_PASSWORD` from environment in `callGateway()` — allows `doctor`/`health` commands to auth without explicit `--password` flag.
- Auto-reply: suppress stray `HEARTBEAT_OK` acks so they never get delivered as messages. - Auto-reply: suppress stray `HEARTBEAT_OK` acks so they never get delivered as messages.
- Discord: include recent guild context when replying to mentions and add `discord.historyLimit` to tune how many messages are captured. - Discord: include recent guild context when replying to mentions and add `discord.historyLimit` to tune how many messages are captured.

View File

@@ -37,6 +37,7 @@ import {
saveSessionStore, saveSessionStore,
} from "../config/sessions.js"; } from "../config/sessions.js";
import { logVerbose } from "../globals.js"; import { logVerbose } from "../globals.js";
import { registerAgentRunContext } from "../infra/agent-events.js";
import { buildProviderSummary } from "../infra/provider-summary.js"; import { buildProviderSummary } from "../infra/provider-summary.js";
import { triggerClawdisRestart } from "../infra/restart.js"; import { triggerClawdisRestart } from "../infra/restart.js";
import { import {
@@ -1196,6 +1197,9 @@ export async function getReplyFromConfig(
await startTypingLoop(); await startTypingLoop();
} }
const runId = crypto.randomUUID(); const runId = crypto.randomUUID();
if (sessionKey) {
registerAgentRunContext(runId, { sessionKey });
}
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>; let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
try { try {
runResult = await runEmbeddedPiAgent({ runResult = await runEmbeddedPiAgent({

View File

@@ -36,7 +36,10 @@ import {
type SessionEntry, type SessionEntry,
saveSessionStore, saveSessionStore,
} from "../config/sessions.js"; } from "../config/sessions.js";
import { emitAgentEvent } from "../infra/agent-events.js"; import {
emitAgentEvent,
registerAgentRunContext,
} from "../infra/agent-events.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { resolveTelegramToken } from "../telegram/token.js"; import { resolveTelegramToken } from "../telegram/token.js";
import { normalizeE164 } from "../utils.js"; import { normalizeE164 } from "../utils.js";
@@ -204,6 +207,10 @@ export async function agentCommand(
} = sessionResolution; } = sessionResolution;
let sessionEntry = resolvedSessionEntry; let sessionEntry = resolvedSessionEntry;
if (sessionKey) {
registerAgentRunContext(sessionId, { sessionKey });
}
const resolvedThinkLevel = const resolvedThinkLevel =
thinkOnce ?? thinkOnce ??
thinkOverride ?? thinkOverride ??
@@ -413,12 +420,15 @@ export async function agentCommand(
const payloads = result.payloads ?? []; const payloads = result.payloads ?? [];
const deliver = opts.deliver === true; const deliver = opts.deliver === true;
const bestEffortDeliver = opts.bestEffortDeliver === true; const bestEffortDeliver = opts.bestEffortDeliver === true;
const deliveryProvider = (opts.provider ?? "whatsapp").toLowerCase(); const deliveryProviderRaw = (opts.provider ?? "whatsapp").toLowerCase();
const deliveryProvider =
deliveryProviderRaw === "imsg" ? "imessage" : deliveryProviderRaw;
const whatsappTarget = opts.to ? normalizeE164(opts.to) : allowFrom[0]; const whatsappTarget = opts.to ? normalizeE164(opts.to) : allowFrom[0];
const telegramTarget = opts.to?.trim() || undefined; const telegramTarget = opts.to?.trim() || undefined;
const discordTarget = opts.to?.trim() || undefined; const discordTarget = opts.to?.trim() || undefined;
const signalTarget = opts.to?.trim() || undefined; const signalTarget = opts.to?.trim() || undefined;
const imessageTarget = opts.to?.trim() || undefined;
const logDeliveryError = (err: unknown) => { const logDeliveryError = (err: unknown) => {
const deliveryTarget = const deliveryTarget =
@@ -428,8 +438,10 @@ export async function agentCommand(
? whatsappTarget ? whatsappTarget
: deliveryProvider === "discord" : deliveryProvider === "discord"
? discordTarget ? discordTarget
: deliveryProvider === "signal" : deliveryProvider === "signal"
? signalTarget ? signalTarget
: deliveryProvider === "imessage"
? imessageTarget
: undefined; : undefined;
const message = `Delivery failed (${deliveryProvider}${deliveryTarget ? ` to ${deliveryTarget}` : ""}): ${String(err)}`; const message = `Delivery failed (${deliveryProvider}${deliveryTarget ? ` to ${deliveryTarget}` : ""}): ${String(err)}`;
runtime.error?.(message); runtime.error?.(message);
@@ -463,6 +475,13 @@ export async function agentCommand(
if (!bestEffortDeliver) throw err; if (!bestEffortDeliver) throw err;
logDeliveryError(err); logDeliveryError(err);
} }
if (deliveryProvider === "imessage" && !imessageTarget) {
const err = new Error(
"Delivering to iMessage requires --to <handle|chat_id:ID>",
);
if (!bestEffortDeliver) throw err;
logDeliveryError(err);
}
if (deliveryProvider === "webchat") { if (deliveryProvider === "webchat") {
const err = new Error( const err = new Error(
"Delivering to WebChat is not supported via `clawdis agent`; use WhatsApp/Telegram or run with --deliver=false.", "Delivering to WebChat is not supported via `clawdis agent`; use WhatsApp/Telegram or run with --deliver=false.",
@@ -475,6 +494,7 @@ export async function agentCommand(
deliveryProvider !== "telegram" && deliveryProvider !== "telegram" &&
deliveryProvider !== "discord" && deliveryProvider !== "discord" &&
deliveryProvider !== "signal" && deliveryProvider !== "signal" &&
deliveryProvider !== "imessage" &&
deliveryProvider !== "webchat" deliveryProvider !== "webchat"
) { ) {
const err = new Error(`Unknown provider: ${deliveryProvider}`); const err = new Error(`Unknown provider: ${deliveryProvider}`);
@@ -621,5 +641,38 @@ export async function agentCommand(
logDeliveryError(err); logDeliveryError(err);
} }
} }
if (deliveryProvider === "imessage" && imessageTarget) {
try {
if (media.length === 0) {
for (const chunk of chunkText(text, 4000)) {
await deps.sendMessageIMessage(imessageTarget, chunk, {
maxBytes: cfg.imessage?.mediaMaxMb
? cfg.imessage.mediaMaxMb * 1024 * 1024
: cfg.agent?.mediaMaxMb
? cfg.agent.mediaMaxMb * 1024 * 1024
: undefined,
});
}
} else {
let first = true;
for (const url of media) {
const caption = first ? text : "";
first = false;
await deps.sendMessageIMessage(imessageTarget, caption, {
mediaUrl: url,
maxBytes: cfg.imessage?.mediaMaxMb
? cfg.imessage.mediaMaxMb * 1024 * 1024
: cfg.agent?.mediaMaxMb
? cfg.agent.mediaMaxMb * 1024 * 1024
: undefined,
});
}
}
} catch (err) {
if (!bestEffortDeliver) throw err;
logDeliveryError(err);
}
}
} }
} }

View File

@@ -24,6 +24,7 @@ import {
type SessionEntry, type SessionEntry,
saveSessionStore, saveSessionStore,
} from "../config/sessions.js"; } from "../config/sessions.js";
import { registerAgentRunContext } from "../infra/agent-events.js";
import { resolveTelegramToken } from "../telegram/token.js"; import { resolveTelegramToken } from "../telegram/token.js";
import { normalizeE164 } from "../utils.js"; import { normalizeE164 } from "../utils.js";
import type { CronJob } from "./types.js"; import type { CronJob } from "./types.js";
@@ -54,7 +55,13 @@ function pickSummaryFromPayloads(
function resolveDeliveryTarget( function resolveDeliveryTarget(
cfg: ClawdisConfig, cfg: ClawdisConfig,
jobPayload: { jobPayload: {
channel?: "last" | "whatsapp" | "telegram" | "discord" | "signal"; channel?:
| "last"
| "whatsapp"
| "telegram"
| "discord"
| "signal"
| "imessage";
to?: string; to?: string;
}, },
) { ) {
@@ -81,7 +88,8 @@ function resolveDeliveryTarget(
requestedChannel === "whatsapp" || requestedChannel === "whatsapp" ||
requestedChannel === "telegram" || requestedChannel === "telegram" ||
requestedChannel === "discord" || requestedChannel === "discord" ||
requestedChannel === "signal" requestedChannel === "signal" ||
requestedChannel === "imessage"
) { ) {
return requestedChannel; return requestedChannel;
} }
@@ -244,6 +252,9 @@ export async function runCronIsolatedAgentTurn(params: {
const sessionFile = resolveSessionTranscriptPath( const sessionFile = resolveSessionTranscriptPath(
cronSession.sessionEntry.sessionId, cronSession.sessionEntry.sessionId,
); );
registerAgentRunContext(cronSession.sessionEntry.sessionId, {
sessionKey: params.sessionKey,
});
runResult = await runEmbeddedPiAgent({ runResult = await runEmbeddedPiAgent({
sessionId: cronSession.sessionEntry.sessionId, sessionId: cronSession.sessionEntry.sessionId,
sessionKey: params.sessionKey, sessionKey: params.sessionKey,
@@ -457,6 +468,44 @@ export async function runCronIsolatedAgentTurn(params: {
return { status: "error", summary, error: String(err) }; return { status: "error", summary, error: String(err) };
return { status: "ok", summary }; return { status: "ok", summary };
} }
} else if (resolvedDelivery.channel === "imessage") {
if (!resolvedDelivery.to) {
if (!bestEffortDeliver)
return {
status: "error",
summary,
error: "Cron delivery to iMessage requires a recipient.",
};
return {
status: "skipped",
summary: "Delivery skipped (no iMessage recipient).",
};
}
const to = resolvedDelivery.to;
try {
for (const payload of payloads) {
const mediaList =
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
if (mediaList.length === 0) {
for (const chunk of chunkText(payload.text ?? "", 4000)) {
await params.deps.sendMessageIMessage(to, chunk);
}
} else {
let first = true;
for (const url of mediaList) {
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.deps.sendMessageIMessage(to, caption, {
mediaUrl: url,
});
}
}
}
} catch (err) {
if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) };
return { status: "ok", summary };
}
} }
} }

View File

@@ -12,7 +12,11 @@ import {
STATE_DIR_CLAWDIS, STATE_DIR_CLAWDIS,
writeConfigFile, writeConfigFile,
} from "../config/config.js"; } from "../config/config.js";
import { emitAgentEvent } from "../infra/agent-events.js"; import {
emitAgentEvent,
registerAgentRunContext,
resetAgentRunContextForTest,
} from "../infra/agent-events.js";
import { GatewayLockError } from "../infra/gateway-lock.js"; import { GatewayLockError } from "../infra/gateway-lock.js";
import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; import { emitHeartbeatEvent } from "../infra/heartbeat-events.js";
import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js"; import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js";
@@ -277,6 +281,7 @@ beforeEach(async () => {
testCanvasHostPort = undefined; testCanvasHostPort = undefined;
cronIsolatedRun.mockClear(); cronIsolatedRun.mockClear();
drainSystemEvents(); drainSystemEvents();
resetAgentRunContextForTest();
__resetModelCatalogCacheForTest(); __resetModelCatalogCacheForTest();
piSdkMock.enabled = false; piSdkMock.enabled = false;
piSdkMock.discoverCalls = 0; piSdkMock.discoverCalls = 0;
@@ -3621,6 +3626,73 @@ describe("gateway server", () => {
await server.close(); await server.close();
}); });
test("agent events stream to webchat clients when run context is registered", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
testSessionStorePath = path.join(dir, "sessions.json");
await fs.writeFile(
testSessionStorePath,
JSON.stringify(
{
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
},
null,
2,
),
"utf-8",
);
const { server, ws } = await startServerWithClient();
await connectOk(ws, {
client: {
name: "webchat",
version: "1.0.0",
platform: "test",
mode: "webchat",
},
});
registerAgentRunContext("run-auto-1", { sessionKey: "main" });
const finalChatP = onceMessage<{
type: "event";
event: string;
payload?: unknown;
}>(
ws,
(o) => {
if (o.type !== "event" || o.event !== "chat") return false;
const payload = o.payload as { state?: unknown; runId?: unknown } | undefined;
return payload?.state === "final" && payload.runId === "run-auto-1";
},
8000,
);
emitAgentEvent({
runId: "run-auto-1",
stream: "assistant",
data: { text: "hi from agent" },
});
emitAgentEvent({
runId: "run-auto-1",
stream: "job",
data: { state: "done" },
});
const evt = await finalChatP;
const payload =
evt.payload && typeof evt.payload === "object"
? (evt.payload as Record<string, unknown>)
: {};
expect(payload.sessionKey).toBe("main");
expect(payload.runId).toBe("run-auto-1");
ws.close();
await server.close();
});
test("bridge chat.abort cancels while saving the session store", async () => { test("bridge chat.abort cancels while saving the session store", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
testSessionStorePath = path.join(dir, "sessions.json"); testSessionStorePath = path.join(dir, "sessions.json");

View File

@@ -74,8 +74,18 @@ import {
sendMessageDiscord, sendMessageDiscord,
} from "../discord/index.js"; } from "../discord/index.js";
import { type DiscordProbe, probeDiscord } from "../discord/probe.js"; import { type DiscordProbe, probeDiscord } from "../discord/probe.js";
import {
monitorIMessageProvider,
sendMessageIMessage,
} from "../imessage/index.js";
import { probeIMessage, type IMessageProbe } from "../imessage/probe.js";
import { isVerbose } from "../globals.js"; import { isVerbose } from "../globals.js";
import { onAgentEvent } from "../infra/agent-events.js"; import {
clearAgentRunContext,
getAgentRunContext,
onAgentEvent,
registerAgentRunContext,
} from "../infra/agent-events.js";
import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js"; import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js";
import { startNodeBridgeServer } from "../infra/bridge/server.js"; import { startNodeBridgeServer } from "../infra/bridge/server.js";
import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js"; import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js";
@@ -287,11 +297,13 @@ const logWhatsApp = logProviders.child("whatsapp");
const logTelegram = logProviders.child("telegram"); const logTelegram = logProviders.child("telegram");
const logDiscord = logProviders.child("discord"); const logDiscord = logProviders.child("discord");
const logSignal = logProviders.child("signal"); const logSignal = logProviders.child("signal");
const logIMessage = logProviders.child("imessage");
const canvasRuntime = runtimeForLogger(logCanvas); const canvasRuntime = runtimeForLogger(logCanvas);
const whatsappRuntimeEnv = runtimeForLogger(logWhatsApp); const whatsappRuntimeEnv = runtimeForLogger(logWhatsApp);
const telegramRuntimeEnv = runtimeForLogger(logTelegram); const telegramRuntimeEnv = runtimeForLogger(logTelegram);
const discordRuntimeEnv = runtimeForLogger(logDiscord); const discordRuntimeEnv = runtimeForLogger(logDiscord);
const signalRuntimeEnv = runtimeForLogger(logSignal); const signalRuntimeEnv = runtimeForLogger(logSignal);
const imessageRuntimeEnv = runtimeForLogger(logIMessage);
function resolveBonjourCliPath(): string | undefined { function resolveBonjourCliPath(): string | undefined {
const envPath = process.env.CLAWDIS_CLI_PATH?.trim(); const envPath = process.env.CLAWDIS_CLI_PATH?.trim();
@@ -1345,7 +1357,13 @@ export async function startGatewayServer(
wakeMode: "now" | "next-heartbeat"; wakeMode: "now" | "next-heartbeat";
sessionKey: string; sessionKey: string;
deliver: boolean; deliver: boolean;
channel: "last" | "whatsapp" | "telegram" | "discord" | "signal"; channel:
| "last"
| "whatsapp"
| "telegram"
| "discord"
| "signal"
| "imessage";
to?: string; to?: string;
thinking?: string; thinking?: string;
timeoutSeconds?: number; timeoutSeconds?: number;
@@ -1371,15 +1389,19 @@ export async function startGatewayServer(
channelRaw === "telegram" || channelRaw === "telegram" ||
channelRaw === "discord" || channelRaw === "discord" ||
channelRaw === "signal" || channelRaw === "signal" ||
channelRaw === "imessage" ||
channelRaw === "last" channelRaw === "last"
? channelRaw ? channelRaw
: channelRaw === "imsg"
? "imessage"
: channelRaw === undefined : channelRaw === undefined
? "last" ? "last"
: null; : null;
if (channel === null) { if (channel === null) {
return { return {
ok: false, ok: false,
error: "channel must be last|whatsapp|telegram|discord|signal", error:
"channel must be last|whatsapp|telegram|discord|signal|imessage",
}; };
} }
const toRaw = payload.to; const toRaw = payload.to;
@@ -1430,7 +1452,13 @@ export async function startGatewayServer(
wakeMode: "now" | "next-heartbeat"; wakeMode: "now" | "next-heartbeat";
sessionKey: string; sessionKey: string;
deliver: boolean; deliver: boolean;
channel: "last" | "whatsapp" | "telegram" | "discord" | "signal"; channel:
| "last"
| "whatsapp"
| "telegram"
| "discord"
| "signal"
| "imessage";
to?: string; to?: string;
thinking?: string; thinking?: string;
timeoutSeconds?: number; timeoutSeconds?: number;
@@ -1714,10 +1742,12 @@ export async function startGatewayServer(
let telegramAbort: AbortController | null = null; let telegramAbort: AbortController | null = null;
let discordAbort: AbortController | null = null; let discordAbort: AbortController | null = null;
let signalAbort: AbortController | null = null; let signalAbort: AbortController | null = null;
let imessageAbort: AbortController | null = null;
let whatsappTask: Promise<unknown> | null = null; let whatsappTask: Promise<unknown> | null = null;
let telegramTask: Promise<unknown> | null = null; let telegramTask: Promise<unknown> | null = null;
let discordTask: Promise<unknown> | null = null; let discordTask: Promise<unknown> | null = null;
let signalTask: Promise<unknown> | null = null; let signalTask: Promise<unknown> | null = null;
let imessageTask: Promise<unknown> | null = null;
let whatsappRuntime: WebProviderStatus = { let whatsappRuntime: WebProviderStatus = {
running: false, running: false,
connected: false, connected: false,
@@ -1765,12 +1795,27 @@ export async function startGatewayServer(
lastError: null, lastError: null,
baseUrl: null, baseUrl: null,
}; };
let imessageRuntime: {
running: boolean;
lastStartAt?: number | null;
lastStopAt?: number | null;
lastError?: string | null;
cliPath?: string | null;
dbPath?: string | null;
} = {
running: false,
lastStartAt: null,
lastStopAt: null,
lastError: null,
cliPath: null,
dbPath: null,
};
const clients = new Set<Client>(); const clients = new Set<Client>();
let seq = 0; let seq = 0;
// Track per-run sequence to detect out-of-order/lost agent events. // Track per-run sequence to detect out-of-order/lost agent events.
const agentRunSeq = new Map<string, number>(); const agentRunSeq = new Map<string, number>();
const dedupe = new Map<string, DedupeEntry>(); const dedupe = new Map<string, DedupeEntry>();
// Map agent sessionId -> pending chat runs for WebChat clients. // Map agent runId -> pending chat runs for WebChat clients.
const chatRunSessions = new Map< const chatRunSessions = new Map<
string, string,
Array<{ sessionKey: string; clientRunId: string }> Array<{ sessionKey: string; clientRunId: string }>
@@ -1812,6 +1857,21 @@ export async function startGatewayServer(
if (!queue.length) chatRunSessions.delete(sessionId); if (!queue.length) chatRunSessions.delete(sessionId);
return entry; return entry;
}; };
const resolveSessionKeyForRun = (runId: string) => {
const cached = getAgentRunContext(runId)?.sessionKey;
if (cached) return cached;
const cfg = loadConfig();
const storePath = resolveStorePath(cfg.session?.store);
const store = loadSessionStore(storePath);
const found = Object.entries(store).find(
([, entry]) => entry?.sessionId === runId,
);
const sessionKey = found?.[0];
if (sessionKey) {
registerAgentRunContext(runId, { sessionKey });
}
return sessionKey;
};
const chatRunBuffers = new Map<string, string>(); const chatRunBuffers = new Map<string, string>();
const chatDeltaSentAt = new Map<string, number>(); const chatDeltaSentAt = new Map<string, number>();
const chatAbortControllers = new Map< const chatAbortControllers = new Map<
@@ -2221,11 +2281,92 @@ export async function startGatewayServer(
}; };
}; };
const startIMessageProvider = async () => {
if (imessageTask) return;
const cfg = loadConfig();
if (!cfg.imessage) {
imessageRuntime = {
...imessageRuntime,
running: false,
lastError: "not configured",
};
logIMessage.info("skipping provider start (imessage not configured)");
return;
}
if (cfg.imessage?.enabled === false) {
imessageRuntime = {
...imessageRuntime,
running: false,
lastError: "disabled",
};
logIMessage.info("skipping provider start (imessage.enabled=false)");
return;
}
const cliPath = cfg.imessage?.cliPath?.trim() || "imsg";
const dbPath = cfg.imessage?.dbPath?.trim();
logIMessage.info(
`starting provider (${cliPath}${dbPath ? ` db=${dbPath}` : ""})`,
);
imessageAbort = new AbortController();
imessageRuntime = {
...imessageRuntime,
running: true,
lastStartAt: Date.now(),
lastError: null,
cliPath,
dbPath: dbPath ?? null,
};
const task = monitorIMessageProvider({
cliPath,
dbPath,
allowFrom: cfg.imessage?.allowFrom,
includeAttachments: cfg.imessage?.includeAttachments,
mediaMaxMb: cfg.imessage?.mediaMaxMb,
runtime: imessageRuntimeEnv,
abortSignal: imessageAbort.signal,
})
.catch((err) => {
imessageRuntime = {
...imessageRuntime,
lastError: formatError(err),
};
logIMessage.error(`provider exited: ${formatError(err)}`);
})
.finally(() => {
imessageAbort = null;
imessageTask = null;
imessageRuntime = {
...imessageRuntime,
running: false,
lastStopAt: Date.now(),
};
});
imessageTask = task;
};
const stopIMessageProvider = async () => {
if (!imessageAbort && !imessageTask) return;
imessageAbort?.abort();
try {
await imessageTask;
} catch {
// ignore
}
imessageAbort = null;
imessageTask = null;
imessageRuntime = {
...imessageRuntime,
running: false,
lastStopAt: Date.now(),
};
};
const startProviders = async () => { const startProviders = async () => {
await startWhatsAppProvider(); await startWhatsAppProvider();
await startDiscordProvider(); await startDiscordProvider();
await startTelegramProvider(); await startTelegramProvider();
await startSignalProvider(); await startSignalProvider();
await startIMessageProvider();
}; };
const broadcast = ( const broadcast = (
@@ -3269,7 +3410,8 @@ export async function startGatewayServer(
const provider = const provider =
channel === "whatsapp" || channel === "whatsapp" ||
channel === "telegram" || channel === "telegram" ||
channel === "signal" channel === "signal" ||
channel === "imessage"
? channel ? channel
: undefined; : undefined;
const to = const to =
@@ -3586,81 +3728,152 @@ export async function startGatewayServer(
broadcast("agent", evt); broadcast("agent", evt);
const chatLink = peekChatRun(evt.runId); const chatLink = peekChatRun(evt.runId);
if (chatLink) { const sessionKey =
// Map agent bus events to chat events for WS WebChat clients. chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId);
// Use clientRunId so the webchat can correlate with its pending promise. const jobState =
const { sessionKey, clientRunId } = chatLink; evt.stream === "job" && typeof evt.data?.state === "string"
bridgeSendToSession(sessionKey, "agent", evt); ? evt.data.state
if (evt.stream === "assistant" && typeof evt.data?.text === "string") { : null;
const base = {
runId: clientRunId, if (sessionKey) {
sessionKey, if (chatLink) {
seq: evt.seq, // Map agent bus events to chat events for WS WebChat clients.
}; // Use clientRunId so the webchat can correlate with its pending promise.
chatRunBuffers.set(clientRunId, evt.data.text); const { clientRunId } = chatLink;
const now = Date.now(); bridgeSendToSession(sessionKey, "agent", evt);
const last = chatDeltaSentAt.get(clientRunId) ?? 0; if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
// Throttle UI delta events so slow clients don't accumulate unbounded buffers. const base = {
if (now - last >= 150) { runId: clientRunId,
chatDeltaSentAt.set(clientRunId, now); sessionKey,
const payload = { seq: evt.seq,
...base,
state: "delta" as const,
message: {
role: "assistant",
content: [{ type: "text", text: evt.data.text }],
timestamp: now,
},
}; };
broadcast("chat", payload, { dropIfSlow: true }); chatRunBuffers.set(clientRunId, evt.data.text);
bridgeSendToSession(sessionKey, "chat", payload); const now = Date.now();
const last = chatDeltaSentAt.get(clientRunId) ?? 0;
// Throttle UI delta events so slow clients don't accumulate unbounded buffers.
if (now - last >= 150) {
chatDeltaSentAt.set(clientRunId, now);
const payload = {
...base,
state: "delta" as const,
message: {
role: "assistant",
content: [{ type: "text", text: evt.data.text }],
timestamp: now,
},
};
broadcast("chat", payload, { dropIfSlow: true });
bridgeSendToSession(sessionKey, "chat", payload);
}
} else if (jobState === "done" || jobState === "error") {
const finished = shiftChatRun(evt.runId);
if (!finished) {
if (jobState) clearAgentRunContext(evt.runId);
return;
}
const { sessionKey: finishedSessionKey, clientRunId: finishedRunId } =
finished;
const base = {
runId: finishedRunId,
sessionKey: finishedSessionKey,
seq: evt.seq,
};
const text = chatRunBuffers.get(finishedRunId)?.trim() ?? "";
chatRunBuffers.delete(finishedRunId);
chatDeltaSentAt.delete(finishedRunId);
if (jobState === "done") {
const payload = {
...base,
state: "final",
message: text
? {
role: "assistant",
content: [{ type: "text", text }],
timestamp: Date.now(),
}
: undefined,
};
broadcast("chat", payload);
bridgeSendToSession(finishedSessionKey, "chat", payload);
} else {
const payload = {
...base,
state: "error",
errorMessage: evt.data.error
? formatForLog(evt.data.error)
: undefined,
};
broadcast("chat", payload);
bridgeSendToSession(finishedSessionKey, "chat", payload);
}
} }
} else if ( } else {
evt.stream === "job" && const clientRunId = evt.runId;
typeof evt.data?.state === "string" && bridgeSendToSession(sessionKey, "agent", evt);
(evt.data.state === "done" || evt.data.state === "error") if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
) { const base = {
const finished = shiftChatRun(evt.runId); runId: clientRunId,
if (!finished) { sessionKey,
return; seq: evt.seq,
}
const { sessionKey: finishedSessionKey, clientRunId: finishedRunId } =
finished;
const base = {
runId: finishedRunId,
sessionKey: finishedSessionKey,
seq: evt.seq,
};
const text = chatRunBuffers.get(finishedRunId)?.trim() ?? "";
chatRunBuffers.delete(finishedRunId);
chatDeltaSentAt.delete(finishedRunId);
if (evt.data.state === "done") {
const payload = {
...base,
state: "final",
message: text
? {
role: "assistant",
content: [{ type: "text", text }],
timestamp: Date.now(),
}
: undefined,
}; };
broadcast("chat", payload); chatRunBuffers.set(clientRunId, evt.data.text);
bridgeSendToSession(finishedSessionKey, "chat", payload); const now = Date.now();
} else { const last = chatDeltaSentAt.get(clientRunId) ?? 0;
const payload = { if (now - last >= 150) {
...base, chatDeltaSentAt.set(clientRunId, now);
state: "error", const payload = {
errorMessage: evt.data.error ...base,
? formatForLog(evt.data.error) state: "delta" as const,
: undefined, message: {
role: "assistant",
content: [{ type: "text", text: evt.data.text }],
timestamp: now,
},
};
broadcast("chat", payload, { dropIfSlow: true });
bridgeSendToSession(sessionKey, "chat", payload);
}
} else if (jobState === "done" || jobState === "error") {
const base = {
runId: clientRunId,
sessionKey,
seq: evt.seq,
}; };
broadcast("chat", payload); const text = chatRunBuffers.get(clientRunId)?.trim() ?? "";
bridgeSendToSession(finishedSessionKey, "chat", payload); chatRunBuffers.delete(clientRunId);
chatDeltaSentAt.delete(clientRunId);
if (jobState === "done") {
const payload = {
...base,
state: "final",
message: text
? {
role: "assistant",
content: [{ type: "text", text }],
timestamp: Date.now(),
}
: undefined,
};
broadcast("chat", payload);
bridgeSendToSession(sessionKey, "chat", payload);
} else {
const payload = {
...base,
state: "error",
errorMessage: evt.data.error
? formatForLog(evt.data.error)
: undefined,
};
broadcast("chat", payload);
bridgeSendToSession(sessionKey, "chat", payload);
}
} }
} }
} }
if (jobState === "done" || jobState === "error") {
clearAgentRunContext(evt.runId);
}
}); });
const heartbeatUnsub = onHeartbeatEvent((evt) => { const heartbeatUnsub = onHeartbeatEvent((evt) => {
@@ -4116,6 +4329,16 @@ export async function startGatewayServer(
signalLastProbeAt = Date.now(); signalLastProbeAt = Date.now();
} }
const imessageCfg = cfg.imessage;
const imessageEnabled = imessageCfg?.enabled !== false;
const imessageConfigured = Boolean(imessageCfg) && imessageEnabled;
let imessageProbe: IMessageProbe | undefined;
let imessageLastProbeAt: number | null = null;
if (probe && imessageConfigured) {
imessageProbe = await probeIMessage(timeoutMs);
imessageLastProbeAt = Date.now();
}
const linked = await webAuthExists(); const linked = await webAuthExists();
const authAgeMs = getWebAuthAgeMs(); const authAgeMs = getWebAuthAgeMs();
const self = readWebSelfId(); const self = readWebSelfId();
@@ -4169,6 +4392,17 @@ export async function startGatewayServer(
probe: signalProbe, probe: signalProbe,
lastProbeAt: signalLastProbeAt, lastProbeAt: signalLastProbeAt,
}, },
imessage: {
configured: imessageConfigured,
running: imessageRuntime.running,
lastStartAt: imessageRuntime.lastStartAt ?? null,
lastStopAt: imessageRuntime.lastStopAt ?? null,
lastError: imessageRuntime.lastError ?? null,
cliPath: imessageRuntime.cliPath ?? null,
dbPath: imessageRuntime.dbPath ?? null,
probe: imessageProbe,
lastProbeAt: imessageLastProbeAt,
},
}, },
undefined, undefined,
); );
@@ -6022,7 +6256,9 @@ export async function startGatewayServer(
} }
const to = params.to.trim(); const to = params.to.trim();
const message = params.message.trim(); const message = params.message.trim();
const provider = (params.provider ?? "whatsapp").toLowerCase(); const providerRaw = (params.provider ?? "whatsapp").toLowerCase();
const provider =
providerRaw === "imsg" ? "imessage" : providerRaw;
try { try {
if (provider === "telegram") { if (provider === "telegram") {
const cfg = loadConfig(); const cfg = loadConfig();
@@ -6083,6 +6319,27 @@ export async function startGatewayServer(
payload, payload,
}); });
respond(true, payload, undefined, { provider }); respond(true, payload, undefined, { provider });
} else if (provider === "imessage") {
const cfg = loadConfig();
const result = await sendMessageIMessage(to, message, {
mediaUrl: params.mediaUrl,
cliPath: cfg.imessage?.cliPath,
dbPath: cfg.imessage?.dbPath,
maxBytes: cfg.imessage?.mediaMaxMb
? cfg.imessage.mediaMaxMb * 1024 * 1024
: undefined,
});
const payload = {
runId: idem,
messageId: result.messageId,
provider,
};
dedupe.set(`send:${idem}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { provider });
} else { } else {
const result = await sendMessageWhatsApp(to, message, { const result = await sendMessageWhatsApp(to, message, {
mediaUrl: params.mediaUrl, mediaUrl: params.mediaUrl,
@@ -6197,9 +6454,13 @@ export async function startGatewayServer(
const requestedChannelRaw = const requestedChannelRaw =
typeof params.channel === "string" ? params.channel.trim() : ""; typeof params.channel === "string" ? params.channel.trim() : "";
const requestedChannel = requestedChannelRaw const requestedChannelNormalized = requestedChannelRaw
? requestedChannelRaw.toLowerCase() ? requestedChannelRaw.toLowerCase()
: "last"; : "last";
const requestedChannel =
requestedChannelNormalized === "imsg"
? "imessage"
: requestedChannelNormalized;
const lastChannel = sessionEntry?.lastChannel; const lastChannel = sessionEntry?.lastChannel;
const lastTo = const lastTo =
@@ -6220,6 +6481,7 @@ export async function startGatewayServer(
requestedChannel === "telegram" || requestedChannel === "telegram" ||
requestedChannel === "discord" || requestedChannel === "discord" ||
requestedChannel === "signal" || requestedChannel === "signal" ||
requestedChannel === "imessage" ||
requestedChannel === "webchat" requestedChannel === "webchat"
) { ) {
return requestedChannel; return requestedChannel;
@@ -6239,7 +6501,8 @@ export async function startGatewayServer(
resolvedChannel === "whatsapp" || resolvedChannel === "whatsapp" ||
resolvedChannel === "telegram" || resolvedChannel === "telegram" ||
resolvedChannel === "discord" || resolvedChannel === "discord" ||
resolvedChannel === "signal" resolvedChannel === "signal" ||
resolvedChannel === "imessage"
) { ) {
return lastTo || undefined; return lastTo || undefined;
} }
@@ -6485,6 +6748,7 @@ export async function startGatewayServer(
await stopTelegramProvider(); await stopTelegramProvider();
await stopDiscordProvider(); await stopDiscordProvider();
await stopSignalProvider(); await stopSignalProvider();
await stopIMessageProvider();
cron.stop(); cron.stop();
heartbeatRunner.stop(); heartbeatRunner.stop();
broadcast("shutdown", { broadcast("shutdown", {
@@ -6522,7 +6786,9 @@ export async function startGatewayServer(
await stopBrowserControlServerIfStarted().catch(() => {}); await stopBrowserControlServerIfStarted().catch(() => {});
} }
await Promise.allSettled( await Promise.allSettled(
[whatsappTask, telegramTask, signalTask].filter(Boolean) as Array< [whatsappTask, telegramTask, signalTask, imessageTask].filter(
Boolean,
) as Array<
Promise<unknown> Promise<unknown>
>, >,
); );

View File

@@ -1,7 +1,22 @@
import { describe, expect, test } from "vitest"; import { describe, expect, test } from "vitest";
import { emitAgentEvent, onAgentEvent } from "./agent-events.js"; import {
emitAgentEvent,
onAgentEvent,
registerAgentRunContext,
getAgentRunContext,
clearAgentRunContext,
resetAgentRunContextForTest,
} from "./agent-events.js";
describe("agent-events sequencing", () => { describe("agent-events sequencing", () => {
test("stores and clears run context", async () => {
resetAgentRunContextForTest();
registerAgentRunContext("run-1", { sessionKey: "main" });
expect(getAgentRunContext("run-1")?.sessionKey).toBe("main");
clearAgentRunContext("run-1");
expect(getAgentRunContext("run-1")).toBeUndefined();
});
test("maintains monotonic seq per runId", async () => { test("maintains monotonic seq per runId", async () => {
const seen: Record<string, number[]> = {}; const seen: Record<string, number[]> = {};
const stop = onAgentEvent((evt) => { const stop = onAgentEvent((evt) => {

View File

@@ -13,9 +13,41 @@ export type AgentEventPayload = {
data: Record<string, unknown>; data: Record<string, unknown>;
}; };
export type AgentRunContext = {
sessionKey?: string;
};
// Keep per-run counters so streams stay strictly monotonic per runId. // Keep per-run counters so streams stay strictly monotonic per runId.
const seqByRun = new Map<string, number>(); const seqByRun = new Map<string, number>();
const listeners = new Set<(evt: AgentEventPayload) => void>(); const listeners = new Set<(evt: AgentEventPayload) => void>();
const runContextById = new Map<string, AgentRunContext>();
export function registerAgentRunContext(
runId: string,
context: AgentRunContext,
) {
if (!runId) return;
const existing = runContextById.get(runId);
if (!existing) {
runContextById.set(runId, { ...context });
return;
}
if (context.sessionKey && existing.sessionKey !== context.sessionKey) {
existing.sessionKey = context.sessionKey;
}
}
export function getAgentRunContext(runId: string) {
return runContextById.get(runId);
}
export function clearAgentRunContext(runId: string) {
runContextById.delete(runId);
}
export function resetAgentRunContextForTest() {
runContextById.clear();
}
export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) { export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1; const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1;