fix: wait on agent.wait for sessions_send

This commit is contained in:
Peter Steinberger
2026-01-04 01:15:23 +01:00
parent 412e8b3aee
commit e3c543ec06
8 changed files with 362 additions and 40 deletions

View File

@@ -3,6 +3,7 @@ import {
type AgentEvent,
AgentEventSchema,
AgentParamsSchema,
AgentWaitParamsSchema,
type ChatAbortParams,
ChatAbortParamsSchema,
type ChatEvent,
@@ -77,6 +78,7 @@ import {
type ResponseFrame,
ResponseFrameSchema,
SendParamsSchema,
type AgentWaitParams,
type SessionsCompactParams,
SessionsCompactParamsSchema,
type SessionsDeleteParams,
@@ -146,6 +148,8 @@ export const validateResponseFrame =
export const validateEventFrame = ajv.compile<EventFrame>(EventFrameSchema);
export const validateSendParams = ajv.compile(SendParamsSchema);
export const validateAgentParams = ajv.compile(AgentParamsSchema);
export const validateAgentWaitParams =
ajv.compile<AgentWaitParams>(AgentWaitParamsSchema);
export const validateWakeParams = ajv.compile<WakeParams>(WakeParamsSchema);
export const validateNodePairRequestParams = ajv.compile<NodePairRequestParams>(
NodePairRequestParamsSchema,
@@ -340,6 +344,7 @@ export type {
ErrorShape,
StateVersion,
AgentEvent,
AgentWaitParams,
ChatEvent,
TickEvent,
ShutdownEvent,

View File

@@ -213,6 +213,15 @@ export const AgentParamsSchema = Type.Object(
{ additionalProperties: false },
);
export const AgentWaitParamsSchema = Type.Object(
{
runId: NonEmptyString,
afterMs: Type.Optional(Type.Integer({ minimum: 0 })),
timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
export const WakeParamsSchema = Type.Object(
{
mode: Type.Union([Type.Literal("now"), Type.Literal("next-heartbeat")]),
@@ -818,6 +827,7 @@ export const ProtocolSchemas: Record<string, TSchema> = {
AgentEvent: AgentEventSchema,
SendParams: SendParamsSchema,
AgentParams: AgentParamsSchema,
AgentWaitParams: AgentWaitParamsSchema,
WakeParams: WakeParamsSchema,
NodePairRequestParams: NodePairRequestParamsSchema,
NodePairListParams: NodePairListParamsSchema,
@@ -885,6 +895,7 @@ export type PresenceEntry = Static<typeof PresenceEntrySchema>;
export type ErrorShape = Static<typeof ErrorShapeSchema>;
export type StateVersion = Static<typeof StateVersionSchema>;
export type AgentEvent = Static<typeof AgentEventSchema>;
export type AgentWaitParams = Static<typeof AgentWaitParamsSchema>;
export type WakeParams = Static<typeof WakeParamsSchema>;
export type NodePairRequestParams = Static<typeof NodePairRequestParamsSchema>;
export type NodePairListParams = Static<typeof NodePairListParamsSchema>;

View File

@@ -3,12 +3,6 @@ import fs from "node:fs";
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js";
import type { ModelCatalogEntry } from "../agents/model-catalog.js";
import {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
resolveEmbeddedSessionLane,
waitForEmbeddedPiRunEnd,
} from "../agents/pi-embedded.js";
import {
buildAllowedModelSet,
buildModelAliasIndex,
@@ -17,6 +11,12 @@ import {
resolveModelRefFromString,
resolveThinkingDefault,
} from "../agents/model-selection.js";
import {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
resolveEmbeddedSessionLane,
waitForEmbeddedPiRunEnd,
} from "../agents/pi-embedded.js";
import { installSkill } from "../agents/skills-install.js";
import { buildWorkspaceSkillStatus } from "../agents/skills-status.js";
import { DEFAULT_AGENT_WORKSPACE_DIR } from "../agents/workspace.js";
@@ -59,6 +59,7 @@ import { sendMessageIMessage } from "../imessage/index.js";
import { type IMessageProbe, probeIMessage } from "../imessage/probe.js";
import type { startNodeBridgeServer } from "../infra/bridge/server.js";
import { getLastHeartbeatEvent } from "../infra/heartbeat-events.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { setHeartbeatsEnabled } from "../infra/heartbeat-runner.js";
import {
approveNodePairing,
@@ -80,9 +81,9 @@ import {
loadVoiceWakeConfig,
setVoiceWakeTriggers,
} from "../infra/voicewake.js";
import { clearCommandLane } from "../process/command-queue.js";
import { webAuthExists } from "../providers/web/index.js";
import { defaultRuntime } from "../runtime.js";
import { clearCommandLane } from "../process/command-queue.js";
import {
normalizeSendPolicy,
resolveSendPolicy,
@@ -110,7 +111,9 @@ import {
type SessionsListParams,
type SessionsPatchParams,
type SessionsResetParams,
type AgentWaitParams,
validateAgentParams,
validateAgentWaitParams,
validateChatAbortParams,
validateChatHistoryParams,
validateChatSendParams,
@@ -189,6 +192,137 @@ type DedupeEntry = {
error?: ErrorShape;
};
type AgentJobSnapshot = {
runId: string;
state: "done" | "error";
startedAt?: number;
endedAt?: number;
error?: string;
ts: number;
};
const AGENT_JOB_CACHE_TTL_MS = 10 * 60_000;
const agentJobCache = new Map<string, AgentJobSnapshot>();
const agentRunStarts = new Map<string, number>();
let agentJobListenerStarted = false;
function pruneAgentJobCache(now = Date.now()) {
for (const [runId, entry] of agentJobCache) {
if (now - entry.ts > AGENT_JOB_CACHE_TTL_MS) {
agentJobCache.delete(runId);
}
}
}
function recordAgentJobSnapshot(entry: AgentJobSnapshot) {
pruneAgentJobCache(entry.ts);
agentJobCache.set(entry.runId, entry);
}
function ensureAgentJobListener() {
if (agentJobListenerStarted) return;
agentJobListenerStarted = true;
onAgentEvent((evt) => {
if (!evt || evt.stream !== "job") return;
const state = evt.data?.state;
if (state === "started") {
const startedAt =
typeof evt.data?.startedAt === "number"
? (evt.data.startedAt as number)
: undefined;
if (startedAt !== undefined) {
agentRunStarts.set(evt.runId, startedAt);
}
return;
}
if (state !== "done" && state !== "error") return;
const startedAt =
typeof evt.data?.startedAt === "number"
? (evt.data.startedAt as number)
: agentRunStarts.get(evt.runId);
const endedAt =
typeof evt.data?.endedAt === "number"
? (evt.data.endedAt as number)
: undefined;
const error =
typeof evt.data?.error === "string" ? (evt.data.error as string) : undefined;
agentRunStarts.delete(evt.runId);
recordAgentJobSnapshot({
runId: evt.runId,
state: state === "error" ? "error" : "done",
startedAt,
endedAt,
error,
ts: Date.now(),
});
});
}
function matchesAfterMs(entry: AgentJobSnapshot, afterMs?: number) {
if (afterMs === undefined) return true;
if (typeof entry.startedAt === "number") return entry.startedAt >= afterMs;
if (typeof entry.endedAt === "number") return entry.endedAt >= afterMs;
return false;
}
function getCachedAgentJob(runId: string, afterMs?: number) {
pruneAgentJobCache();
const cached = agentJobCache.get(runId);
if (!cached) return undefined;
return matchesAfterMs(cached, afterMs) ? cached : undefined;
}
async function waitForAgentJob(params: {
runId: string;
afterMs?: number;
timeoutMs: number;
}): Promise<AgentJobSnapshot | null> {
const { runId, afterMs, timeoutMs } = params;
ensureAgentJobListener();
const cached = getCachedAgentJob(runId, afterMs);
if (cached) return cached;
if (timeoutMs <= 0) return null;
return await new Promise((resolve) => {
let settled = false;
const finish = (entry: AgentJobSnapshot | null) => {
if (settled) return;
settled = true;
clearTimeout(timer);
unsubscribe();
resolve(entry);
};
const unsubscribe = onAgentEvent((evt) => {
if (!evt || evt.stream !== "job") return;
if (evt.runId !== runId) return;
const state = evt.data?.state;
if (state !== "done" && state !== "error") return;
const startedAt =
typeof evt.data?.startedAt === "number"
? (evt.data.startedAt as number)
: agentRunStarts.get(evt.runId);
const endedAt =
typeof evt.data?.endedAt === "number"
? (evt.data.endedAt as number)
: undefined;
const error =
typeof evt.data?.error === "string" ? (evt.data.error as string) : undefined;
const snapshot: AgentJobSnapshot = {
runId: evt.runId,
state: state === "error" ? "error" : "done",
startedAt,
endedAt,
error,
ts: Date.now(),
};
recordAgentJobSnapshot(snapshot);
if (!matchesAfterMs(snapshot, afterMs)) return;
finish(snapshot);
});
const timer = setTimeout(() => finish(null), Math.max(1, timeoutMs));
});
}
export type GatewayRequestContext = {
deps: ReturnType<typeof createDefaultDeps>;
cron: CronService;
@@ -2954,7 +3088,11 @@ export async function handleGatewayRequest(
const deliver = params.deliver === true && resolvedChannel !== "webchat";
const accepted = { runId, status: "accepted" as const };
const accepted = {
runId,
status: "accepted" as const,
acceptedAt: Date.now(),
};
// Store an in-flight ack so retries do not spawn a second run.
dedupe.set(`agent:${idem}`, {
ts: Date.now(),
@@ -3013,6 +3151,51 @@ export async function handleGatewayRequest(
});
break;
}
case "agent.wait": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateAgentWaitParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid agent.wait params: ${formatValidationErrors(validateAgentWaitParams.errors)}`,
),
);
break;
}
const p = params as AgentWaitParams;
const runId = p.runId.trim();
const afterMs =
typeof p.afterMs === "number" && Number.isFinite(p.afterMs)
? Math.max(0, Math.floor(p.afterMs))
: undefined;
const timeoutMs =
typeof p.timeoutMs === "number" && Number.isFinite(p.timeoutMs)
? Math.max(0, Math.floor(p.timeoutMs))
: 30_000;
const snapshot = await waitForAgentJob({
runId,
afterMs,
timeoutMs,
});
if (!snapshot) {
respond(true, {
runId,
status: "timeout",
});
break;
}
respond(true, {
runId,
status: snapshot.state === "done" ? "ok" : "error",
startedAt: snapshot.startedAt,
endedAt: snapshot.endedAt,
error: snapshot.error,
});
break;
}
default: {
respond(
false,

View File

@@ -517,4 +517,80 @@ describe("gateway server agent", () => {
ws.close();
await server.close();
});
test("agent.wait resolves after job completes", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const waitP = rpcReq(ws, "agent.wait", {
runId: "run-wait-1",
afterMs: 100,
timeoutMs: 1000,
});
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-1",
stream: "job",
data: { state: "done", startedAt: 200, endedAt: 210 },
});
}, 10);
const res = await waitP;
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("ok");
expect(res.payload.startedAt).toBe(200);
ws.close();
await server.close();
});
test("agent.wait ignores jobs before afterMs", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const waitP = rpcReq(ws, "agent.wait", {
runId: "run-wait-2",
afterMs: 500,
timeoutMs: 1000,
});
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-2",
stream: "job",
data: { state: "done", startedAt: 200, endedAt: 220 },
});
}, 10);
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-2",
stream: "job",
data: { state: "done", startedAt: 700, endedAt: 710 },
});
}, 20);
const res = await waitP;
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("ok");
expect(res.payload.startedAt).toBe(700);
ws.close();
await server.close();
});
test("agent.wait times out when no job completes", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent.wait", {
runId: "run-wait-3",
timeoutMs: 20,
});
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("timeout");
ws.close();
await server.close();
});
});

View File

@@ -236,6 +236,7 @@ const METHODS = [
"system-event",
"send",
"agent",
"agent.wait",
"web.login.start",
"web.login.wait",
"web.logout",