From a7d33c06f9049ed4b5538779ac3f207148c2793c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 5 Jan 2026 05:55:02 +0100 Subject: [PATCH] refactor: align agent lifecycle --- CHANGELOG.md | 1 + .../ClawdbotProtocol/GatewayModels.swift | 4 - docs/agent-loop.md | 61 ++++++++++ docs/refactor/agent-loop.md | 65 ++++++++++ src/agents/clawdbot-tools.sessions.test.ts | 6 - src/agents/pi-embedded-subscribe.ts | 24 ++++ src/agents/tools/sessions-send-tool.ts | 19 +-- src/canvas-host/a2ui/.bundle.hash | 4 +- src/commands/agent.ts | 69 +++++------ src/gateway/protocol/schema.ts | 1 - src/gateway/server-chat.ts | 14 +-- src/gateway/server-methods/agent-job.ts | 92 +++++++------- src/gateway/server-methods/agent.ts | 7 +- src/gateway/server.agent.test.ts | 112 +++++++++++------- src/gateway/server.chat.test.ts | 8 +- src/gateway/server.health.test.ts | 4 +- src/gateway/server.node-bridge.test.ts | 8 +- src/gateway/server.sessions-send.test.ts | 11 +- src/gateway/ws-log.ts | 10 +- src/infra/agent-events.test.ts | 8 +- src/infra/agent-events.ts | 2 +- src/tui/tui.ts | 10 +- 22 files changed, 332 insertions(+), 208 deletions(-) create mode 100644 docs/agent-loop.md create mode 100644 docs/refactor/agent-loop.md diff --git a/CHANGELOG.md b/CHANGELOG.md index bb0d1e345..d7837582f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - TUI: migrate key handling to the updated pi-tui Key matcher API. - macOS: local gateway now connects via tailnet IP when bind mode is `tailnet`/`auto`. - macOS: Settings now use a sidebar layout to avoid toolbar overflow in Connections. +- macOS: drop deprecated `afterMs` from agent wait params to match gateway schema. ### Maintenance - Deps: bump pi-* stack, Slack SDK, discord-api-types, file-type, zod, and Biome. diff --git a/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift b/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift index 65a629441..49690fbe3 100644 --- a/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift +++ b/apps/macos/Sources/ClawdbotProtocol/GatewayModels.swift @@ -424,21 +424,17 @@ public struct AgentParams: Codable, Sendable { public struct AgentWaitParams: Codable, Sendable { public let runid: String - public let afterms: Int? public let timeoutms: Int? public init( runid: String, - afterms: Int?, timeoutms: Int? ) { self.runid = runid - self.afterms = afterms self.timeoutms = timeoutms } private enum CodingKeys: String, CodingKey { case runid = "runId" - case afterms = "afterMs" case timeoutms = "timeoutMs" } } diff --git a/docs/agent-loop.md b/docs/agent-loop.md new file mode 100644 index 000000000..69bfe2a24 --- /dev/null +++ b/docs/agent-loop.md @@ -0,0 +1,61 @@ +--- +summary: "Agent loop lifecycle, streams, and wait semantics" +read_when: + - You need an exact walkthrough of the agent loop or lifecycle events +--- +# Agent Loop (Clawdis) + +Short, exact flow of one agent run. Source of truth: current code in `src/`. + +## Entry points +- Gateway RPC: `agent` and `agent.wait` in `src/gateway/server-methods/agent.ts`. +- CLI: `agentCommand` in `src/commands/agent.ts`. + +## High-level flow +1) `agent` RPC validates params, resolves session (sessionKey/sessionId), persists session metadata, returns `{ runId, acceptedAt }` immediately. +2) `agentCommand` runs the agent: + - resolves model + thinking/verbose defaults + - loads skills snapshot + - calls `runEmbeddedPiAgent` (pi-agent-core runtime) + - emits **lifecycle end/error** if the embedded loop does not emit one +3) `runEmbeddedPiAgent`: + - builds `AgentSession` and subscribes to pi events + - streams assistant deltas + tool events + - enforces timeout -> aborts run if exceeded + - returns payloads + usage metadata +4) `subscribeEmbeddedPiSession` bridges pi-agent-core events to Clawdis `agent` stream: + - tool events => `stream: "tool"` + - assistant deltas => `stream: "assistant"` + - lifecycle events => `stream: "lifecycle"` (`phase: "start" | "end" | "error"`) +5) `agent.wait` uses `waitForAgentJob`: + - waits for **lifecycle end/error** for `runId` + - returns `{ status: ok|error|timeout, startedAt, endedAt, error? }` + +## Event streams (today) +- `lifecycle`: emitted by `subscribeEmbeddedPiSession` (and as a fallback by `agentCommand`) +- `assistant`: streamed deltas from pi-agent-core +- `tool`: streamed tool events from pi-agent-core + +## Chat surface handling +- `createAgentEventHandler` in `src/gateway/server-chat.ts`: + - buffers assistant deltas + - emits chat `delta` messages + - emits chat `final` when **lifecycle end/error** arrives + +## Timeouts +- `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides. +- Agent runtime: `agent.timeoutSeconds` default 600s; enforced in `runEmbeddedPiAgent` abort timer. + +## Where things can end early +- Agent timeout (abort) +- AbortSignal (cancel) +- Gateway disconnect or RPC timeout +- `agent.wait` timeout (wait-only, does not stop agent) + +## Files +- `src/gateway/server-methods/agent.ts` +- `src/gateway/server-methods/agent-job.ts` +- `src/commands/agent.ts` +- `src/agents/pi-embedded-runner.ts` +- `src/agents/pi-embedded-subscribe.ts` +- `src/gateway/server-chat.ts` diff --git a/docs/refactor/agent-loop.md b/docs/refactor/agent-loop.md new file mode 100644 index 000000000..6693d6086 --- /dev/null +++ b/docs/refactor/agent-loop.md @@ -0,0 +1,65 @@ +--- +summary: "Refactor plan: unify agent lifecycle events and wait semantics" +read_when: + - Refactoring agent lifecycle events or wait behavior +--- +# Refactor: Agent Loop + +Goal: align Clawdis run lifecycle with pi/mom semantics, remove ambiguity between "job" and "agent_end". + +## Problem +- Two lifecycles today: + - `job` (gateway wrapper) => used by `agent.wait` + chat final + - pi-agent `agent_end` (inner loop) => only logged +- This can finalize early (job done) while late assistant deltas still arrive. +- `afterMs` and timeouts can cause false timeouts in `agent.wait`. + +## Reference (mom) +- Single lifecycle: `agent_start`/`agent_end` from pi-agent-core event stream. +- `waitForIdle()` resolves on `agent_end`. +- No separate job state exposed to clients. + +## Proposed refactor (breaking allowed) +1) Replace public `job` stream with `lifecycle` stream + - `stream: "lifecycle"` + - `data: { phase: "start" | "end" | "error", startedAt, endedAt, error? }` +2) `agent.wait` waits on lifecycle end/error only + - remove `afterMs` + - return `{ runId, status, startedAt, endedAt, error? }` +3) Chat final emitted on lifecycle end only + - deltas still from `assistant` stream +4) Centralize run registry + - one map keyed by runId: sessionKey, startedAt, lastSeq, bufferedText + - clear on lifecycle end + +## Implementation outline +- `src/agents/pi-embedded-subscribe.ts` + - emit lifecycle start/end events (translate pi `agent_start`/`agent_end`) +- `src/infra/agent-events.ts` + - add `"lifecycle"` to stream type +- `src/gateway/protocol/schema.ts` + - update AgentEvent schema; update AgentWait params (remove afterMs, add status) +- `src/gateway/server-methods/agent-job.ts` + - rename to `agent-wait.ts` or similar; wait on lifecycle end/error +- `src/gateway/server-chat.ts` + - finalize on lifecycle end (not job) +- `src/commands/agent.ts` + - stop emitting `job` externally (keep internal log if needed) + +## Migration notes (breaking) +- Update all callers of `agent.wait` to new response shape. +- Update tests that expect `timeout` based on job events. +- If any UI relies on job state, map lifecycle instead. + +## Risks +- If lifecycle events are dropped, wait/chat could hang; add timeout in `agent.wait` to fail fast. +- Late deltas after lifecycle end should be ignored; keep seq tracking + drop. + +## Acceptance +- One lifecycle visible to clients. +- `agent.wait` resolves when agent loop ends, not wrapper completion. +- Chat final never emits before last assistant delta. + +## Rollout (if we wanted safety) +- Gate with config flag `agent.lifecycleMode = "legacy"|"refactor"`. +- Remove legacy after one release. diff --git a/src/agents/clawdbot-tools.sessions.test.ts b/src/agents/clawdbot-tools.sessions.test.ts index 1d68aaacf..5a04986a9 100644 --- a/src/agents/clawdbot-tools.sessions.test.ts +++ b/src/agents/clawdbot-tools.sessions.test.ts @@ -261,12 +261,6 @@ describe("sessions tools", () => { ).toBe(true); expect(waitCalls).toHaveLength(8); expect(historyOnlyCalls).toHaveLength(8); - expect( - waitCalls.some( - (call) => - typeof (call.params as { afterMs?: number })?.afterMs === "number", - ), - ).toBe(true); expect(sendCallCount).toBe(0); }); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 2d32f4fe3..e5c39c0fd 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -616,6 +616,18 @@ export function subscribeEmbeddedPiSession(params: { if (evt.type === "agent_start") { log.debug(`embedded run agent start: runId=${params.runId}`); + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "start", + startedAt: Date.now(), + }, + }); + params.onAgentEvent?.({ + stream: "lifecycle", + data: { phase: "start" }, + }); } if (evt.type === "auto_compaction_start") { @@ -638,6 +650,18 @@ export function subscribeEmbeddedPiSession(params: { if (evt.type === "agent_end") { log.debug(`embedded run agent end: runId=${params.runId}`); + emitAgentEvent({ + runId: params.runId, + stream: "lifecycle", + data: { + phase: "end", + endedAt: Date.now(), + }, + }); + params.onAgentEvent?.({ + stream: "lifecycle", + data: { phase: "end" }, + }); if (pendingCompactionRetry > 0) { resolveCompactionRetry(); } else { diff --git a/src/agents/tools/sessions-send-tool.ts b/src/agents/tools/sessions-send-tool.ts index 81ca0daa7..3edda725c 100644 --- a/src/agents/tools/sessions-send-tool.ts +++ b/src/agents/tools/sessions-send-tool.ts @@ -151,16 +151,11 @@ export function createSessionsSendTool(opts?: { typeof response?.runId === "string" && response.runId ? response.runId : stepIdem; - const stepAcceptedAt = - typeof response?.acceptedAt === "number" - ? response.acceptedAt - : undefined; const stepWaitMs = Math.min(step.timeoutMs, 60_000); const wait = (await callGateway({ method: "agent.wait", params: { runId: stepRunId, - afterMs: stepAcceptedAt, timeoutMs: stepWaitMs, }, timeoutMs: stepWaitMs + 2000, @@ -171,7 +166,7 @@ export function createSessionsSendTool(opts?: { const runAgentToAgentFlow = async ( roundOneReply?: string, - runInfo?: { runId: string; acceptedAt?: number }, + runInfo?: { runId: string }, ) => { try { let primaryReply = roundOneReply; @@ -182,7 +177,6 @@ export function createSessionsSendTool(opts?: { method: "agent.wait", params: { runId: runInfo.runId, - afterMs: runInfo.acceptedAt, timeoutMs: waitMs, }, timeoutMs: waitMs + 2000, @@ -277,14 +271,10 @@ export function createSessionsSendTool(opts?: { params: sendParams, timeoutMs: 10_000, })) as { runId?: string; acceptedAt?: number }; - const acceptedAt = - typeof response?.acceptedAt === "number" - ? response.acceptedAt - : undefined; if (typeof response?.runId === "string" && response.runId) { runId = response.runId; } - void runAgentToAgentFlow(undefined, { runId, acceptedAt }); + void runAgentToAgentFlow(undefined, { runId }); return jsonResult({ runId, status: "accepted", @@ -306,7 +296,6 @@ export function createSessionsSendTool(opts?: { } } - let acceptedAt: number | undefined; try { const response = (await callGateway({ method: "agent", @@ -316,9 +305,6 @@ export function createSessionsSendTool(opts?: { if (typeof response?.runId === "string" && response.runId) { runId = response.runId; } - if (typeof response?.acceptedAt === "number") { - acceptedAt = response.acceptedAt; - } } catch (err) { const messageText = err instanceof Error @@ -341,7 +327,6 @@ export function createSessionsSendTool(opts?: { method: "agent.wait", params: { runId, - afterMs: acceptedAt, timeoutMs, }, timeoutMs: timeoutMs + 2000, diff --git a/src/canvas-host/a2ui/.bundle.hash b/src/canvas-host/a2ui/.bundle.hash index c99d5f34f..ad45062a8 100644 --- a/src/canvas-host/a2ui/.bundle.hash +++ b/src/canvas-host/a2ui/.bundle.hash @@ -1,3 +1 @@ -68f18193053997f3dee16de6b0be0bcd97dc70ff8200c77f687479e8b19b78e1 -||||||| Stash base -7daf1cbf58ef395b74c2690c439ac7b3cb536e8eb124baf72ad41da4f542204d +971128267a38be786e60f2e900770da48ea305c8cebe6c53ab56e6ea86d924df diff --git a/src/commands/agent.ts b/src/commands/agent.ts index eba607696..e6306ae66 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -352,17 +352,7 @@ export async function agentCommand( const sessionFile = resolveSessionTranscriptPath(sessionId); const startedAt = Date.now(); - emitAgentEvent({ - runId, - stream: "job", - data: { - state: "started", - startedAt, - to: opts.to ?? null, - sessionId, - isNewSession, - }, - }); + let lifecycleEnded = false; let result: Awaited>; let fallbackProvider = provider; @@ -399,6 +389,13 @@ export async function agentCommand( abortSignal: opts.abortSignal, extraSystemPrompt: opts.extraSystemPrompt, onAgentEvent: (evt) => { + if ( + evt.stream === "lifecycle" && + typeof evt.data?.phase === "string" && + (evt.data.phase === "end" || evt.data.phase === "error") + ) { + lifecycleEnded = true; + } emitAgentEvent({ runId, stream: evt.stream, @@ -410,33 +407,31 @@ export async function agentCommand( result = fallbackResult.result; fallbackProvider = fallbackResult.provider; fallbackModel = fallbackResult.model; - emitAgentEvent({ - runId, - stream: "job", - data: { - state: "done", - startedAt, - endedAt: Date.now(), - to: opts.to ?? null, - sessionId, - durationMs: Date.now() - startedAt, - aborted: result.meta.aborted ?? false, - }, - }); + if (!lifecycleEnded) { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "end", + startedAt, + endedAt: Date.now(), + aborted: result.meta.aborted ?? false, + }, + }); + } } catch (err) { - emitAgentEvent({ - runId, - stream: "job", - data: { - state: "error", - startedAt, - endedAt: Date.now(), - to: opts.to ?? null, - sessionId, - durationMs: Date.now() - startedAt, - error: String(err), - }, - }); + if (!lifecycleEnded) { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { + phase: "error", + startedAt, + endedAt: Date.now(), + error: String(err), + }, + }); + } throw err; } diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index 33e159f81..d5dfed536 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -218,7 +218,6 @@ export const AgentParamsSchema = Type.Object( export const AgentWaitParamsSchema = Type.Object( { runId: NonEmptyString, - afterMs: Type.Optional(Type.Integer({ minimum: 0 })), timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })), }, { additionalProperties: false }, diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index e204ebab9..124e2ed24 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -208,9 +208,9 @@ export function createAgentEventHandler({ agentRunSeq.set(evt.runId, evt.seq); broadcast("agent", agentPayload); - const jobState = - evt.stream === "job" && typeof evt.data?.state === "string" - ? evt.data.state + const lifecyclePhase = + evt.stream === "lifecycle" && typeof evt.data?.phase === "string" + ? evt.data.phase : null; if (sessionKey) { @@ -218,7 +218,7 @@ export function createAgentEventHandler({ if (evt.stream === "assistant" && typeof evt.data?.text === "string") { const clientRunId = chatLink?.clientRunId ?? evt.runId; emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text); - } else if (jobState === "done" || jobState === "error") { + } else if (lifecyclePhase === "end" || lifecyclePhase === "error") { if (chatLink) { const finished = chatRunState.registry.shift(evt.runId); if (!finished) { @@ -229,7 +229,7 @@ export function createAgentEventHandler({ finished.sessionKey, finished.clientRunId, evt.seq, - jobState, + lifecyclePhase === "error" ? "error" : "done", evt.data?.error, ); } else { @@ -237,14 +237,14 @@ export function createAgentEventHandler({ sessionKey, evt.runId, evt.seq, - jobState, + lifecyclePhase === "error" ? "error" : "done", evt.data?.error, ); } } } - if (jobState === "done" || jobState === "error") { + if (lifecyclePhase === "end" || lifecyclePhase === "error") { clearAgentRunContext(evt.runId); } }; diff --git a/src/gateway/server-methods/agent-job.ts b/src/gateway/server-methods/agent-job.ts index 164f47bc1..850d8d18c 100644 --- a/src/gateway/server-methods/agent-job.ts +++ b/src/gateway/server-methods/agent-job.ts @@ -1,50 +1,48 @@ import { onAgentEvent } from "../../infra/agent-events.js"; -const AGENT_JOB_CACHE_TTL_MS = 10 * 60_000; -const agentJobCache = new Map(); +const AGENT_RUN_CACHE_TTL_MS = 10 * 60_000; +const agentRunCache = new Map(); const agentRunStarts = new Map(); -let agentJobListenerStarted = false; +let agentRunListenerStarted = false; -type AgentJobSnapshot = { +type AgentRunSnapshot = { runId: string; - state: "done" | "error"; + status: "ok" | "error"; startedAt?: number; endedAt?: number; error?: string; ts: number; }; -function pruneAgentJobCache(now = Date.now()) { - for (const [runId, entry] of agentJobCache) { - if (now - entry.ts > AGENT_JOB_CACHE_TTL_MS) { - agentJobCache.delete(runId); +function pruneAgentRunCache(now = Date.now()) { + for (const [runId, entry] of agentRunCache) { + if (now - entry.ts > AGENT_RUN_CACHE_TTL_MS) { + agentRunCache.delete(runId); } } } -function recordAgentJobSnapshot(entry: AgentJobSnapshot) { - pruneAgentJobCache(entry.ts); - agentJobCache.set(entry.runId, entry); +function recordAgentRunSnapshot(entry: AgentRunSnapshot) { + pruneAgentRunCache(entry.ts); + agentRunCache.set(entry.runId, entry); } -function ensureAgentJobListener() { - if (agentJobListenerStarted) return; - agentJobListenerStarted = true; +function ensureAgentRunListener() { + if (agentRunListenerStarted) return; + agentRunListenerStarted = true; onAgentEvent((evt) => { if (!evt) return; - if (evt.stream !== "job") return; - const state = evt.data?.state; - if (state === "started") { + if (evt.stream !== "lifecycle") return; + const phase = evt.data?.phase; + if (phase === "start") { const startedAt = typeof evt.data?.startedAt === "number" ? (evt.data.startedAt as number) : undefined; - if (startedAt !== undefined) { - agentRunStarts.set(evt.runId, startedAt); - } + agentRunStarts.set(evt.runId, startedAt ?? Date.now()); return; } - if (state !== "done" && state !== "error") return; + if (phase !== "end" && phase !== "error") return; const startedAt = typeof evt.data?.startedAt === "number" ? (evt.data.startedAt as number) @@ -58,9 +56,9 @@ function ensureAgentJobListener() { ? (evt.data.error as string) : undefined; agentRunStarts.delete(evt.runId); - recordAgentJobSnapshot({ + recordAgentRunSnapshot({ runId: evt.runId, - state: state === "error" ? "error" : "done", + status: phase === "error" ? "error" : "ok", startedAt, endedAt, error, @@ -69,34 +67,24 @@ function ensureAgentJobListener() { }); } -function matchesAfterMs(entry: AgentJobSnapshot, afterMs?: number) { - if (afterMs === undefined) return true; - if (typeof entry.startedAt === "number") return entry.startedAt >= afterMs; - if (typeof entry.endedAt === "number") return entry.endedAt >= afterMs; - return false; -} - -function getCachedAgentJob(runId: string, afterMs?: number) { - pruneAgentJobCache(); - const cached = agentJobCache.get(runId); - if (!cached) return undefined; - return matchesAfterMs(cached, afterMs) ? cached : undefined; +function getCachedAgentRun(runId: string) { + pruneAgentRunCache(); + return agentRunCache.get(runId); } export async function waitForAgentJob(params: { runId: string; - afterMs?: number; timeoutMs: number; -}): Promise { - const { runId, afterMs, timeoutMs } = params; - ensureAgentJobListener(); - const cached = getCachedAgentJob(runId, afterMs); +}): Promise { + const { runId, timeoutMs } = params; + ensureAgentRunListener(); + const cached = getCachedAgentRun(runId); if (cached) return cached; if (timeoutMs <= 0) return null; return await new Promise((resolve) => { let settled = false; - const finish = (entry: AgentJobSnapshot | null) => { + const finish = (entry: AgentRunSnapshot | null) => { if (settled) return; settled = true; clearTimeout(timer); @@ -104,10 +92,15 @@ export async function waitForAgentJob(params: { resolve(entry); }; const unsubscribe = onAgentEvent((evt) => { - if (!evt || evt.stream !== "job") return; + if (!evt || evt.stream !== "lifecycle") return; if (evt.runId !== runId) return; - const state = evt.data?.state; - if (state !== "done" && state !== "error") return; + const phase = evt.data?.phase; + if (phase !== "end" && phase !== "error") return; + const cached = getCachedAgentRun(runId); + if (cached) { + finish(cached); + return; + } const startedAt = typeof evt.data?.startedAt === "number" ? (evt.data.startedAt as number) @@ -120,20 +113,19 @@ export async function waitForAgentJob(params: { typeof evt.data?.error === "string" ? (evt.data.error as string) : undefined; - const snapshot: AgentJobSnapshot = { + const snapshot: AgentRunSnapshot = { runId: evt.runId, - state: state === "error" ? "error" : "done", + status: phase === "error" ? "error" : "ok", startedAt, endedAt, error, ts: Date.now(), }; - recordAgentJobSnapshot(snapshot); - if (!matchesAfterMs(snapshot, afterMs)) return; + recordAgentRunSnapshot(snapshot); finish(snapshot); }); const timer = setTimeout(() => finish(null), Math.max(1, timeoutMs)); }); } -ensureAgentJobListener(); +ensureAgentRunListener(); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 16a348617..6bc1d6df4 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -288,10 +288,6 @@ export const agentHandlers: GatewayRequestHandlers = { } const p = params as AgentWaitParams; const runId = p.runId.trim(); - const afterMs = - typeof p.afterMs === "number" && Number.isFinite(p.afterMs) - ? Math.max(0, Math.floor(p.afterMs)) - : undefined; const timeoutMs = typeof p.timeoutMs === "number" && Number.isFinite(p.timeoutMs) ? Math.max(0, Math.floor(p.timeoutMs)) @@ -299,7 +295,6 @@ export const agentHandlers: GatewayRequestHandlers = { const snapshot = await waitForAgentJob({ runId, - afterMs, timeoutMs, }); if (!snapshot) { @@ -311,7 +306,7 @@ export const agentHandlers: GatewayRequestHandlers = { } respond(true, { runId, - status: snapshot.state === "done" ? "ok" : "error", + status: snapshot.status, startedAt: snapshot.startedAt, endedAt: snapshot.endedAt, error: snapshot.error, diff --git a/src/gateway/server.agent.test.ts b/src/gateway/server.agent.test.ts index cc1bada2a..9edea8f38 100644 --- a/src/gateway/server.agent.test.ts +++ b/src/gateway/server.agent.test.ts @@ -463,8 +463,8 @@ describe("gateway server agent", () => { }); emitAgentEvent({ runId: "run-auto-1", - stream: "job", - data: { state: "done" }, + stream: "lifecycle", + data: { phase: "end" }, }); const evt = await finalChatP; @@ -518,21 +518,20 @@ describe("gateway server agent", () => { await server.close(); }); - test("agent.wait resolves after job completes", async () => { + test("agent.wait resolves after lifecycle end", async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); const waitP = rpcReq(ws, "agent.wait", { runId: "run-wait-1", - afterMs: 100, timeoutMs: 1000, }); setTimeout(() => { emitAgentEvent({ runId: "run-wait-1", - stream: "job", - data: { state: "done", startedAt: 200, endedAt: 210 }, + stream: "lifecycle", + data: { phase: "end", startedAt: 200, endedAt: 210 }, }); }, 10); @@ -545,14 +544,14 @@ describe("gateway server agent", () => { await server.close(); }); - test("agent.wait resolves when job completed before wait call", async () => { + test("agent.wait resolves when lifecycle ended before wait call", async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); emitAgentEvent({ runId: "run-wait-early", - stream: "job", - data: { state: "done", startedAt: 50, endedAt: 55 }, + stream: "lifecycle", + data: { phase: "end", startedAt: 50, endedAt: 55 }, }); const res = await rpcReq(ws, "agent.wait", { @@ -567,41 +566,7 @@ describe("gateway server agent", () => { await server.close(); }); - test("agent.wait ignores jobs before afterMs", async () => { - const { server, ws } = await startServerWithClient(); - await connectOk(ws); - - const waitP = rpcReq(ws, "agent.wait", { - runId: "run-wait-2", - afterMs: 500, - timeoutMs: 1000, - }); - - setTimeout(() => { - emitAgentEvent({ - runId: "run-wait-2", - stream: "job", - data: { state: "done", startedAt: 200, endedAt: 220 }, - }); - }, 10); - setTimeout(() => { - emitAgentEvent({ - runId: "run-wait-2", - stream: "job", - data: { state: "done", startedAt: 700, endedAt: 710 }, - }); - }, 20); - - const res = await waitP; - expect(res.ok).toBe(true); - expect(res.payload.status).toBe("ok"); - expect(res.payload.startedAt).toBe(700); - - ws.close(); - await server.close(); - }); - - test("agent.wait times out when no job completes", async () => { + test("agent.wait times out when no lifecycle ends", async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); @@ -615,4 +580,63 @@ describe("gateway server agent", () => { ws.close(); await server.close(); }); + + test("agent.wait returns error on lifecycle error", async () => { + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const waitP = rpcReq(ws, "agent.wait", { + runId: "run-wait-err", + timeoutMs: 1000, + }); + + setTimeout(() => { + emitAgentEvent({ + runId: "run-wait-err", + stream: "lifecycle", + data: { phase: "error", error: "boom" }, + }); + }, 10); + + const res = await waitP; + expect(res.ok).toBe(true); + expect(res.payload.status).toBe("error"); + expect(res.payload.error).toBe("boom"); + + ws.close(); + await server.close(); + }); + + test("agent.wait uses lifecycle start timestamp when end omits it", async () => { + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const waitP = rpcReq(ws, "agent.wait", { + runId: "run-wait-start", + timeoutMs: 1000, + }); + + emitAgentEvent({ + runId: "run-wait-start", + stream: "lifecycle", + data: { phase: "start", startedAt: 123 }, + }); + + setTimeout(() => { + emitAgentEvent({ + runId: "run-wait-start", + stream: "lifecycle", + data: { phase: "end", endedAt: 456 }, + }); + }, 10); + + const res = await waitP; + expect(res.ok).toBe(true); + expect(res.payload.status).toBe("ok"); + expect(res.payload.startedAt).toBe(123); + expect(res.payload.endedAt).toBe(456); + + ws.close(); + await server.close(); + }); }); diff --git a/src/gateway/server.chat.test.ts b/src/gateway/server.chat.test.ts index e8084d121..c1907c10d 100644 --- a/src/gateway/server.chat.test.ts +++ b/src/gateway/server.chat.test.ts @@ -831,8 +831,8 @@ describe("gateway server chat", () => { emitAgentEvent({ runId: "sess-main", - stream: "job", - data: { state: "done" }, + stream: "lifecycle", + data: { phase: "end" }, }); const final1 = await final1P; @@ -853,8 +853,8 @@ describe("gateway server chat", () => { emitAgentEvent({ runId: "sess-main", - stream: "job", - data: { state: "done" }, + stream: "lifecycle", + data: { phase: "end" }, }); const final2 = await final2P; diff --git a/src/gateway/server.health.test.ts b/src/gateway/server.health.test.ts index a40b3f04f..9c9b869bd 100644 --- a/src/gateway/server.health.test.ts +++ b/src/gateway/server.health.test.ts @@ -173,9 +173,9 @@ describe("gateway server health/presence", () => { o.type === "event" && o.event === "agent" && o.payload?.runId === runId && - o.payload?.stream === "job", + o.payload?.stream === "lifecycle", ); - emitAgentEvent({ runId, stream: "job", data: { msg: "hi" } }); + emitAgentEvent({ runId, stream: "lifecycle", data: { msg: "hi" } }); const evt = await evtPromise; expect(evt.payload.runId).toBe(runId); expect(typeof evt.seq).toBe("number"); diff --git a/src/gateway/server.node-bridge.test.ts b/src/gateway/server.node-bridge.test.ts index 830c29b58..fc1e29fc8 100644 --- a/src/gateway/server.node-bridge.test.ts +++ b/src/gateway/server.node-bridge.test.ts @@ -699,8 +699,8 @@ describe("gateway server node/bridge", () => { runId: "sess-main", seq: 2, ts: Date.now(), - stream: "job", - data: { state: "done" }, + stream: "lifecycle", + data: { phase: "end" }, }); await new Promise((r) => setTimeout(r, 25)); @@ -841,8 +841,8 @@ describe("gateway server node/bridge", () => { runId: "sess-main", seq: 2, ts: Date.now(), - stream: "job", - data: { state: "done" }, + stream: "lifecycle", + data: { phase: "end" }, }); const evt = await finalChatP; diff --git a/src/gateway/server.sessions-send.test.ts b/src/gateway/server.sessions-send.test.ts index 4cdc1d504..73ea13d8b 100644 --- a/src/gateway/server.sessions-send.test.ts +++ b/src/gateway/server.sessions-send.test.ts @@ -14,7 +14,7 @@ import { installGatewayTestHooks(); describe("sessions_send gateway loopback", () => { - it("returns reply when job finishes before agent.wait", async () => { + it("returns reply when lifecycle ends before agent.wait", async () => { const port = await getFreePort(); const prevPort = process.env.CLAWDBOT_GATEWAY_PORT; process.env.CLAWDBOT_GATEWAY_PORT = String(port); @@ -35,8 +35,8 @@ describe("sessions_send gateway loopback", () => { const startedAt = Date.now(); emitAgentEvent({ runId, - stream: "job", - data: { state: "started", startedAt, sessionId }, + stream: "lifecycle", + data: { phase: "start", startedAt }, }); let text = "pong"; @@ -60,12 +60,11 @@ describe("sessions_send gateway loopback", () => { emitAgentEvent({ runId, - stream: "job", + stream: "lifecycle", data: { - state: "done", + phase: "end", startedAt, endedAt: Date.now(), - sessionId, }, }); }); diff --git a/src/gateway/ws-log.ts b/src/gateway/ws-log.ts index 57216604c..03f0c4a82 100644 --- a/src/gateway/ws-log.ts +++ b/src/gateway/ws-log.ts @@ -121,13 +121,9 @@ export function summarizeAgentEventForWsLog( return extra; } - if (stream === "job") { - const state = typeof data.state === "string" ? data.state : undefined; - if (state) extra.state = state; - if (data.to === null) extra.to = null; - else if (typeof data.to === "string") extra.to = data.to; - if (typeof data.durationMs === "number") - extra.ms = Math.round(data.durationMs); + if (stream === "lifecycle") { + const phase = typeof data.phase === "string" ? data.phase : undefined; + if (phase) extra.phase = phase; if (typeof data.aborted === "boolean") extra.aborted = data.aborted; const error = typeof data.error === "string" ? data.error : undefined; if (error?.trim()) extra.error = compactPreview(error, 120); diff --git a/src/infra/agent-events.test.ts b/src/infra/agent-events.test.ts index 96b07a503..c4b666236 100644 --- a/src/infra/agent-events.test.ts +++ b/src/infra/agent-events.test.ts @@ -25,10 +25,10 @@ describe("agent-events sequencing", () => { list.push(evt.seq); }); - emitAgentEvent({ runId: "run-1", stream: "job", data: {} }); - emitAgentEvent({ runId: "run-1", stream: "job", data: {} }); - emitAgentEvent({ runId: "run-2", stream: "job", data: {} }); - emitAgentEvent({ runId: "run-1", stream: "job", data: {} }); + emitAgentEvent({ runId: "run-1", stream: "lifecycle", data: {} }); + emitAgentEvent({ runId: "run-1", stream: "lifecycle", data: {} }); + emitAgentEvent({ runId: "run-2", stream: "lifecycle", data: {} }); + emitAgentEvent({ runId: "run-1", stream: "lifecycle", data: {} }); stop(); diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 1022d024d..e61f841cf 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -1,5 +1,5 @@ export type AgentEventStream = - | "job" + | "lifecycle" | "tool" | "assistant" | "error" diff --git a/src/tui/tui.ts b/src/tui/tui.ts index 5ff7bafd0..9259f2ad8 100644 --- a/src/tui/tui.ts +++ b/src/tui/tui.ts @@ -375,11 +375,11 @@ export async function runTui(opts: TuiOptions) { tui.requestRender(); return; } - if (evt.stream === "job") { - const state = typeof evt.data?.state === "string" ? evt.data.state : ""; - if (state === "started") setStatus("running"); - if (state === "done") setStatus("idle"); - if (state === "error") setStatus("error"); + if (evt.stream === "lifecycle") { + const phase = typeof evt.data?.phase === "string" ? evt.data.phase : ""; + if (phase === "start") setStatus("running"); + if (phase === "end") setStatus("idle"); + if (phase === "error") setStatus("error"); tui.requestRender(); } };