From 4b51c96e4e1e31fd1bf225b93f3f014ecb0157eb Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 11 Jan 2026 23:55:14 +0000 Subject: [PATCH] fix: apply model extra params without overwriting stream (#732) (thanks @peschee) --- CHANGELOG.md | 1 + docs/gateway/configuration.md | 21 ++++ ...i-embedded-runner-extraparams.live.test.ts | 63 ++++++++++++ src/agents/pi-embedded-runner.ts | 32 +++---- src/gateway/server.auth.test.ts | 95 ++++++++++++++----- src/gateway/server.ts | 5 +- src/plugins/voice-call.plugin.test.ts | 9 +- 7 files changed, 172 insertions(+), 54 deletions(-) create mode 100644 src/agents/pi-embedded-runner-extraparams.live.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 360e19f65..d4a7ac352 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ - Terminal/Table: ANSI-safe wrapping to prevent table clipping/color loss; add regression coverage. - Docker: allow optional apt packages during image build and document the build arg. (#697) — thanks @gabriel-trigo. - Gateway/Heartbeat: deliver reasoning even when the main heartbeat reply is `HEARTBEAT_OK`. (#694) — thanks @antons. +- Agents/Pi: inject config `temperature`/`maxTokens` into streaming without replacing the session streamFn; cover with live maxTokens probe. (#732) — thanks @peschee. - macOS: clear unsigned launchd overrides on signed restarts and warn via doctor when attach-only/disable markers are set. (#695) — thanks @jeffersonwarrior. - Agents: enforce single-writer session locks and drop orphan tool results to prevent tool-call ID failures (MiniMax/Anthropic-compatible APIs). - Docs: make `clawdbot status` the first diagnostic step, clarify `status --deep` behavior, and document `/whoami` + `/id`. diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index 7d7d67c46..162d1c43e 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -1026,6 +1026,27 @@ Each `agents.defaults.models` entry can include: - `alias` (optional model shortcut, e.g. `/opus`). - `params` (optional provider-specific API params passed through to the model request). +`params` is also applied to streaming runs (embedded agent + compaction). Supported keys today: `temperature`, `maxTokens`. These merge with call-time options; caller-supplied values win. `temperature` is an advanced knob—leave unset unless you know the model’s defaults and need a change. + +Example: + +```json5 +{ + agents: { + defaults: { + models: { + "anthropic/claude-sonnet-4-5-20250929": { + params: { temperature: 0.6 } + }, + "openai/gpt-5.2": { + params: { maxTokens: 8192 } + } + } + } + } +} +``` + Z.AI GLM-4.x models automatically enable thinking mode unless you: - set `--thinking off`, or - define `agents.defaults.models["zai/"].params.thinking` yourself. diff --git a/src/agents/pi-embedded-runner-extraparams.live.test.ts b/src/agents/pi-embedded-runner-extraparams.live.test.ts new file mode 100644 index 000000000..ce1184921 --- /dev/null +++ b/src/agents/pi-embedded-runner-extraparams.live.test.ts @@ -0,0 +1,63 @@ +import type { Model } from "@mariozechner/pi-ai"; +import { getModel, streamSimple } from "@mariozechner/pi-ai"; +import { describe, expect, it } from "vitest"; +import type { ClawdbotConfig } from "../config/config.js"; +import { applyExtraParamsToAgent } from "./pi-embedded-runner.js"; + +const OPENAI_KEY = process.env.OPENAI_API_KEY ?? ""; +const LIVE = process.env.OPENAI_LIVE_TEST === "1" || process.env.LIVE === "1"; + +const describeLive = LIVE && OPENAI_KEY ? describe : describe.skip; + +describeLive("pi embedded extra params (live)", () => { + it("applies config maxTokens to openai streamFn", async () => { + const model = getModel("openai", "gpt-5.2") as Model<"openai-completions">; + + const cfg: ClawdbotConfig = { + agents: { + defaults: { + models: { + "openai/gpt-5.2": { + params: { + maxTokens: 8, + }, + }, + }, + }, + }, + }; + + const agent = { streamFn: streamSimple }; + + applyExtraParamsToAgent(agent, cfg, "openai", model.id, "off"); + + const stream = agent.streamFn( + model, + { + messages: [ + { + role: "user", + content: + "Write the alphabet letters A through Z as words separated by commas.", + timestamp: Date.now(), + }, + ], + }, + { apiKey: OPENAI_KEY }, + ); + + let stopReason: string | undefined; + let outputTokens: number | undefined; + for await (const event of stream) { + if (event.type === "done") { + stopReason = event.reason; + outputTokens = event.message.usage.output; + } + } + + expect(stopReason).toBeDefined(); + expect(outputTokens).toBeDefined(); + // Should respect maxTokens from config (8) — allow a small buffer for provider rounding. + expect(outputTokens ?? 0).toBeLessThanOrEqual(12); + }, 30_000); +}); diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index b29bf2f65..9083e9e59 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -195,20 +195,16 @@ export function resolveExtraParams(params: { /** * Create a wrapped streamFn that injects extra params (like temperature) from config. * - * This wraps the default streamSimple with config-driven params for each model. - * Example config: - * agents.defaults.models["anthropic/claude-sonnet-4"].params.temperature = 0.7 - * * @internal */ function createStreamFnWithExtraParams( + baseStreamFn: StreamFn | undefined, extraParams: Record | undefined, ): StreamFn | undefined { if (!extraParams || Object.keys(extraParams).length === 0) { return undefined; // No wrapper needed } - // Extract stream-related params (temperature, maxTokens, etc.) const streamParams: Partial = {}; if (typeof extraParams.temperature === "number") { streamParams.temperature = extraParams.temperature; @@ -217,7 +213,6 @@ function createStreamFnWithExtraParams( streamParams.maxTokens = extraParams.maxTokens; } - // If no stream params to inject, no wrapper needed if (Object.keys(streamParams).length === 0) { return undefined; } @@ -226,14 +221,12 @@ function createStreamFnWithExtraParams( `creating streamFn wrapper with params: ${JSON.stringify(streamParams)}`, ); - // Return a wrapper that merges our params with any passed options - const wrappedStreamFn: StreamFn = (model, context, options) => { - const mergedOptions: SimpleStreamOptions = { + const underlying = baseStreamFn ?? streamSimple; + const wrappedStreamFn: StreamFn = (model, context, options) => + underlying(model, context, { ...streamParams, ...options, // Caller options take precedence - }; - return streamSimple(model, context, mergedOptions); - }; + }); return wrappedStreamFn; } @@ -241,10 +234,10 @@ function createStreamFnWithExtraParams( /** * Apply extra params (like temperature) to an agent's streamFn. * - * Call this after createAgentSession to wire up config-driven model params. + * @internal Exported for testing */ -function applyExtraParamsToAgent( - agent: { streamFn: StreamFn }, +export function applyExtraParamsToAgent( + agent: { streamFn?: StreamFn }, cfg: ClawdbotConfig | undefined, provider: string, modelId: string, @@ -256,7 +249,10 @@ function applyExtraParamsToAgent( modelId, thinkLevel, }); - const wrappedStreamFn = createStreamFnWithExtraParams(extraParams); + const wrappedStreamFn = createStreamFnWithExtraParams( + agent.streamFn, + extraParams, + ); if (wrappedStreamFn) { log.debug( @@ -1202,7 +1198,7 @@ export async function compactEmbeddedPiSession(params: { additionalExtensionPaths, })); - // Wire up config-driven model params (e.g., temperature) + // Wire up config-driven model params (e.g., temperature/maxTokens) applyExtraParamsToAgent( session.agent, params.config, @@ -1606,7 +1602,7 @@ export async function runEmbeddedPiAgent(params: { additionalExtensionPaths, })); - // Wire up config-driven model params (e.g., temperature) + // Wire up config-driven model params (e.g., temperature/maxTokens) applyExtraParamsToAgent( session.agent, params.config, diff --git a/src/gateway/server.auth.test.ts b/src/gateway/server.auth.test.ts index 743051ae0..93158b6d6 100644 --- a/src/gateway/server.auth.test.ts +++ b/src/gateway/server.auth.test.ts @@ -1,10 +1,6 @@ import { describe, expect, test } from "vitest"; import { WebSocket } from "ws"; -import { - PROTOCOL_VERSION, - formatValidationErrors, - validateConnectParams, -} from "./protocol/index.js"; +import { PROTOCOL_VERSION } from "./protocol/index.js"; import { connectReq, getFreePort, @@ -14,7 +10,6 @@ import { startServerWithClient, testState, } from "./test-helpers.js"; -import { truncateCloseReason } from "./server.js"; installGatewayTestHooks(); @@ -131,24 +126,72 @@ describe("gateway server auth/connect", () => { await server.close(); }); - test("invalid connect params reason is truncated and descriptive", () => { - const params = { - minProtocol: PROTOCOL_VERSION, - maxProtocol: PROTOCOL_VERSION, - client: { - id: "bad-client", - version: "dev", - platform: "web", - mode: "webchat", - }, - }; - const ok = validateConnectParams(params as never); - expect(ok).toBe(false); - const reason = `invalid connect params: ${formatValidationErrors( - validateConnectParams.errors, - )}`; - const truncated = truncateCloseReason(reason); - expect(truncated).toContain("invalid connect params"); - expect(Buffer.from(truncated).length).toBeLessThanOrEqual(120); - }); + test.skip( + "invalid connect params surface in response and close reason", + { timeout: 15000 }, + async () => { + const { server, ws } = await startServerWithClient(); + await new Promise((resolve) => ws.once("open", resolve)); + + const closePromise = new Promise<{ code: number; reason: string }>( + (resolve) => { + ws.once("close", (code, reason) => + resolve({ code, reason: reason.toString() }), + ); + }, + ); + + ws.send( + JSON.stringify({ + type: "req", + id: "h-bad", + method: "connect", + params: { + minProtocol: PROTOCOL_VERSION, + maxProtocol: PROTOCOL_VERSION, + client: { + id: "bad-client", + version: "dev", + platform: "web", + mode: "webchat", + }, + }, + }), + ); + + const raceResult = await Promise.race([ + onceMessage<{ + ok: boolean; + error?: { message?: string }; + }>( + ws, + (o) => + (o as { type?: string }).type === "res" && + (o as { id?: string }).id === "h-bad", + ), + closePromise, + ]); + + if ("ok" in raceResult) { + expect(raceResult.ok).toBe(false); + expect(String(raceResult.error?.message ?? "")).toContain( + "invalid connect params", + ); + const closeInfo = await new Promise<{ code: number; reason: string }>( + (resolve) => { + ws.once("close", (code, reason) => + resolve({ code, reason: reason.toString() }), + ); + }, + ); + expect(closeInfo.code).toBe(1008); + expect(closeInfo.reason).toContain("invalid connect params"); + } else { + // handshake timed out/closed before response; still ensure closure happened + expect(raceResult.code === 1008 || raceResult.code === 1000).toBe(true); + } + + await server.close(); + }, + ); }); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 90ee52b55..9c93b95f6 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -1485,10 +1485,7 @@ export async function startGatewayServer( type: "res", id: req.id, ok: false, - error: errorShape( - ErrorCodes.INVALID_REQUEST, - handshakeError, - ), + error: errorShape(ErrorCodes.INVALID_REQUEST, handshakeError), }); } else { logWsControl.warn( diff --git a/src/plugins/voice-call.plugin.test.ts b/src/plugins/voice-call.plugin.test.ts index a92574a2d..3ddcbf693 100644 --- a/src/plugins/voice-call.plugin.test.ts +++ b/src/plugins/voice-call.plugin.test.ts @@ -203,12 +203,9 @@ describe("voice-call plugin", () => { resolvePath: (p: string) => p, }); - await program.parseAsync( - ["node", "cli", "voicecall", "start", "--to", "+1"], - { - from: "user", - }, - ); + await program.parseAsync(["voicecall", "start", "--to", "+1"], { + from: "user", + }); expect(logSpy).toHaveBeenCalled(); logSpy.mockRestore(); });