refactor: align agent lifecycle
This commit is contained in:
@@ -218,7 +218,6 @@ export const AgentParamsSchema = Type.Object(
|
||||
export const AgentWaitParamsSchema = Type.Object(
|
||||
{
|
||||
runId: NonEmptyString,
|
||||
afterMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
|
||||
@@ -208,9 +208,9 @@ export function createAgentEventHandler({
|
||||
agentRunSeq.set(evt.runId, evt.seq);
|
||||
broadcast("agent", agentPayload);
|
||||
|
||||
const jobState =
|
||||
evt.stream === "job" && typeof evt.data?.state === "string"
|
||||
? evt.data.state
|
||||
const lifecyclePhase =
|
||||
evt.stream === "lifecycle" && typeof evt.data?.phase === "string"
|
||||
? evt.data.phase
|
||||
: null;
|
||||
|
||||
if (sessionKey) {
|
||||
@@ -218,7 +218,7 @@ export function createAgentEventHandler({
|
||||
if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
|
||||
const clientRunId = chatLink?.clientRunId ?? evt.runId;
|
||||
emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text);
|
||||
} else if (jobState === "done" || jobState === "error") {
|
||||
} else if (lifecyclePhase === "end" || lifecyclePhase === "error") {
|
||||
if (chatLink) {
|
||||
const finished = chatRunState.registry.shift(evt.runId);
|
||||
if (!finished) {
|
||||
@@ -229,7 +229,7 @@ export function createAgentEventHandler({
|
||||
finished.sessionKey,
|
||||
finished.clientRunId,
|
||||
evt.seq,
|
||||
jobState,
|
||||
lifecyclePhase === "error" ? "error" : "done",
|
||||
evt.data?.error,
|
||||
);
|
||||
} else {
|
||||
@@ -237,14 +237,14 @@ export function createAgentEventHandler({
|
||||
sessionKey,
|
||||
evt.runId,
|
||||
evt.seq,
|
||||
jobState,
|
||||
lifecyclePhase === "error" ? "error" : "done",
|
||||
evt.data?.error,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (jobState === "done" || jobState === "error") {
|
||||
if (lifecyclePhase === "end" || lifecyclePhase === "error") {
|
||||
clearAgentRunContext(evt.runId);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,50 +1,48 @@
|
||||
import { onAgentEvent } from "../../infra/agent-events.js";
|
||||
|
||||
const AGENT_JOB_CACHE_TTL_MS = 10 * 60_000;
|
||||
const agentJobCache = new Map<string, AgentJobSnapshot>();
|
||||
const AGENT_RUN_CACHE_TTL_MS = 10 * 60_000;
|
||||
const agentRunCache = new Map<string, AgentRunSnapshot>();
|
||||
const agentRunStarts = new Map<string, number>();
|
||||
let agentJobListenerStarted = false;
|
||||
let agentRunListenerStarted = false;
|
||||
|
||||
type AgentJobSnapshot = {
|
||||
type AgentRunSnapshot = {
|
||||
runId: string;
|
||||
state: "done" | "error";
|
||||
status: "ok" | "error";
|
||||
startedAt?: number;
|
||||
endedAt?: number;
|
||||
error?: string;
|
||||
ts: number;
|
||||
};
|
||||
|
||||
function pruneAgentJobCache(now = Date.now()) {
|
||||
for (const [runId, entry] of agentJobCache) {
|
||||
if (now - entry.ts > AGENT_JOB_CACHE_TTL_MS) {
|
||||
agentJobCache.delete(runId);
|
||||
function pruneAgentRunCache(now = Date.now()) {
|
||||
for (const [runId, entry] of agentRunCache) {
|
||||
if (now - entry.ts > AGENT_RUN_CACHE_TTL_MS) {
|
||||
agentRunCache.delete(runId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function recordAgentJobSnapshot(entry: AgentJobSnapshot) {
|
||||
pruneAgentJobCache(entry.ts);
|
||||
agentJobCache.set(entry.runId, entry);
|
||||
function recordAgentRunSnapshot(entry: AgentRunSnapshot) {
|
||||
pruneAgentRunCache(entry.ts);
|
||||
agentRunCache.set(entry.runId, entry);
|
||||
}
|
||||
|
||||
function ensureAgentJobListener() {
|
||||
if (agentJobListenerStarted) return;
|
||||
agentJobListenerStarted = true;
|
||||
function ensureAgentRunListener() {
|
||||
if (agentRunListenerStarted) return;
|
||||
agentRunListenerStarted = true;
|
||||
onAgentEvent((evt) => {
|
||||
if (!evt) return;
|
||||
if (evt.stream !== "job") return;
|
||||
const state = evt.data?.state;
|
||||
if (state === "started") {
|
||||
if (evt.stream !== "lifecycle") return;
|
||||
const phase = evt.data?.phase;
|
||||
if (phase === "start") {
|
||||
const startedAt =
|
||||
typeof evt.data?.startedAt === "number"
|
||||
? (evt.data.startedAt as number)
|
||||
: undefined;
|
||||
if (startedAt !== undefined) {
|
||||
agentRunStarts.set(evt.runId, startedAt);
|
||||
}
|
||||
agentRunStarts.set(evt.runId, startedAt ?? Date.now());
|
||||
return;
|
||||
}
|
||||
if (state !== "done" && state !== "error") return;
|
||||
if (phase !== "end" && phase !== "error") return;
|
||||
const startedAt =
|
||||
typeof evt.data?.startedAt === "number"
|
||||
? (evt.data.startedAt as number)
|
||||
@@ -58,9 +56,9 @@ function ensureAgentJobListener() {
|
||||
? (evt.data.error as string)
|
||||
: undefined;
|
||||
agentRunStarts.delete(evt.runId);
|
||||
recordAgentJobSnapshot({
|
||||
recordAgentRunSnapshot({
|
||||
runId: evt.runId,
|
||||
state: state === "error" ? "error" : "done",
|
||||
status: phase === "error" ? "error" : "ok",
|
||||
startedAt,
|
||||
endedAt,
|
||||
error,
|
||||
@@ -69,34 +67,24 @@ function ensureAgentJobListener() {
|
||||
});
|
||||
}
|
||||
|
||||
function matchesAfterMs(entry: AgentJobSnapshot, afterMs?: number) {
|
||||
if (afterMs === undefined) return true;
|
||||
if (typeof entry.startedAt === "number") return entry.startedAt >= afterMs;
|
||||
if (typeof entry.endedAt === "number") return entry.endedAt >= afterMs;
|
||||
return false;
|
||||
}
|
||||
|
||||
function getCachedAgentJob(runId: string, afterMs?: number) {
|
||||
pruneAgentJobCache();
|
||||
const cached = agentJobCache.get(runId);
|
||||
if (!cached) return undefined;
|
||||
return matchesAfterMs(cached, afterMs) ? cached : undefined;
|
||||
function getCachedAgentRun(runId: string) {
|
||||
pruneAgentRunCache();
|
||||
return agentRunCache.get(runId);
|
||||
}
|
||||
|
||||
export async function waitForAgentJob(params: {
|
||||
runId: string;
|
||||
afterMs?: number;
|
||||
timeoutMs: number;
|
||||
}): Promise<AgentJobSnapshot | null> {
|
||||
const { runId, afterMs, timeoutMs } = params;
|
||||
ensureAgentJobListener();
|
||||
const cached = getCachedAgentJob(runId, afterMs);
|
||||
}): Promise<AgentRunSnapshot | null> {
|
||||
const { runId, timeoutMs } = params;
|
||||
ensureAgentRunListener();
|
||||
const cached = getCachedAgentRun(runId);
|
||||
if (cached) return cached;
|
||||
if (timeoutMs <= 0) return null;
|
||||
|
||||
return await new Promise((resolve) => {
|
||||
let settled = false;
|
||||
const finish = (entry: AgentJobSnapshot | null) => {
|
||||
const finish = (entry: AgentRunSnapshot | null) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
clearTimeout(timer);
|
||||
@@ -104,10 +92,15 @@ export async function waitForAgentJob(params: {
|
||||
resolve(entry);
|
||||
};
|
||||
const unsubscribe = onAgentEvent((evt) => {
|
||||
if (!evt || evt.stream !== "job") return;
|
||||
if (!evt || evt.stream !== "lifecycle") return;
|
||||
if (evt.runId !== runId) return;
|
||||
const state = evt.data?.state;
|
||||
if (state !== "done" && state !== "error") return;
|
||||
const phase = evt.data?.phase;
|
||||
if (phase !== "end" && phase !== "error") return;
|
||||
const cached = getCachedAgentRun(runId);
|
||||
if (cached) {
|
||||
finish(cached);
|
||||
return;
|
||||
}
|
||||
const startedAt =
|
||||
typeof evt.data?.startedAt === "number"
|
||||
? (evt.data.startedAt as number)
|
||||
@@ -120,20 +113,19 @@ export async function waitForAgentJob(params: {
|
||||
typeof evt.data?.error === "string"
|
||||
? (evt.data.error as string)
|
||||
: undefined;
|
||||
const snapshot: AgentJobSnapshot = {
|
||||
const snapshot: AgentRunSnapshot = {
|
||||
runId: evt.runId,
|
||||
state: state === "error" ? "error" : "done",
|
||||
status: phase === "error" ? "error" : "ok",
|
||||
startedAt,
|
||||
endedAt,
|
||||
error,
|
||||
ts: Date.now(),
|
||||
};
|
||||
recordAgentJobSnapshot(snapshot);
|
||||
if (!matchesAfterMs(snapshot, afterMs)) return;
|
||||
recordAgentRunSnapshot(snapshot);
|
||||
finish(snapshot);
|
||||
});
|
||||
const timer = setTimeout(() => finish(null), Math.max(1, timeoutMs));
|
||||
});
|
||||
}
|
||||
|
||||
ensureAgentJobListener();
|
||||
ensureAgentRunListener();
|
||||
|
||||
@@ -288,10 +288,6 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
}
|
||||
const p = params as AgentWaitParams;
|
||||
const runId = p.runId.trim();
|
||||
const afterMs =
|
||||
typeof p.afterMs === "number" && Number.isFinite(p.afterMs)
|
||||
? Math.max(0, Math.floor(p.afterMs))
|
||||
: undefined;
|
||||
const timeoutMs =
|
||||
typeof p.timeoutMs === "number" && Number.isFinite(p.timeoutMs)
|
||||
? Math.max(0, Math.floor(p.timeoutMs))
|
||||
@@ -299,7 +295,6 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
|
||||
const snapshot = await waitForAgentJob({
|
||||
runId,
|
||||
afterMs,
|
||||
timeoutMs,
|
||||
});
|
||||
if (!snapshot) {
|
||||
@@ -311,7 +306,7 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
}
|
||||
respond(true, {
|
||||
runId,
|
||||
status: snapshot.state === "done" ? "ok" : "error",
|
||||
status: snapshot.status,
|
||||
startedAt: snapshot.startedAt,
|
||||
endedAt: snapshot.endedAt,
|
||||
error: snapshot.error,
|
||||
|
||||
@@ -463,8 +463,8 @@ describe("gateway server agent", () => {
|
||||
});
|
||||
emitAgentEvent({
|
||||
runId: "run-auto-1",
|
||||
stream: "job",
|
||||
data: { state: "done" },
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
|
||||
const evt = await finalChatP;
|
||||
@@ -518,21 +518,20 @@ describe("gateway server agent", () => {
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("agent.wait resolves after job completes", async () => {
|
||||
test("agent.wait resolves after lifecycle end", async () => {
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const waitP = rpcReq(ws, "agent.wait", {
|
||||
runId: "run-wait-1",
|
||||
afterMs: 100,
|
||||
timeoutMs: 1000,
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
emitAgentEvent({
|
||||
runId: "run-wait-1",
|
||||
stream: "job",
|
||||
data: { state: "done", startedAt: 200, endedAt: 210 },
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end", startedAt: 200, endedAt: 210 },
|
||||
});
|
||||
}, 10);
|
||||
|
||||
@@ -545,14 +544,14 @@ describe("gateway server agent", () => {
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("agent.wait resolves when job completed before wait call", async () => {
|
||||
test("agent.wait resolves when lifecycle ended before wait call", async () => {
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-wait-early",
|
||||
stream: "job",
|
||||
data: { state: "done", startedAt: 50, endedAt: 55 },
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end", startedAt: 50, endedAt: 55 },
|
||||
});
|
||||
|
||||
const res = await rpcReq(ws, "agent.wait", {
|
||||
@@ -567,41 +566,7 @@ describe("gateway server agent", () => {
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("agent.wait ignores jobs before afterMs", async () => {
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const waitP = rpcReq(ws, "agent.wait", {
|
||||
runId: "run-wait-2",
|
||||
afterMs: 500,
|
||||
timeoutMs: 1000,
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
emitAgentEvent({
|
||||
runId: "run-wait-2",
|
||||
stream: "job",
|
||||
data: { state: "done", startedAt: 200, endedAt: 220 },
|
||||
});
|
||||
}, 10);
|
||||
setTimeout(() => {
|
||||
emitAgentEvent({
|
||||
runId: "run-wait-2",
|
||||
stream: "job",
|
||||
data: { state: "done", startedAt: 700, endedAt: 710 },
|
||||
});
|
||||
}, 20);
|
||||
|
||||
const res = await waitP;
|
||||
expect(res.ok).toBe(true);
|
||||
expect(res.payload.status).toBe("ok");
|
||||
expect(res.payload.startedAt).toBe(700);
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("agent.wait times out when no job completes", async () => {
|
||||
test("agent.wait times out when no lifecycle ends", async () => {
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
@@ -615,4 +580,63 @@ describe("gateway server agent", () => {
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("agent.wait returns error on lifecycle error", async () => {
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const waitP = rpcReq(ws, "agent.wait", {
|
||||
runId: "run-wait-err",
|
||||
timeoutMs: 1000,
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
emitAgentEvent({
|
||||
runId: "run-wait-err",
|
||||
stream: "lifecycle",
|
||||
data: { phase: "error", error: "boom" },
|
||||
});
|
||||
}, 10);
|
||||
|
||||
const res = await waitP;
|
||||
expect(res.ok).toBe(true);
|
||||
expect(res.payload.status).toBe("error");
|
||||
expect(res.payload.error).toBe("boom");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("agent.wait uses lifecycle start timestamp when end omits it", async () => {
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const waitP = rpcReq(ws, "agent.wait", {
|
||||
runId: "run-wait-start",
|
||||
timeoutMs: 1000,
|
||||
});
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "run-wait-start",
|
||||
stream: "lifecycle",
|
||||
data: { phase: "start", startedAt: 123 },
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
emitAgentEvent({
|
||||
runId: "run-wait-start",
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end", endedAt: 456 },
|
||||
});
|
||||
}, 10);
|
||||
|
||||
const res = await waitP;
|
||||
expect(res.ok).toBe(true);
|
||||
expect(res.payload.status).toBe("ok");
|
||||
expect(res.payload.startedAt).toBe(123);
|
||||
expect(res.payload.endedAt).toBe(456);
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -831,8 +831,8 @@ describe("gateway server chat", () => {
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "sess-main",
|
||||
stream: "job",
|
||||
data: { state: "done" },
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
|
||||
const final1 = await final1P;
|
||||
@@ -853,8 +853,8 @@ describe("gateway server chat", () => {
|
||||
|
||||
emitAgentEvent({
|
||||
runId: "sess-main",
|
||||
stream: "job",
|
||||
data: { state: "done" },
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
|
||||
const final2 = await final2P;
|
||||
|
||||
@@ -173,9 +173,9 @@ describe("gateway server health/presence", () => {
|
||||
o.type === "event" &&
|
||||
o.event === "agent" &&
|
||||
o.payload?.runId === runId &&
|
||||
o.payload?.stream === "job",
|
||||
o.payload?.stream === "lifecycle",
|
||||
);
|
||||
emitAgentEvent({ runId, stream: "job", data: { msg: "hi" } });
|
||||
emitAgentEvent({ runId, stream: "lifecycle", data: { msg: "hi" } });
|
||||
const evt = await evtPromise;
|
||||
expect(evt.payload.runId).toBe(runId);
|
||||
expect(typeof evt.seq).toBe("number");
|
||||
|
||||
@@ -699,8 +699,8 @@ describe("gateway server node/bridge", () => {
|
||||
runId: "sess-main",
|
||||
seq: 2,
|
||||
ts: Date.now(),
|
||||
stream: "job",
|
||||
data: { state: "done" },
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
@@ -841,8 +841,8 @@ describe("gateway server node/bridge", () => {
|
||||
runId: "sess-main",
|
||||
seq: 2,
|
||||
ts: Date.now(),
|
||||
stream: "job",
|
||||
data: { state: "done" },
|
||||
stream: "lifecycle",
|
||||
data: { phase: "end" },
|
||||
});
|
||||
|
||||
const evt = await finalChatP;
|
||||
|
||||
@@ -14,7 +14,7 @@ import {
|
||||
installGatewayTestHooks();
|
||||
|
||||
describe("sessions_send gateway loopback", () => {
|
||||
it("returns reply when job finishes before agent.wait", async () => {
|
||||
it("returns reply when lifecycle ends before agent.wait", async () => {
|
||||
const port = await getFreePort();
|
||||
const prevPort = process.env.CLAWDBOT_GATEWAY_PORT;
|
||||
process.env.CLAWDBOT_GATEWAY_PORT = String(port);
|
||||
@@ -35,8 +35,8 @@ describe("sessions_send gateway loopback", () => {
|
||||
const startedAt = Date.now();
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "job",
|
||||
data: { state: "started", startedAt, sessionId },
|
||||
stream: "lifecycle",
|
||||
data: { phase: "start", startedAt },
|
||||
});
|
||||
|
||||
let text = "pong";
|
||||
@@ -60,12 +60,11 @@ describe("sessions_send gateway loopback", () => {
|
||||
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "job",
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
state: "done",
|
||||
phase: "end",
|
||||
startedAt,
|
||||
endedAt: Date.now(),
|
||||
sessionId,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@@ -121,13 +121,9 @@ export function summarizeAgentEventForWsLog(
|
||||
return extra;
|
||||
}
|
||||
|
||||
if (stream === "job") {
|
||||
const state = typeof data.state === "string" ? data.state : undefined;
|
||||
if (state) extra.state = state;
|
||||
if (data.to === null) extra.to = null;
|
||||
else if (typeof data.to === "string") extra.to = data.to;
|
||||
if (typeof data.durationMs === "number")
|
||||
extra.ms = Math.round(data.durationMs);
|
||||
if (stream === "lifecycle") {
|
||||
const phase = typeof data.phase === "string" ? data.phase : undefined;
|
||||
if (phase) extra.phase = phase;
|
||||
if (typeof data.aborted === "boolean") extra.aborted = data.aborted;
|
||||
const error = typeof data.error === "string" ? data.error : undefined;
|
||||
if (error?.trim()) extra.error = compactPreview(error, 120);
|
||||
|
||||
Reference in New Issue
Block a user