fix(gateway): ack agent requests immediately
This commit is contained in:
@@ -481,7 +481,7 @@ describe("gateway server", () => {
|
|||||||
await server.close();
|
await server.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
test("agent ack event then final response", { timeout: 8000 }, async () => {
|
test("agent ack response then final response", { timeout: 8000 }, async () => {
|
||||||
const { server, ws } = await startServerWithClient();
|
const { server, ws } = await startServerWithClient();
|
||||||
ws.send(
|
ws.send(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
@@ -502,11 +502,13 @@ describe("gateway server", () => {
|
|||||||
const ackP = onceMessage(
|
const ackP = onceMessage(
|
||||||
ws,
|
ws,
|
||||||
(o) =>
|
(o) =>
|
||||||
o.type === "event" &&
|
o.type === "res" && o.id === "ag1" && o.payload?.status === "accepted",
|
||||||
o.event === "agent" &&
|
);
|
||||||
o.payload?.status === "accepted",
|
const finalP = onceMessage(
|
||||||
|
ws,
|
||||||
|
(o) =>
|
||||||
|
o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted",
|
||||||
);
|
);
|
||||||
const finalP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1");
|
|
||||||
ws.send(
|
ws.send(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
type: "req",
|
type: "req",
|
||||||
@@ -732,7 +734,8 @@ describe("gateway server", () => {
|
|||||||
const ws1 = await dial();
|
const ws1 = await dial();
|
||||||
const final1P = onceMessage(
|
const final1P = onceMessage(
|
||||||
ws1,
|
ws1,
|
||||||
(o) => o.type === "res" && o.id === "ag1",
|
(o) =>
|
||||||
|
o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted",
|
||||||
6000,
|
6000,
|
||||||
);
|
);
|
||||||
ws1.send(
|
ws1.send(
|
||||||
@@ -749,7 +752,8 @@ describe("gateway server", () => {
|
|||||||
const ws2 = await dial();
|
const ws2 = await dial();
|
||||||
const final2P = onceMessage(
|
const final2P = onceMessage(
|
||||||
ws2,
|
ws2,
|
||||||
(o) => o.type === "res" && o.id === "ag2",
|
(o) =>
|
||||||
|
o.type === "res" && o.id === "ag2" && o.payload?.status !== "accepted",
|
||||||
6000,
|
6000,
|
||||||
);
|
);
|
||||||
ws2.send(
|
ws2.send(
|
||||||
|
|||||||
@@ -1208,65 +1208,64 @@ export async function startGatewayServer(
|
|||||||
|
|
||||||
const deliver =
|
const deliver =
|
||||||
params.deliver === true && resolvedChannel !== "webchat";
|
params.deliver === true && resolvedChannel !== "webchat";
|
||||||
// Acknowledge via event to avoid double res frames
|
|
||||||
const ackEvent = {
|
const accepted = { runId, status: "accepted" as const };
|
||||||
type: "event",
|
// Store an in-flight ack so retries do not spawn a second run.
|
||||||
event: "agent",
|
dedupe.set(`agent:${idem}`, {
|
||||||
payload: { runId, status: "accepted" as const },
|
ts: Date.now(),
|
||||||
seq: ++seq,
|
ok: true,
|
||||||
};
|
payload: accepted,
|
||||||
socket.send(JSON.stringify(ackEvent));
|
|
||||||
logWs("out", "event", {
|
|
||||||
connId,
|
|
||||||
event: "agent",
|
|
||||||
runId,
|
|
||||||
status: "accepted",
|
|
||||||
});
|
});
|
||||||
try {
|
respond(true, accepted, undefined, { runId });
|
||||||
await agentCommand(
|
|
||||||
{
|
void agentCommand(
|
||||||
message,
|
{
|
||||||
to: resolvedTo,
|
message,
|
||||||
sessionId: resolvedSessionId,
|
to: resolvedTo,
|
||||||
thinking: params.thinking,
|
sessionId: resolvedSessionId,
|
||||||
deliver,
|
thinking: params.thinking,
|
||||||
provider: resolvedChannel,
|
deliver,
|
||||||
timeout: params.timeout?.toString(),
|
provider: resolvedChannel,
|
||||||
bestEffortDeliver,
|
timeout: params.timeout?.toString(),
|
||||||
surface: "VoiceWake",
|
bestEffortDeliver,
|
||||||
},
|
surface: "VoiceWake",
|
||||||
defaultRuntime,
|
},
|
||||||
deps,
|
defaultRuntime,
|
||||||
);
|
deps,
|
||||||
const payload = {
|
)
|
||||||
runId,
|
.then(() => {
|
||||||
status: "ok" as const,
|
const payload = {
|
||||||
summary: "completed",
|
runId,
|
||||||
};
|
status: "ok" as const,
|
||||||
dedupe.set(`agent:${idem}`, {
|
summary: "completed",
|
||||||
ts: Date.now(),
|
};
|
||||||
ok: true,
|
dedupe.set(`agent:${idem}`, {
|
||||||
payload,
|
ts: Date.now(),
|
||||||
|
ok: true,
|
||||||
|
payload,
|
||||||
|
});
|
||||||
|
// Send a second res frame (same id) so TS clients with expectFinal can wait.
|
||||||
|
// Swift clients will typically treat the first res as the result and ignore this.
|
||||||
|
respond(true, payload, undefined, { runId });
|
||||||
|
})
|
||||||
|
.catch((err) => {
|
||||||
|
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
|
||||||
|
const payload = {
|
||||||
|
runId,
|
||||||
|
status: "error" as const,
|
||||||
|
summary: String(err),
|
||||||
|
};
|
||||||
|
dedupe.set(`agent:${idem}`, {
|
||||||
|
ts: Date.now(),
|
||||||
|
ok: false,
|
||||||
|
payload,
|
||||||
|
error,
|
||||||
|
});
|
||||||
|
respond(false, payload, error, {
|
||||||
|
runId,
|
||||||
|
error: formatForLog(err),
|
||||||
|
});
|
||||||
});
|
});
|
||||||
respond(true, payload, undefined, { runId });
|
|
||||||
} catch (err) {
|
|
||||||
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
|
|
||||||
const payload = {
|
|
||||||
runId,
|
|
||||||
status: "error" as const,
|
|
||||||
summary: String(err),
|
|
||||||
};
|
|
||||||
dedupe.set(`agent:${idem}`, {
|
|
||||||
ts: Date.now(),
|
|
||||||
ok: false,
|
|
||||||
payload,
|
|
||||||
error,
|
|
||||||
});
|
|
||||||
respond(false, payload, error, {
|
|
||||||
runId,
|
|
||||||
error: formatForLog(err),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
|
|||||||
Reference in New Issue
Block a user