diff --git a/docs/session-tool.md b/docs/session-tool.md index f48b33fd7..d7a795d78 100644 --- a/docs/session-tool.md +++ b/docs/session-tool.md @@ -74,6 +74,7 @@ Behavior: - `timeoutSeconds > 0`: wait up to N seconds for completion, then return `{ runId, status: "ok", reply }`. - If wait times out: `{ runId, status: "timeout", error }`. Run continues; call `sessions_history` later. - If the run fails: `{ runId, status: "error", error }`. +- Waits via gateway `agent.wait` (server-side) so reconnects don't drop the wait. ## Provider Field - For groups, `provider` is the `surface` recorded on the session entry. diff --git a/src/agents/clawdis-tools.sessions.test.ts b/src/agents/clawdis-tools.sessions.test.ts index aad46270b..9fab95ac0 100644 --- a/src/agents/clawdis-tools.sessions.test.ts +++ b/src/agents/clawdis-tools.sessions.test.ts @@ -123,17 +123,29 @@ describe("sessions tools", () => { it("sessions_send supports fire-and-forget and wait", async () => { callGatewayMock.mockReset(); - const calls: Array<{ method?: string; expectFinal?: boolean }> = []; + const calls: Array<{ method?: string; params?: unknown }> = []; callGatewayMock.mockImplementation(async (opts: unknown) => { - const request = opts as { method?: string; expectFinal?: boolean }; + const request = opts as { method?: string; params?: unknown }; calls.push(request); if (request.method === "agent") { - return { runId: "run-1", status: "accepted" }; + return { runId: "run-1", status: "accepted", acceptedAt: 1234 }; + } + if (request.method === "agent.wait") { + return { runId: "run-1", status: "ok" }; } if (request.method === "chat.history") { return { messages: [ - { role: "assistant", content: [{ type: "text", text: "done" }] }, + { + role: "assistant", + content: [ + { + type: "text", + text: "done", + }, + ], + timestamp: 20, + }, ], }; } @@ -156,7 +168,7 @@ describe("sessions tools", () => { const waitPromise = tool.execute("call6", { sessionKey: "main", message: "wait", - timeoutSeconds: 5, + timeoutSeconds: 1, }); const waited = await waitPromise; expect(waited.details).toMatchObject({ @@ -166,7 +178,13 @@ describe("sessions tools", () => { }); const agentCalls = calls.filter((call) => call.method === "agent"); - expect(agentCalls[0]?.expectFinal).toBeUndefined(); - expect(agentCalls[1]?.expectFinal).toBe(true); + const waitCalls = calls.filter((call) => call.method === "agent.wait"); + const historyOnlyCalls = calls.filter( + (call) => call.method === "chat.history", + ); + expect(agentCalls).toHaveLength(2); + expect(waitCalls).toHaveLength(1); + expect(historyOnlyCalls).toHaveLength(1); + expect(waitCalls[0]?.params).toMatchObject({ afterMs: 1234 }); }); }); diff --git a/src/agents/clawdis-tools.ts b/src/agents/clawdis-tools.ts index 8afe39ee4..2ccebff40 100644 --- a/src/agents/clawdis-tools.ts +++ b/src/agents/clawdis-tools.ts @@ -1536,7 +1536,8 @@ function createNodesTool(): AnyAgentTool { const node = readStringParam(params, "node", { required: true }); const nodeId = await resolveNodeId(gatewayOpts, node); const maxAgeMs = - typeof params.maxAgeMs === "number" && Number.isFinite(params.maxAgeMs) + typeof params.maxAgeMs === "number" && + Number.isFinite(params.maxAgeMs) ? params.maxAgeMs : undefined; const desiredAccuracy = @@ -2736,7 +2737,7 @@ function createSessionsSendTool(): AnyAgentTool { ? Math.max(0, Math.floor(params.timeoutSeconds)) : 30; const idempotencyKey = crypto.randomUUID(); - let runId = idempotencyKey; + let runId: string = idempotencyKey; const displayKey = resolveDisplaySessionKey({ key: sessionKey, alias, @@ -2776,39 +2777,22 @@ function createSessionsSendTool(): AnyAgentTool { } } + let acceptedAt: number | undefined; try { const response = (await callGateway({ method: "agent", params: sendParams, - expectFinal: true, - timeoutMs: timeoutSeconds * 1000, - })) as { runId?: string; status?: string }; + timeoutMs: 10_000, + })) as { runId?: string; acceptedAt?: number }; if (typeof response?.runId === "string" && response.runId) { runId = response.runId; } + if (typeof response?.acceptedAt === "number") { + acceptedAt = response.acceptedAt; + } } catch (err) { const message = err instanceof Error ? err.message : String(err ?? "error"); - if (message.includes("gateway timeout")) { - try { - const cached = (await callGateway({ - method: "agent", - params: sendParams, - timeoutMs: 5_000, - })) as { runId?: string }; - if (typeof cached?.runId === "string" && cached.runId) { - runId = cached.runId; - } - } catch { - /* ignore */ - } - return jsonResult({ - runId, - status: "timeout", - error: message, - sessionKey: displayKey, - }); - } return jsonResult({ runId, status: "error", @@ -2817,6 +2801,49 @@ function createSessionsSendTool(): AnyAgentTool { }); } + const timeoutMs = timeoutSeconds * 1000; + let waitStatus: string | undefined; + let waitError: string | undefined; + try { + const wait = (await callGateway({ + method: "agent.wait", + params: { + runId, + afterMs: acceptedAt, + timeoutMs, + }, + timeoutMs: timeoutMs + 2000, + })) as { status?: string; error?: string }; + waitStatus = typeof wait?.status === "string" ? wait.status : undefined; + waitError = typeof wait?.error === "string" ? wait.error : undefined; + } catch (err) { + const message = + err instanceof Error ? err.message : String(err ?? "error"); + return jsonResult({ + runId, + status: message.includes("gateway timeout") ? "timeout" : "error", + error: message, + sessionKey: displayKey, + }); + } + + if (waitStatus === "timeout") { + return jsonResult({ + runId, + status: "timeout", + error: waitError, + sessionKey: displayKey, + }); + } + if (waitStatus === "error") { + return jsonResult({ + runId, + status: "error", + error: waitError ?? "agent error", + sessionKey: displayKey, + }); + } + const history = (await callGateway({ method: "chat.history", params: { sessionKey: resolvedKey, limit: 50 }, diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index aed0e6f39..72083d427 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -3,6 +3,7 @@ import { type AgentEvent, AgentEventSchema, AgentParamsSchema, + AgentWaitParamsSchema, type ChatAbortParams, ChatAbortParamsSchema, type ChatEvent, @@ -77,6 +78,7 @@ import { type ResponseFrame, ResponseFrameSchema, SendParamsSchema, + type AgentWaitParams, type SessionsCompactParams, SessionsCompactParamsSchema, type SessionsDeleteParams, @@ -146,6 +148,8 @@ export const validateResponseFrame = export const validateEventFrame = ajv.compile(EventFrameSchema); export const validateSendParams = ajv.compile(SendParamsSchema); export const validateAgentParams = ajv.compile(AgentParamsSchema); +export const validateAgentWaitParams = + ajv.compile(AgentWaitParamsSchema); export const validateWakeParams = ajv.compile(WakeParamsSchema); export const validateNodePairRequestParams = ajv.compile( NodePairRequestParamsSchema, @@ -340,6 +344,7 @@ export type { ErrorShape, StateVersion, AgentEvent, + AgentWaitParams, ChatEvent, TickEvent, ShutdownEvent, diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index e37e7ce72..cd1de8060 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -213,6 +213,15 @@ export const AgentParamsSchema = Type.Object( { additionalProperties: false }, ); +export const AgentWaitParamsSchema = Type.Object( + { + runId: NonEmptyString, + afterMs: Type.Optional(Type.Integer({ minimum: 0 })), + timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })), + }, + { additionalProperties: false }, +); + export const WakeParamsSchema = Type.Object( { mode: Type.Union([Type.Literal("now"), Type.Literal("next-heartbeat")]), @@ -818,6 +827,7 @@ export const ProtocolSchemas: Record = { AgentEvent: AgentEventSchema, SendParams: SendParamsSchema, AgentParams: AgentParamsSchema, + AgentWaitParams: AgentWaitParamsSchema, WakeParams: WakeParamsSchema, NodePairRequestParams: NodePairRequestParamsSchema, NodePairListParams: NodePairListParamsSchema, @@ -885,6 +895,7 @@ export type PresenceEntry = Static; export type ErrorShape = Static; export type StateVersion = Static; export type AgentEvent = Static; +export type AgentWaitParams = Static; export type WakeParams = Static; export type NodePairRequestParams = Static; export type NodePairListParams = Static; diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts index f432ce440..1334bf9ee 100644 --- a/src/gateway/server-methods.ts +++ b/src/gateway/server-methods.ts @@ -3,12 +3,6 @@ import fs from "node:fs"; import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js"; import type { ModelCatalogEntry } from "../agents/model-catalog.js"; -import { - abortEmbeddedPiRun, - isEmbeddedPiRunActive, - resolveEmbeddedSessionLane, - waitForEmbeddedPiRunEnd, -} from "../agents/pi-embedded.js"; import { buildAllowedModelSet, buildModelAliasIndex, @@ -17,6 +11,12 @@ import { resolveModelRefFromString, resolveThinkingDefault, } from "../agents/model-selection.js"; +import { + abortEmbeddedPiRun, + isEmbeddedPiRunActive, + resolveEmbeddedSessionLane, + waitForEmbeddedPiRunEnd, +} from "../agents/pi-embedded.js"; import { installSkill } from "../agents/skills-install.js"; import { buildWorkspaceSkillStatus } from "../agents/skills-status.js"; import { DEFAULT_AGENT_WORKSPACE_DIR } from "../agents/workspace.js"; @@ -59,6 +59,7 @@ import { sendMessageIMessage } from "../imessage/index.js"; import { type IMessageProbe, probeIMessage } from "../imessage/probe.js"; import type { startNodeBridgeServer } from "../infra/bridge/server.js"; import { getLastHeartbeatEvent } from "../infra/heartbeat-events.js"; +import { onAgentEvent } from "../infra/agent-events.js"; import { setHeartbeatsEnabled } from "../infra/heartbeat-runner.js"; import { approveNodePairing, @@ -80,9 +81,9 @@ import { loadVoiceWakeConfig, setVoiceWakeTriggers, } from "../infra/voicewake.js"; +import { clearCommandLane } from "../process/command-queue.js"; import { webAuthExists } from "../providers/web/index.js"; import { defaultRuntime } from "../runtime.js"; -import { clearCommandLane } from "../process/command-queue.js"; import { normalizeSendPolicy, resolveSendPolicy, @@ -110,7 +111,9 @@ import { type SessionsListParams, type SessionsPatchParams, type SessionsResetParams, + type AgentWaitParams, validateAgentParams, + validateAgentWaitParams, validateChatAbortParams, validateChatHistoryParams, validateChatSendParams, @@ -189,6 +192,137 @@ type DedupeEntry = { error?: ErrorShape; }; +type AgentJobSnapshot = { + runId: string; + state: "done" | "error"; + startedAt?: number; + endedAt?: number; + error?: string; + ts: number; +}; + +const AGENT_JOB_CACHE_TTL_MS = 10 * 60_000; +const agentJobCache = new Map(); +const agentRunStarts = new Map(); +let agentJobListenerStarted = false; + +function pruneAgentJobCache(now = Date.now()) { + for (const [runId, entry] of agentJobCache) { + if (now - entry.ts > AGENT_JOB_CACHE_TTL_MS) { + agentJobCache.delete(runId); + } + } +} + +function recordAgentJobSnapshot(entry: AgentJobSnapshot) { + pruneAgentJobCache(entry.ts); + agentJobCache.set(entry.runId, entry); +} + +function ensureAgentJobListener() { + if (agentJobListenerStarted) return; + agentJobListenerStarted = true; + onAgentEvent((evt) => { + if (!evt || evt.stream !== "job") return; + const state = evt.data?.state; + if (state === "started") { + const startedAt = + typeof evt.data?.startedAt === "number" + ? (evt.data.startedAt as number) + : undefined; + if (startedAt !== undefined) { + agentRunStarts.set(evt.runId, startedAt); + } + return; + } + if (state !== "done" && state !== "error") return; + const startedAt = + typeof evt.data?.startedAt === "number" + ? (evt.data.startedAt as number) + : agentRunStarts.get(evt.runId); + const endedAt = + typeof evt.data?.endedAt === "number" + ? (evt.data.endedAt as number) + : undefined; + const error = + typeof evt.data?.error === "string" ? (evt.data.error as string) : undefined; + agentRunStarts.delete(evt.runId); + recordAgentJobSnapshot({ + runId: evt.runId, + state: state === "error" ? "error" : "done", + startedAt, + endedAt, + error, + ts: Date.now(), + }); + }); +} + +function matchesAfterMs(entry: AgentJobSnapshot, afterMs?: number) { + if (afterMs === undefined) return true; + if (typeof entry.startedAt === "number") return entry.startedAt >= afterMs; + if (typeof entry.endedAt === "number") return entry.endedAt >= afterMs; + return false; +} + +function getCachedAgentJob(runId: string, afterMs?: number) { + pruneAgentJobCache(); + const cached = agentJobCache.get(runId); + if (!cached) return undefined; + return matchesAfterMs(cached, afterMs) ? cached : undefined; +} + +async function waitForAgentJob(params: { + runId: string; + afterMs?: number; + timeoutMs: number; +}): Promise { + const { runId, afterMs, timeoutMs } = params; + ensureAgentJobListener(); + const cached = getCachedAgentJob(runId, afterMs); + if (cached) return cached; + if (timeoutMs <= 0) return null; + + return await new Promise((resolve) => { + let settled = false; + const finish = (entry: AgentJobSnapshot | null) => { + if (settled) return; + settled = true; + clearTimeout(timer); + unsubscribe(); + resolve(entry); + }; + const unsubscribe = onAgentEvent((evt) => { + if (!evt || evt.stream !== "job") return; + if (evt.runId !== runId) return; + const state = evt.data?.state; + if (state !== "done" && state !== "error") return; + const startedAt = + typeof evt.data?.startedAt === "number" + ? (evt.data.startedAt as number) + : agentRunStarts.get(evt.runId); + const endedAt = + typeof evt.data?.endedAt === "number" + ? (evt.data.endedAt as number) + : undefined; + const error = + typeof evt.data?.error === "string" ? (evt.data.error as string) : undefined; + const snapshot: AgentJobSnapshot = { + runId: evt.runId, + state: state === "error" ? "error" : "done", + startedAt, + endedAt, + error, + ts: Date.now(), + }; + recordAgentJobSnapshot(snapshot); + if (!matchesAfterMs(snapshot, afterMs)) return; + finish(snapshot); + }); + const timer = setTimeout(() => finish(null), Math.max(1, timeoutMs)); + }); +} + export type GatewayRequestContext = { deps: ReturnType; cron: CronService; @@ -2954,7 +3088,11 @@ export async function handleGatewayRequest( const deliver = params.deliver === true && resolvedChannel !== "webchat"; - const accepted = { runId, status: "accepted" as const }; + const accepted = { + runId, + status: "accepted" as const, + acceptedAt: Date.now(), + }; // Store an in-flight ack so retries do not spawn a second run. dedupe.set(`agent:${idem}`, { ts: Date.now(), @@ -3013,6 +3151,51 @@ export async function handleGatewayRequest( }); break; } + case "agent.wait": { + const params = (req.params ?? {}) as Record; + if (!validateAgentWaitParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid agent.wait params: ${formatValidationErrors(validateAgentWaitParams.errors)}`, + ), + ); + break; + } + const p = params as AgentWaitParams; + const runId = p.runId.trim(); + const afterMs = + typeof p.afterMs === "number" && Number.isFinite(p.afterMs) + ? Math.max(0, Math.floor(p.afterMs)) + : undefined; + const timeoutMs = + typeof p.timeoutMs === "number" && Number.isFinite(p.timeoutMs) + ? Math.max(0, Math.floor(p.timeoutMs)) + : 30_000; + + const snapshot = await waitForAgentJob({ + runId, + afterMs, + timeoutMs, + }); + if (!snapshot) { + respond(true, { + runId, + status: "timeout", + }); + break; + } + respond(true, { + runId, + status: snapshot.state === "done" ? "ok" : "error", + startedAt: snapshot.startedAt, + endedAt: snapshot.endedAt, + error: snapshot.error, + }); + break; + } default: { respond( false, diff --git a/src/gateway/server.agent.test.ts b/src/gateway/server.agent.test.ts index a7cbdc17f..a255f96b8 100644 --- a/src/gateway/server.agent.test.ts +++ b/src/gateway/server.agent.test.ts @@ -517,4 +517,80 @@ describe("gateway server agent", () => { ws.close(); await server.close(); }); + + test("agent.wait resolves after job completes", async () => { + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const waitP = rpcReq(ws, "agent.wait", { + runId: "run-wait-1", + afterMs: 100, + timeoutMs: 1000, + }); + + setTimeout(() => { + emitAgentEvent({ + runId: "run-wait-1", + stream: "job", + data: { state: "done", startedAt: 200, endedAt: 210 }, + }); + }, 10); + + const res = await waitP; + expect(res.ok).toBe(true); + expect(res.payload.status).toBe("ok"); + expect(res.payload.startedAt).toBe(200); + + ws.close(); + await server.close(); + }); + + test("agent.wait ignores jobs before afterMs", async () => { + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const waitP = rpcReq(ws, "agent.wait", { + runId: "run-wait-2", + afterMs: 500, + timeoutMs: 1000, + }); + + setTimeout(() => { + emitAgentEvent({ + runId: "run-wait-2", + stream: "job", + data: { state: "done", startedAt: 200, endedAt: 220 }, + }); + }, 10); + setTimeout(() => { + emitAgentEvent({ + runId: "run-wait-2", + stream: "job", + data: { state: "done", startedAt: 700, endedAt: 710 }, + }); + }, 20); + + const res = await waitP; + expect(res.ok).toBe(true); + expect(res.payload.status).toBe("ok"); + expect(res.payload.startedAt).toBe(700); + + ws.close(); + await server.close(); + }); + + test("agent.wait times out when no job completes", async () => { + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const res = await rpcReq(ws, "agent.wait", { + runId: "run-wait-3", + timeoutMs: 20, + }); + expect(res.ok).toBe(true); + expect(res.payload.status).toBe("timeout"); + + ws.close(); + await server.close(); + }); }); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 4349c96b5..89d0f408e 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -236,6 +236,7 @@ const METHODS = [ "system-event", "send", "agent", + "agent.wait", "web.login.start", "web.login.wait", "web.logout",