feat: multi-agent routing + multi-account providers

This commit is contained in:
Peter Steinberger
2026-01-06 18:25:37 +00:00
parent 50d4b17417
commit dbfa316d19
129 changed files with 3760 additions and 1126 deletions

View File

@@ -18,7 +18,7 @@ export type HookMappingResolved = {
messageTemplate?: string;
textTemplate?: string;
deliver?: boolean;
channel?:
provider?:
| "last"
| "whatsapp"
| "telegram"
@@ -57,7 +57,7 @@ export type HookAction =
wakeMode: "now" | "next-heartbeat";
sessionKey?: string;
deliver?: boolean;
channel?:
provider?:
| "last"
| "whatsapp"
| "telegram"
@@ -101,7 +101,7 @@ type HookTransformResult = Partial<{
name: string;
sessionKey: string;
deliver: boolean;
channel:
provider:
| "last"
| "whatsapp"
| "telegram"
@@ -196,7 +196,7 @@ function normalizeHookMapping(
messageTemplate: mapping.messageTemplate,
textTemplate: mapping.textTemplate,
deliver: mapping.deliver,
channel: mapping.channel,
provider: mapping.provider,
to: mapping.to,
thinking: mapping.thinking,
timeoutSeconds: mapping.timeoutSeconds,
@@ -241,7 +241,7 @@ function buildActionFromMapping(
wakeMode: mapping.wakeMode ?? "now",
sessionKey: renderOptional(mapping.sessionKey, ctx),
deliver: mapping.deliver,
channel: mapping.channel,
provider: mapping.provider,
to: renderOptional(mapping.to, ctx),
thinking: renderOptional(mapping.thinking, ctx),
timeoutSeconds: mapping.timeoutSeconds,
@@ -291,7 +291,7 @@ function mergeAction(
typeof override.deliver === "boolean"
? override.deliver
: baseAgent?.deliver,
channel: override.channel ?? baseAgent?.channel,
provider: override.provider ?? baseAgent?.provider,
to: override.to ?? baseAgent?.to,
thinking: override.thinking ?? baseAgent?.thinking,
timeoutSeconds: override.timeoutSeconds ?? baseAgent?.timeoutSeconds,

View File

@@ -56,7 +56,7 @@ describe("gateway hooks helpers", () => {
expect(normalizeWakePayload({ text: " ", mode: "now" }).ok).toBe(false);
});
test("normalizeAgentPayload defaults + validates channel", () => {
test("normalizeAgentPayload defaults + validates provider", () => {
const ok = normalizeAgentPayload(
{ message: "hello" },
{ idFactory: () => "fixed" },
@@ -64,20 +64,20 @@ describe("gateway hooks helpers", () => {
expect(ok.ok).toBe(true);
if (ok.ok) {
expect(ok.value.sessionKey).toBe("hook:fixed");
expect(ok.value.channel).toBe("last");
expect(ok.value.provider).toBe("last");
expect(ok.value.name).toBe("Hook");
}
const imsg = normalizeAgentPayload(
{ message: "yo", channel: "imsg" },
{ message: "yo", provider: "imsg" },
{ idFactory: () => "x" },
);
expect(imsg.ok).toBe(true);
if (imsg.ok) {
expect(imsg.value.channel).toBe("imessage");
expect(imsg.value.provider).toBe("imessage");
}
const bad = normalizeAgentPayload({ message: "yo", channel: "sms" });
const bad = normalizeAgentPayload({ message: "yo", provider: "sms" });
expect(bad.ok).toBe(false);
});
});

View File

@@ -137,7 +137,7 @@ export type HookAgentPayload = {
wakeMode: "now" | "next-heartbeat";
sessionKey: string;
deliver: boolean;
channel:
provider:
| "last"
| "whatsapp"
| "telegram"
@@ -173,26 +173,26 @@ export function normalizeAgentPayload(
typeof sessionKeyRaw === "string" && sessionKeyRaw.trim()
? sessionKeyRaw.trim()
: `hook:${idFactory()}`;
const channelRaw = payload.channel;
const channel =
channelRaw === "whatsapp" ||
channelRaw === "telegram" ||
channelRaw === "discord" ||
channelRaw === "slack" ||
channelRaw === "signal" ||
channelRaw === "imessage" ||
channelRaw === "last"
? channelRaw
: channelRaw === "imsg"
const providerRaw = payload.provider;
const provider =
providerRaw === "whatsapp" ||
providerRaw === "telegram" ||
providerRaw === "discord" ||
providerRaw === "slack" ||
providerRaw === "signal" ||
providerRaw === "imessage" ||
providerRaw === "last"
? providerRaw
: providerRaw === "imsg"
? "imessage"
: channelRaw === undefined
: providerRaw === undefined
? "last"
: null;
if (channel === null) {
if (provider === null) {
return {
ok: false,
error:
"channel must be last|whatsapp|telegram|discord|slack|signal|imessage",
"provider must be last|whatsapp|telegram|discord|slack|signal|imessage",
};
}
const toRaw = payload.to;
@@ -219,7 +219,7 @@ export function normalizeAgentPayload(
wakeMode,
sessionKey,
deliver,
channel,
provider,
to,
thinking,
timeoutSeconds,

View File

@@ -193,6 +193,7 @@ export const SendParamsSchema = Type.Object(
mediaUrl: Type.Optional(Type.String()),
gifPlayback: Type.Optional(Type.Boolean()),
provider: Type.Optional(Type.String()),
accountId: Type.Optional(Type.String()),
idempotencyKey: NonEmptyString,
},
{ additionalProperties: false },
@@ -206,6 +207,7 @@ export const PollParamsSchema = Type.Object(
maxSelections: Type.Optional(Type.Integer({ minimum: 1, maximum: 12 })),
durationHours: Type.Optional(Type.Integer({ minimum: 1 })),
provider: Type.Optional(Type.String()),
accountId: Type.Optional(Type.String()),
idempotencyKey: NonEmptyString,
},
{ additionalProperties: false },
@@ -218,7 +220,7 @@ export const AgentParamsSchema = Type.Object(
sessionKey: Type.Optional(Type.String()),
thinking: Type.Optional(Type.String()),
deliver: Type.Optional(Type.Boolean()),
channel: Type.Optional(Type.String()),
provider: Type.Optional(Type.String()),
timeout: Type.Optional(Type.Integer({ minimum: 0 })),
lane: Type.Optional(Type.String()),
extraSystemPrompt: Type.Optional(Type.String()),
@@ -543,6 +545,7 @@ export const WebLoginStartParamsSchema = Type.Object(
force: Type.Optional(Type.Boolean()),
timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })),
verbose: Type.Optional(Type.Boolean()),
accountId: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
@@ -550,6 +553,7 @@ export const WebLoginStartParamsSchema = Type.Object(
export const WebLoginWaitParamsSchema = Type.Object(
{
timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })),
accountId: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
@@ -642,7 +646,7 @@ export const CronPayloadSchema = Type.Union([
thinking: Type.Optional(Type.String()),
timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })),
deliver: Type.Optional(Type.Boolean()),
channel: Type.Optional(
provider: Type.Optional(
Type.Union([
Type.Literal("last"),
Type.Literal("whatsapp"),

View File

@@ -48,6 +48,7 @@ import {
setVoiceWakeTriggers,
} from "../infra/voicewake.js";
import { clearCommandLane } from "../process/command-queue.js";
import { isSubagentSessionKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { normalizeSendPolicy } from "../sessions/send-policy.js";
import { buildMessageWithAttachments } from "./chat-attachments.js";
@@ -372,7 +373,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
},
};
}
if (!key.startsWith("subagent:")) {
if (!isSubagentSessionKey(key)) {
return {
ok: false,
error: {
@@ -606,11 +607,11 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
sendPolicy: entry?.sendPolicy,
displayName: entry?.displayName,
chatType: entry?.chatType,
surface: entry?.surface,
provider: entry?.provider,
subject: entry?.subject,
room: entry?.room,
space: entry?.space,
lastChannel: entry?.lastChannel,
lastProvider: entry?.lastProvider,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
};
@@ -986,7 +987,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
systemSent: entry?.systemSent,
lastChannel: entry?.lastChannel,
lastProvider: entry?.lastProvider,
lastTo: entry?.lastTo,
};
const clientRunId = p.idempotencyKey;
@@ -1033,7 +1034,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
thinking: p.thinking,
deliver: p.deliver,
timeout: Math.ceil(timeoutMs / 1000).toString(),
surface: `Node(${nodeId})`,
messageProvider: `node(${nodeId})`,
abortSignal: abortController.signal,
},
defaultRuntime,
@@ -1126,7 +1127,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
verboseLevel: entry?.verboseLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastProvider: entry?.lastProvider,
lastTo: entry?.lastTo,
};
if (storePath) {
@@ -1146,7 +1147,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
sessionId,
thinking: "low",
deliver: false,
surface: "Node",
messageProvider: "node",
},
defaultRuntime,
ctx.deps,
@@ -1208,7 +1209,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
verboseLevel: entry?.verboseLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastProvider: entry?.lastProvider,
lastTo: entry?.lastTo,
};
if (storePath) {
@@ -1227,7 +1228,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
typeof link?.timeoutSeconds === "number"
? link.timeoutSeconds.toString()
: undefined,
surface: "Node",
messageProvider: "node",
},
defaultRuntime,
ctx.deps,

View File

@@ -32,7 +32,7 @@ type HookDispatchers = {
wakeMode: "now" | "next-heartbeat";
sessionKey: string;
deliver: boolean;
channel:
provider:
| "last"
| "whatsapp"
| "telegram"
@@ -175,7 +175,7 @@ export function createHooksRequestHandler(
wakeMode: mapped.action.wakeMode,
sessionKey: mapped.action.sessionKey ?? "",
deliver: mapped.action.deliver === true,
channel: mapped.action.channel ?? "last",
provider: mapped.action.provider ?? "last",
to: mapped.action.to,
thinking: mapped.action.thinking,
timeoutSeconds: mapped.action.timeoutSeconds,

View File

@@ -2,7 +2,12 @@ import { randomUUID } from "node:crypto";
import { agentCommand } from "../../commands/agent.js";
import { loadConfig } from "../../config/config.js";
import { type SessionEntry, saveSessionStore } from "../../config/sessions.js";
import {
resolveAgentIdFromSessionKey,
resolveAgentMainSessionKey,
type SessionEntry,
saveSessionStore,
} from "../../config/sessions.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { defaultRuntime } from "../../runtime.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
@@ -41,7 +46,7 @@ export const agentHandlers: GatewayRequestHandlers = {
sessionKey?: string;
thinking?: string;
deliver?: boolean;
channel?: string;
provider?: string;
lane?: string;
extraSystemPrompt?: string;
idempotencyKey: string;
@@ -72,7 +77,7 @@ export const agentHandlers: GatewayRequestHandlers = {
cfgForAgent = cfg;
const now = Date.now();
const sessionId = entry?.sessionId ?? randomUUID();
sessionEntry = {
const nextEntry: SessionEntry = {
sessionId,
updatedAt: now,
thinkingLevel: entry?.thinkingLevel,
@@ -80,14 +85,15 @@ export const agentHandlers: GatewayRequestHandlers = {
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
skillsSnapshot: entry?.skillsSnapshot,
lastChannel: entry?.lastChannel,
lastProvider: entry?.lastProvider,
lastTo: entry?.lastTo,
};
sessionEntry = nextEntry;
const sendPolicy = resolveSendPolicy({
cfg,
entry,
sessionKey: requestedSessionKey,
surface: entry?.surface,
provider: entry?.provider,
chatType: entry?.chatType,
});
if (sendPolicy === "deny") {
@@ -102,14 +108,22 @@ export const agentHandlers: GatewayRequestHandlers = {
return;
}
if (store) {
store[requestedSessionKey] = sessionEntry;
store[requestedSessionKey] = nextEntry;
if (storePath) {
await saveSessionStore(storePath, store);
}
}
resolvedSessionId = sessionId;
const mainKey = (cfg.session?.mainKey ?? "main").trim() || "main";
if (requestedSessionKey === mainKey) {
const agentId = resolveAgentIdFromSessionKey(requestedSessionKey);
const mainSessionKey = resolveAgentMainSessionKey({
cfg,
agentId,
});
const rawMainKey = (cfg.session?.mainKey ?? "main").trim() || "main";
if (
requestedSessionKey === mainSessionKey ||
requestedSessionKey === rawMainKey
) {
context.addChatRun(idem, {
sessionKey: requestedSessionKey,
clientRunId: idem,
@@ -121,42 +135,42 @@ export const agentHandlers: GatewayRequestHandlers = {
const runId = idem;
const requestedChannelRaw =
typeof request.channel === "string" ? request.channel.trim() : "";
const requestedChannelNormalized = requestedChannelRaw
? requestedChannelRaw.toLowerCase()
const requestedProviderRaw =
typeof request.provider === "string" ? request.provider.trim() : "";
const requestedProviderNormalized = requestedProviderRaw
? requestedProviderRaw.toLowerCase()
: "last";
const requestedChannel =
requestedChannelNormalized === "imsg"
const requestedProvider =
requestedProviderNormalized === "imsg"
? "imessage"
: requestedChannelNormalized;
: requestedProviderNormalized;
const lastChannel = sessionEntry?.lastChannel;
const lastProvider = sessionEntry?.lastProvider;
const lastTo =
typeof sessionEntry?.lastTo === "string"
? sessionEntry.lastTo.trim()
: "";
const resolvedChannel = (() => {
if (requestedChannel === "last") {
const resolvedProvider = (() => {
if (requestedProvider === "last") {
// WebChat is not a deliverable surface. Treat it as "unset" for routing,
// so VoiceWake and CLI callers don't get stuck with deliver=false.
return lastChannel && lastChannel !== "webchat"
? lastChannel
return lastProvider && lastProvider !== "webchat"
? lastProvider
: "whatsapp";
}
if (
requestedChannel === "whatsapp" ||
requestedChannel === "telegram" ||
requestedChannel === "discord" ||
requestedChannel === "signal" ||
requestedChannel === "imessage" ||
requestedChannel === "webchat"
requestedProvider === "whatsapp" ||
requestedProvider === "telegram" ||
requestedProvider === "discord" ||
requestedProvider === "signal" ||
requestedProvider === "imessage" ||
requestedProvider === "webchat"
) {
return requestedChannel;
return requestedProvider;
}
return lastChannel && lastChannel !== "webchat"
? lastChannel
return lastProvider && lastProvider !== "webchat"
? lastProvider
: "whatsapp";
})();
@@ -167,11 +181,11 @@ export const agentHandlers: GatewayRequestHandlers = {
: undefined;
if (explicit) return explicit;
if (
resolvedChannel === "whatsapp" ||
resolvedChannel === "telegram" ||
resolvedChannel === "discord" ||
resolvedChannel === "signal" ||
resolvedChannel === "imessage"
resolvedProvider === "whatsapp" ||
resolvedProvider === "telegram" ||
resolvedProvider === "discord" ||
resolvedProvider === "signal" ||
resolvedProvider === "imessage"
) {
return lastTo || undefined;
}
@@ -182,7 +196,7 @@ export const agentHandlers: GatewayRequestHandlers = {
// If we derived a WhatsApp recipient from session "lastTo", ensure it is still valid
// for the configured allowlist. Otherwise, fall back to the first allowed number so
// voice wake doesn't silently route to stale/test recipients.
if (resolvedChannel !== "whatsapp") return resolvedTo;
if (resolvedProvider !== "whatsapp") return resolvedTo;
const explicit =
typeof request.to === "string" && request.to.trim()
? request.to.trim()
@@ -207,7 +221,7 @@ export const agentHandlers: GatewayRequestHandlers = {
return allowFrom[0];
})();
const deliver = request.deliver === true && resolvedChannel !== "webchat";
const deliver = request.deliver === true && resolvedProvider !== "webchat";
const accepted = {
runId,
@@ -229,10 +243,10 @@ export const agentHandlers: GatewayRequestHandlers = {
sessionId: resolvedSessionId,
thinking: request.thinking,
deliver,
provider: resolvedChannel,
provider: resolvedProvider,
timeout: request.timeout?.toString(),
bestEffortDeliver,
surface: "VoiceWake",
messageProvider: "voicewake",
runId,
lane: request.lane,
extraSystemPrompt: request.extraSystemPrompt,

View File

@@ -202,7 +202,7 @@ export const chatHandlers: GatewayRequestHandlers = {
verboseLevel: entry?.verboseLevel,
systemSent: entry?.systemSent,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastProvider: entry?.lastProvider,
lastTo: entry?.lastTo,
};
const clientRunId = p.idempotencyKey;
@@ -212,7 +212,7 @@ export const chatHandlers: GatewayRequestHandlers = {
cfg,
entry,
sessionKey: p.sessionKey,
surface: entry?.surface,
provider: entry?.provider,
chatType: entry?.chatType,
});
if (sendPolicy === "deny") {
@@ -262,7 +262,7 @@ export const chatHandlers: GatewayRequestHandlers = {
thinking: p.thinking,
deliver: p.deliver,
timeout: Math.ceil(timeoutMs / 1000).toString(),
surface: "WebChat",
messageProvider: "webchat",
abortSignal: abortController.signal,
},
defaultRuntime,

View File

@@ -6,7 +6,6 @@ import {
} from "../../config/config.js";
import { type DiscordProbe, probeDiscord } from "../../discord/probe.js";
import { type IMessageProbe, probeIMessage } from "../../imessage/probe.js";
import { webAuthExists } from "../../providers/web/index.js";
import { probeSignal, type SignalProbe } from "../../signal/probe.js";
import { probeSlack, type SlackProbe } from "../../slack/probe.js";
import {
@@ -15,7 +14,15 @@ import {
} from "../../slack/token.js";
import { probeTelegram, type TelegramProbe } from "../../telegram/probe.js";
import { resolveTelegramToken } from "../../telegram/token.js";
import { getWebAuthAgeMs, readWebSelfId } from "../../web/session.js";
import {
listEnabledWhatsAppAccounts,
resolveDefaultWhatsAppAccountId,
} from "../../web/accounts.js";
import {
getWebAuthAgeMs,
readWebSelfId,
webAuthExists,
} from "../../web/session.js";
import {
ErrorCodes,
errorShape,
@@ -148,10 +155,55 @@ export const providersHandlers: GatewayRequestHandlers = {
imessageLastProbeAt = Date.now();
}
const linked = await webAuthExists();
const authAgeMs = getWebAuthAgeMs();
const self = readWebSelfId();
const runtime = context.getRuntimeSnapshot();
const defaultWhatsAppAccountId = resolveDefaultWhatsAppAccountId(cfg);
const enabledWhatsAppAccounts = listEnabledWhatsAppAccounts(cfg);
const defaultWhatsAppAccount =
enabledWhatsAppAccounts.find(
(account) => account.accountId === defaultWhatsAppAccountId,
) ?? enabledWhatsAppAccounts[0];
const linked = defaultWhatsAppAccount
? await webAuthExists(defaultWhatsAppAccount.authDir)
: false;
const authAgeMs = defaultWhatsAppAccount
? getWebAuthAgeMs(defaultWhatsAppAccount.authDir)
: null;
const self = defaultWhatsAppAccount
? readWebSelfId(defaultWhatsAppAccount.authDir)
: { e164: null, jid: null };
const defaultWhatsAppStatus = {
running: false,
connected: false,
reconnectAttempts: 0,
lastConnectedAt: null,
lastDisconnect: null,
lastMessageAt: null,
lastEventAt: null,
lastError: null,
} as const;
const whatsappAccounts = await Promise.all(
enabledWhatsAppAccounts.map(async (account) => {
const rt =
runtime.whatsappAccounts?.[account.accountId] ??
defaultWhatsAppStatus;
return {
accountId: account.accountId,
enabled: account.enabled,
linked: await webAuthExists(account.authDir),
authAgeMs: getWebAuthAgeMs(account.authDir),
self: readWebSelfId(account.authDir),
running: rt.running,
connected: rt.connected,
lastConnectedAt: rt.lastConnectedAt ?? null,
lastDisconnect: rt.lastDisconnect ?? null,
reconnectAttempts: rt.reconnectAttempts,
lastMessageAt: rt.lastMessageAt ?? null,
lastEventAt: rt.lastEventAt ?? null,
lastError: rt.lastError ?? null,
};
}),
);
respond(
true,
@@ -171,6 +223,8 @@ export const providersHandlers: GatewayRequestHandlers = {
lastEventAt: runtime.whatsapp.lastEventAt ?? null,
lastError: runtime.whatsapp.lastError ?? null,
},
whatsappAccounts,
whatsappDefaultAccountId: defaultWhatsAppAccountId,
telegram: {
configured: telegramEnabled && Boolean(telegramToken),
tokenSource,

View File

@@ -6,6 +6,7 @@ import { sendMessageSignal } from "../../signal/index.js";
import { sendMessageSlack } from "../../slack/send.js";
import { sendMessageTelegram } from "../../telegram/send.js";
import { resolveTelegramToken } from "../../telegram/token.js";
import { resolveDefaultWhatsAppAccountId } from "../../web/accounts.js";
import { sendMessageWhatsApp, sendPollWhatsApp } from "../../web/outbound.js";
import {
ErrorCodes,
@@ -37,6 +38,7 @@ export const sendHandlers: GatewayRequestHandlers = {
mediaUrl?: string;
gifPlayback?: boolean;
provider?: string;
accountId?: string;
idempotencyKey: string;
};
const idem = request.idempotencyKey;
@@ -148,10 +150,17 @@ export const sendHandlers: GatewayRequestHandlers = {
});
respond(true, payload, undefined, { provider });
} else {
const cfg = loadConfig();
const accountId =
typeof request.accountId === "string" &&
request.accountId.trim().length > 0
? request.accountId.trim()
: resolveDefaultWhatsAppAccountId(cfg);
const result = await sendMessageWhatsApp(to, message, {
mediaUrl: request.mediaUrl,
verbose: shouldLogVerbose(),
gifPlayback: request.gifPlayback,
accountId,
});
const payload = {
runId: idem,
@@ -199,6 +208,7 @@ export const sendHandlers: GatewayRequestHandlers = {
maxSelections?: number;
durationHours?: number;
provider?: string;
accountId?: string;
idempotencyKey: string;
};
const idem = request.idempotencyKey;
@@ -245,8 +255,15 @@ export const sendHandlers: GatewayRequestHandlers = {
});
respond(true, payload, undefined, { provider });
} else {
const cfg = loadConfig();
const accountId =
typeof request.accountId === "string" &&
request.accountId.trim().length > 0
? request.accountId.trim()
: resolveDefaultWhatsAppAccountId(cfg);
const result = await sendPollWhatsApp(to, poll, {
verbose: shouldLogVerbose(),
accountId,
});
const payload = {
runId: idem,

View File

@@ -24,11 +24,11 @@ import { loadConfig } from "../../config/config.js";
import {
loadSessionStore,
resolveMainSessionKey,
resolveStorePath,
type SessionEntry,
saveSessionStore,
} from "../../config/sessions.js";
import { clearCommandLane } from "../../process/command-queue.js";
import { isSubagentSessionKey } from "../../routing/session-key.js";
import { normalizeSendPolicy } from "../../sessions/send-policy.js";
import {
ErrorCodes,
@@ -43,7 +43,8 @@ import {
import {
archiveFileOnDisk,
listSessionsFromStore,
loadSessionEntry,
loadCombinedSessionStoreForGateway,
resolveGatewaySessionStoreTarget,
resolveSessionTranscriptCandidates,
type SessionsPatchResult,
} from "../session-utils.js";
@@ -64,8 +65,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
const p = params as import("../protocol/index.js").SessionsListParams;
const cfg = loadConfig();
const storePath = resolveStorePath(cfg.session?.store);
const store = loadSessionStore(storePath);
const { storePath, store } = loadCombinedSessionStoreForGateway(cfg);
const result = listSessionsFromStore({
cfg,
storePath,
@@ -98,11 +98,18 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
const cfg = loadConfig();
const storePath = resolveStorePath(cfg.session?.store);
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const now = Date.now();
const existing = store[key];
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const existing = store[primaryKey];
const next: SessionEntry = existing
? {
...existing,
@@ -134,7 +141,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
);
return;
}
if (!key.startsWith("subagent:")) {
if (!isSubagentSessionKey(primaryKey)) {
respond(
false,
undefined,
@@ -311,12 +318,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
}
store[key] = next;
store[primaryKey] = next;
await saveSessionStore(storePath, store);
const result: SessionsPatchResult = {
ok: true,
path: storePath,
key,
key: target.canonicalKey,
entry: next,
};
respond(true, result, undefined);
@@ -344,7 +351,17 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const { storePath, store, entry } = loadSessionEntry(key);
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entry = store[primaryKey];
const now = Date.now();
const next: SessionEntry = {
sessionId: randomUUID(),
@@ -356,13 +373,17 @@ export const sessionsHandlers: GatewayRequestHandlers = {
model: entry?.model,
contextTokens: entry?.contextTokens,
sendPolicy: entry?.sendPolicy,
lastChannel: entry?.lastChannel,
lastProvider: entry?.lastProvider,
lastTo: entry?.lastTo,
skillsSnapshot: entry?.skillsSnapshot,
};
store[key] = next;
store[primaryKey] = next;
await saveSessionStore(storePath, store);
respond(true, { ok: true, key, entry: next }, undefined);
respond(
true,
{ ok: true, key: target.canonicalKey, entry: next },
undefined,
);
},
"sessions.delete": async ({ params, respond }) => {
if (!validateSessionsDeleteParams(params)) {
@@ -387,8 +408,10 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const mainKey = resolveMainSessionKey(loadConfig());
if (key === mainKey) {
const cfg = loadConfig();
const mainKey = resolveMainSessionKey(cfg);
const target = resolveGatewaySessionStoreTarget({ cfg, key });
if (target.canonicalKey === mainKey) {
respond(
false,
undefined,
@@ -403,10 +426,18 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const deleteTranscript =
typeof p.deleteTranscript === "boolean" ? p.deleteTranscript : true;
const { storePath, store, entry } = loadSessionEntry(key);
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entry = store[primaryKey];
const sessionId = entry?.sessionId;
const existed = Boolean(store[key]);
clearCommandLane(resolveEmbeddedSessionLane(key));
const existed = Boolean(entry);
clearCommandLane(resolveEmbeddedSessionLane(target.canonicalKey));
if (sessionId && isEmbeddedPiRunActive(sessionId)) {
abortEmbeddedPiRun(sessionId);
const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000);
@@ -422,7 +453,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
}
if (existed) delete store[key];
if (existed) delete store[primaryKey];
await saveSessionStore(storePath, store);
const archived: string[] = [];
@@ -430,6 +461,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
for (const candidate of resolveSessionTranscriptCandidates(
sessionId,
storePath,
target.agentId,
)) {
if (!fs.existsSync(candidate)) continue;
try {
@@ -440,7 +472,11 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
}
respond(true, { ok: true, key, deleted: existed, archived }, undefined);
respond(
true,
{ ok: true, key: target.canonicalKey, deleted: existed, archived },
undefined,
);
},
"sessions.compact": async ({ params, respond }) => {
if (!validateSessionsCompactParams(params)) {
@@ -470,12 +506,27 @@ export const sessionsHandlers: GatewayRequestHandlers = {
? Math.max(1, Math.floor(p.maxLines))
: 400;
const { storePath, store, entry } = loadSessionEntry(key);
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const storePath = target.storePath;
const store = loadSessionStore(storePath);
const primaryKey = target.storeKeys[0] ?? key;
const existingKey = target.storeKeys.find((candidate) => store[candidate]);
if (existingKey && existingKey !== primaryKey && !store[primaryKey]) {
store[primaryKey] = store[existingKey];
delete store[existingKey];
}
const entry = store[primaryKey];
const sessionId = entry?.sessionId;
if (!sessionId) {
respond(
true,
{ ok: true, key, compacted: false, reason: "no sessionId" },
{
ok: true,
key: target.canonicalKey,
compacted: false,
reason: "no sessionId",
},
undefined,
);
return;
@@ -484,11 +535,17 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const filePath = resolveSessionTranscriptCandidates(
sessionId,
storePath,
target.agentId,
).find((candidate) => fs.existsSync(candidate));
if (!filePath) {
respond(
true,
{ ok: true, key, compacted: false, reason: "no transcript" },
{
ok: true,
key: target.canonicalKey,
compacted: false,
reason: "no transcript",
},
undefined,
);
return;
@@ -499,7 +556,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
if (lines.length <= maxLines) {
respond(
true,
{ ok: true, key, compacted: false, kept: lines.length },
{
ok: true,
key: target.canonicalKey,
compacted: false,
kept: lines.length,
},
undefined,
);
return;
@@ -509,11 +571,11 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const keptLines = lines.slice(-maxLines);
fs.writeFileSync(filePath, `${keptLines.join("\n")}\n`, "utf-8");
if (store[key]) {
delete store[key].inputTokens;
delete store[key].outputTokens;
delete store[key].totalTokens;
store[key].updatedAt = Date.now();
if (store[primaryKey]) {
delete store[primaryKey].inputTokens;
delete store[primaryKey].outputTokens;
delete store[primaryKey].totalTokens;
store[primaryKey].updatedAt = Date.now();
await saveSessionStore(storePath, store);
}
@@ -521,7 +583,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
true,
{
ok: true,
key,
key: target.canonicalKey,
compacted: true,
archived,
kept: keptLines.length,

View File

@@ -69,10 +69,10 @@ export type GatewayRequestContext = {
findRunningWizard: () => string | null;
purgeWizardSession: (id: string) => void;
getRuntimeSnapshot: () => ProviderRuntimeSnapshot;
startWhatsAppProvider: () => Promise<void>;
stopWhatsAppProvider: () => Promise<void>;
startWhatsAppProvider: (accountId?: string) => Promise<void>;
stopWhatsAppProvider: (accountId?: string) => Promise<void>;
stopTelegramProvider: () => Promise<void>;
markWhatsAppLoggedOut: (cleared: boolean) => void;
markWhatsAppLoggedOut: (cleared: boolean, accountId?: string) => void;
wizardRunner: (
opts: import("../../commands/onboard-types.js").OnboardOptions,
runtime: import("../../runtime.js").RuntimeEnv,

View File

@@ -1,4 +1,6 @@
import { loadConfig } from "../../config/config.js";
import { defaultRuntime } from "../../runtime.js";
import { resolveWhatsAppAccount } from "../../web/accounts.js";
import { startWebLoginWithQr, waitForWebLogin } from "../../web/login-qr.js";
import { logoutWeb } from "../../web/session.js";
import {
@@ -25,7 +27,11 @@ export const webHandlers: GatewayRequestHandlers = {
return;
}
try {
await context.stopWhatsAppProvider();
const accountId =
typeof (params as { accountId?: unknown }).accountId === "string"
? (params as { accountId?: string }).accountId
: undefined;
await context.stopWhatsAppProvider(accountId);
const result = await startWebLoginWithQr({
force: Boolean((params as { force?: boolean }).force),
timeoutMs:
@@ -33,6 +39,7 @@ export const webHandlers: GatewayRequestHandlers = {
? (params as { timeoutMs?: number }).timeoutMs
: undefined,
verbose: Boolean((params as { verbose?: boolean }).verbose),
accountId,
});
respond(true, result, undefined);
} catch (err) {
@@ -56,14 +63,19 @@ export const webHandlers: GatewayRequestHandlers = {
return;
}
try {
const accountId =
typeof (params as { accountId?: unknown }).accountId === "string"
? (params as { accountId?: string }).accountId
: undefined;
const result = await waitForWebLogin({
timeoutMs:
typeof (params as { timeoutMs?: unknown }).timeoutMs === "number"
? (params as { timeoutMs?: number }).timeoutMs
: undefined,
accountId,
});
if (result.connected) {
await context.startWhatsAppProvider();
await context.startWhatsAppProvider(accountId);
}
respond(true, result, undefined);
} catch (err) {
@@ -74,11 +86,26 @@ export const webHandlers: GatewayRequestHandlers = {
);
}
},
"web.logout": async ({ respond, context }) => {
"web.logout": async ({ params, respond, context }) => {
try {
await context.stopWhatsAppProvider();
const cleared = await logoutWeb(defaultRuntime);
context.markWhatsAppLoggedOut(cleared);
const rawAccountId =
params && typeof params === "object" && "accountId" in params
? (params as { accountId?: unknown }).accountId
: undefined;
const accountId =
typeof rawAccountId === "string" ? rawAccountId.trim() : "";
const cfg = loadConfig();
const account = resolveWhatsAppAccount({
cfg,
accountId: accountId || undefined,
});
await context.stopWhatsAppProvider(account.accountId);
const cleared = await logoutWeb({
authDir: account.authDir,
isLegacyAuthDir: account.isLegacyAuthDir,
runtime: defaultRuntime,
});
context.markWhatsAppLoggedOut(cleared, account.accountId);
respond(true, { cleared }, undefined);
} catch (err) {
respond(

View File

@@ -15,6 +15,10 @@ import {
import { monitorTelegramProvider } from "../telegram/monitor.js";
import { probeTelegram } from "../telegram/probe.js";
import { resolveTelegramToken } from "../telegram/token.js";
import {
listEnabledWhatsAppAccounts,
resolveDefaultWhatsAppAccountId,
} from "../web/accounts.js";
import type { WebProviderStatus } from "../web/auto-reply.js";
import { readWebSelfId } from "../web/session.js";
import { formatError } from "./server-utils.js";
@@ -60,6 +64,7 @@ export type IMessageRuntimeStatus = {
export type ProviderRuntimeSnapshot = {
whatsapp: WebProviderStatus;
whatsappAccounts?: Record<string, WebProviderStatus>;
telegram: TelegramRuntimeStatus;
discord: DiscordRuntimeStatus;
slack: SlackRuntimeStatus;
@@ -88,8 +93,8 @@ type ProviderManagerOptions = {
export type ProviderManager = {
getRuntimeSnapshot: () => ProviderRuntimeSnapshot;
startProviders: () => Promise<void>;
startWhatsAppProvider: () => Promise<void>;
stopWhatsAppProvider: () => Promise<void>;
startWhatsAppProvider: (accountId?: string) => Promise<void>;
stopWhatsAppProvider: (accountId?: string) => Promise<void>;
startTelegramProvider: () => Promise<void>;
stopTelegramProvider: () => Promise<void>;
startDiscordProvider: () => Promise<void>;
@@ -100,7 +105,7 @@ export type ProviderManager = {
stopSignalProvider: () => Promise<void>;
startIMessageProvider: () => Promise<void>;
stopIMessageProvider: () => Promise<void>;
markWhatsAppLoggedOut: (cleared: boolean) => void;
markWhatsAppLoggedOut: (cleared: boolean, accountId?: string) => void;
};
export function createProviderManager(
@@ -122,20 +127,21 @@ export function createProviderManager(
imessageRuntimeEnv,
} = opts;
let whatsappAbort: AbortController | null = null;
const whatsappAborts = new Map<string, AbortController>();
let telegramAbort: AbortController | null = null;
let discordAbort: AbortController | null = null;
let slackAbort: AbortController | null = null;
let signalAbort: AbortController | null = null;
let imessageAbort: AbortController | null = null;
let whatsappTask: Promise<unknown> | null = null;
const whatsappTasks = new Map<string, Promise<unknown>>();
let telegramTask: Promise<unknown> | null = null;
let discordTask: Promise<unknown> | null = null;
let slackTask: Promise<unknown> | null = null;
let signalTask: Promise<unknown> | null = null;
let imessageTask: Promise<unknown> | null = null;
let whatsappRuntime: WebProviderStatus = {
const whatsappRuntimes = new Map<string, WebProviderStatus>();
const defaultWhatsAppStatus = (): WebProviderStatus => ({
running: false,
connected: false,
reconnectAttempts: 0,
@@ -144,7 +150,7 @@ export function createProviderManager(
lastMessageAt: null,
lastEventAt: null,
lastError: null,
};
});
let telegramRuntime: TelegramRuntimeStatus = {
running: false,
lastStartAt: null,
@@ -180,86 +186,134 @@ export function createProviderManager(
dbPath: null,
};
const updateWhatsAppStatus = (next: WebProviderStatus) => {
whatsappRuntime = next;
const updateWhatsAppStatus = (accountId: string, next: WebProviderStatus) => {
whatsappRuntimes.set(accountId, next);
};
const startWhatsAppProvider = async () => {
if (whatsappTask) return;
const startWhatsAppProvider = async (accountId?: string) => {
const cfg = loadConfig();
const enabledAccounts = listEnabledWhatsAppAccounts(cfg);
const targets = accountId
? enabledAccounts.filter((a) => a.accountId === accountId)
: enabledAccounts;
if (targets.length === 0) return;
if (cfg.web?.enabled === false) {
whatsappRuntime = {
...whatsappRuntime,
running: false,
connected: false,
lastError: "disabled",
};
for (const account of targets) {
const current =
whatsappRuntimes.get(account.accountId) ?? defaultWhatsAppStatus();
whatsappRuntimes.set(account.accountId, {
...current,
running: false,
connected: false,
lastError: "disabled",
});
}
logWhatsApp.info("skipping provider start (web.enabled=false)");
return;
}
if (!(await webAuthExists())) {
whatsappRuntime = {
...whatsappRuntime,
running: false,
connected: false,
lastError: "not linked",
};
logWhatsApp.info("skipping provider start (no linked session)");
return;
}
const { e164, jid } = readWebSelfId();
const identity = e164 ? e164 : jid ? `jid ${jid}` : "unknown";
logWhatsApp.info(`starting provider (${identity})`);
whatsappAbort = new AbortController();
whatsappRuntime = {
...whatsappRuntime,
running: true,
connected: false,
lastError: null,
};
const task = monitorWebProvider(
shouldLogVerbose(),
undefined,
true,
undefined,
whatsappRuntimeEnv,
whatsappAbort.signal,
{ statusSink: updateWhatsAppStatus },
)
.catch((err) => {
whatsappRuntime = {
...whatsappRuntime,
lastError: formatError(err),
};
logWhatsApp.error(`provider exited: ${formatError(err)}`);
})
.finally(() => {
whatsappAbort = null;
whatsappTask = null;
whatsappRuntime = {
...whatsappRuntime,
running: false,
await Promise.all(
targets.map(async (account) => {
if (whatsappTasks.has(account.accountId)) return;
const current =
whatsappRuntimes.get(account.accountId) ?? defaultWhatsAppStatus();
if (!(await webAuthExists(account.authDir))) {
whatsappRuntimes.set(account.accountId, {
...current,
running: false,
connected: false,
lastError: "not linked",
});
logWhatsApp.info(
`[${account.accountId}] skipping provider start (no linked session)`,
);
return;
}
const { e164, jid } = readWebSelfId(account.authDir);
const identity = e164 ? e164 : jid ? `jid ${jid}` : "unknown";
logWhatsApp.info(
`[${account.accountId}] starting provider (${identity})`,
);
const abort = new AbortController();
whatsappAborts.set(account.accountId, abort);
whatsappRuntimes.set(account.accountId, {
...current,
running: true,
connected: false,
};
});
whatsappTask = task;
lastError: null,
});
const task = monitorWebProvider(
shouldLogVerbose(),
undefined,
true,
undefined,
whatsappRuntimeEnv,
abort.signal,
{
statusSink: (next) => updateWhatsAppStatus(account.accountId, next),
accountId: account.accountId,
},
)
.catch((err) => {
const latest =
whatsappRuntimes.get(account.accountId) ??
defaultWhatsAppStatus();
whatsappRuntimes.set(account.accountId, {
...latest,
lastError: formatError(err),
});
logWhatsApp.error(
`[${account.accountId}] provider exited: ${formatError(err)}`,
);
})
.finally(() => {
whatsappAborts.delete(account.accountId);
whatsappTasks.delete(account.accountId);
const latest =
whatsappRuntimes.get(account.accountId) ??
defaultWhatsAppStatus();
whatsappRuntimes.set(account.accountId, {
...latest,
running: false,
connected: false,
});
});
whatsappTasks.set(account.accountId, task);
}),
);
};
const stopWhatsAppProvider = async () => {
if (!whatsappAbort && !whatsappTask) return;
whatsappAbort?.abort();
try {
await whatsappTask;
} catch {
// ignore
}
whatsappAbort = null;
whatsappTask = null;
whatsappRuntime = {
...whatsappRuntime,
running: false,
connected: false,
};
const stopWhatsAppProvider = async (accountId?: string) => {
const ids = accountId
? [accountId]
: Array.from(
new Set([...whatsappAborts.keys(), ...whatsappTasks.keys()]),
);
await Promise.all(
ids.map(async (id) => {
const abort = whatsappAborts.get(id);
const task = whatsappTasks.get(id);
if (!abort && !task) return;
abort?.abort();
try {
await task;
} catch {
// ignore
}
whatsappAborts.delete(id);
whatsappTasks.delete(id);
const latest = whatsappRuntimes.get(id) ?? defaultWhatsAppStatus();
whatsappRuntimes.set(id, {
...latest,
running: false,
connected: false,
});
}),
);
};
const startTelegramProvider = async () => {
@@ -754,23 +808,38 @@ export function createProviderManager(
await startIMessageProvider();
};
const markWhatsAppLoggedOut = (cleared: boolean) => {
whatsappRuntime = {
...whatsappRuntime,
const markWhatsAppLoggedOut = (cleared: boolean, accountId?: string) => {
const cfg = loadConfig();
const resolvedId = accountId ?? resolveDefaultWhatsAppAccountId(cfg);
const current = whatsappRuntimes.get(resolvedId) ?? defaultWhatsAppStatus();
whatsappRuntimes.set(resolvedId, {
...current,
running: false,
connected: false,
lastError: cleared ? "logged out" : whatsappRuntime.lastError,
};
lastError: cleared ? "logged out" : current.lastError,
});
};
const getRuntimeSnapshot = (): ProviderRuntimeSnapshot => ({
whatsapp: { ...whatsappRuntime },
telegram: { ...telegramRuntime },
discord: { ...discordRuntime },
slack: { ...slackRuntime },
signal: { ...signalRuntime },
imessage: { ...imessageRuntime },
});
const getRuntimeSnapshot = (): ProviderRuntimeSnapshot => {
const cfg = loadConfig();
const defaultId = resolveDefaultWhatsAppAccountId(cfg);
const whatsapp = whatsappRuntimes.get(defaultId) ?? defaultWhatsAppStatus();
const whatsappAccounts = Object.fromEntries(
Array.from(whatsappRuntimes.entries()).map(([id, status]) => [
id,
{ ...status },
]),
);
return {
whatsapp: { ...whatsapp },
whatsappAccounts,
telegram: { ...telegramRuntime },
discord: { ...discordRuntime },
slack: { ...slackRuntime },
signal: { ...signalRuntime },
imessage: { ...imessageRuntime },
};
};
return {
getRuntimeSnapshot,

View File

@@ -33,7 +33,7 @@ describe("gateway server agent", () => {
main: {
sessionId: "sess-main-stale",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastProvider: "whatsapp",
lastTo: "+1555",
},
},
@@ -49,7 +49,7 @@ describe("gateway server agent", () => {
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
channel: "last",
provider: "last",
deliver: true,
idempotencyKey: "idem-agent-last-stale",
});
@@ -76,7 +76,7 @@ describe("gateway server agent", () => {
main: {
sessionId: "sess-main-whatsapp",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastProvider: "whatsapp",
lastTo: "+1555",
},
},
@@ -92,7 +92,7 @@ describe("gateway server agent", () => {
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
channel: "last",
provider: "last",
deliver: true,
idempotencyKey: "idem-agent-last-whatsapp",
});
@@ -120,7 +120,7 @@ describe("gateway server agent", () => {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: "123",
},
},
@@ -136,7 +136,7 @@ describe("gateway server agent", () => {
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
channel: "last",
provider: "last",
deliver: true,
idempotencyKey: "idem-agent-last",
});
@@ -164,7 +164,7 @@ describe("gateway server agent", () => {
main: {
sessionId: "sess-discord",
updatedAt: Date.now(),
lastChannel: "discord",
lastProvider: "discord",
lastTo: "channel:discord-123",
},
},
@@ -180,7 +180,7 @@ describe("gateway server agent", () => {
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
channel: "last",
provider: "last",
deliver: true,
idempotencyKey: "idem-agent-last-discord",
});
@@ -208,7 +208,7 @@ describe("gateway server agent", () => {
main: {
sessionId: "sess-signal",
updatedAt: Date.now(),
lastChannel: "signal",
lastProvider: "signal",
lastTo: "+15551234567",
},
},
@@ -224,7 +224,7 @@ describe("gateway server agent", () => {
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
channel: "last",
provider: "last",
deliver: true,
idempotencyKey: "idem-agent-last-signal",
});
@@ -253,7 +253,7 @@ describe("gateway server agent", () => {
main: {
sessionId: "sess-main-webchat",
updatedAt: Date.now(),
lastChannel: "webchat",
lastProvider: "webchat",
lastTo: "+1555",
},
},
@@ -269,7 +269,7 @@ describe("gateway server agent", () => {
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
channel: "last",
provider: "last",
deliver: true,
idempotencyKey: "idem-agent-webchat",
});

View File

@@ -70,7 +70,7 @@ describe("gateway server chat", () => {
rules: [
{
action: "deny",
match: { surface: "discord", chatType: "group" },
match: { provider: "discord", chatType: "group" },
},
],
},
@@ -84,7 +84,7 @@ describe("gateway server chat", () => {
sessionId: "sess-discord",
updatedAt: Date.now(),
chatType: "group",
surface: "discord",
provider: "discord",
},
},
null,
@@ -423,7 +423,7 @@ describe("gateway server chat", () => {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastProvider: "whatsapp",
lastTo: "+1555",
},
},
@@ -446,9 +446,9 @@ describe("gateway server chat", () => {
const stored = JSON.parse(
await fs.readFile(testState.sessionStorePath, "utf-8"),
) as {
main?: { lastChannel?: string; lastTo?: string };
main?: { lastProvider?: string; lastTo?: string };
};
expect(stored.main?.lastChannel).toBe("whatsapp");
expect(stored.main?.lastProvider).toBe("whatsapp");
expect(stored.main?.lastTo).toBe("+1555");
ws.close();

View File

@@ -86,7 +86,7 @@ describe("gateway server hooks", () => {
await server.close();
});
test("hooks agent rejects invalid channel", async () => {
test("hooks agent rejects invalid provider", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
@@ -96,7 +96,7 @@ describe("gateway server hooks", () => {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ message: "Nope", channel: "sms" }),
body: JSON.stringify({ message: "Nope", provider: "sms" }),
});
expect(res.status).toBe(400);
expect(peekSystemEvents().length).toBe(0);

View File

@@ -732,7 +732,7 @@ describe("gateway server node/bridge", () => {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastProvider: "whatsapp",
lastTo: "+1555",
},
},
@@ -759,7 +759,7 @@ describe("gateway server node/bridge", () => {
const call = spy.mock.calls.at(-1)?.[0] as Record<string, unknown>;
expect(call.sessionId).toBe("sess-main");
expect(call.deliver).toBe(false);
expect(call.surface).toBe("Node");
expect(call.messageProvider).toBe("node");
const stored = JSON.parse(
await fs.readFile(testState.sessionStorePath, "utf-8"),

View File

@@ -40,23 +40,26 @@ describe("gateway server sessions", () => {
storePath,
JSON.stringify(
{
main: {
"agent:main:main": {
sessionId: "sess-main",
updatedAt: now - 30_000,
inputTokens: 10,
outputTokens: 20,
thinkingLevel: "low",
verboseLevel: "on",
lastProvider: "whatsapp",
lastTo: "+1555",
lastAccountId: "work",
},
"discord:group:dev": {
"agent:main:discord:group:dev": {
sessionId: "sess-group",
updatedAt: now - 120_000,
totalTokens: 50,
},
"subagent:one": {
"agent:main:subagent:one": {
sessionId: "sess-subagent",
updatedAt: now - 120_000,
spawnedBy: "main",
spawnedBy: "agent:main:main",
},
global: {
sessionId: "sess-global",
@@ -91,16 +94,20 @@ describe("gateway server sessions", () => {
totalTokens?: number;
thinkingLevel?: string;
verboseLevel?: string;
lastAccountId?: string;
}>;
}>(ws, "sessions.list", { includeGlobal: false, includeUnknown: false });
expect(list1.ok).toBe(true);
expect(list1.payload?.path).toBe(storePath);
expect(list1.payload?.sessions.some((s) => s.key === "global")).toBe(false);
const main = list1.payload?.sessions.find((s) => s.key === "main");
const main = list1.payload?.sessions.find(
(s) => s.key === "agent:main:main",
);
expect(main?.totalTokens).toBe(30);
expect(main?.thinkingLevel).toBe("low");
expect(main?.verboseLevel).toBe("on");
expect(main?.lastAccountId).toBe("work");
const active = await rpcReq<{
sessions: Array<{ key: string }>;
@@ -110,7 +117,9 @@ describe("gateway server sessions", () => {
activeMinutes: 1,
});
expect(active.ok).toBe(true);
expect(active.payload?.sessions.map((s) => s.key)).toEqual(["main"]);
expect(active.payload?.sessions.map((s) => s.key)).toEqual([
"agent:main:main",
]);
const limited = await rpcReq<{
sessions: Array<{ key: string }>;
@@ -126,16 +135,16 @@ describe("gateway server sessions", () => {
const patched = await rpcReq<{ ok: true; key: string }>(
ws,
"sessions.patch",
{ key: "main", thinkingLevel: "medium", verboseLevel: null },
{ key: "agent:main:main", thinkingLevel: "medium", verboseLevel: null },
);
expect(patched.ok).toBe(true);
expect(patched.payload?.ok).toBe(true);
expect(patched.payload?.key).toBe("main");
expect(patched.payload?.key).toBe("agent:main:main");
const sendPolicyPatched = await rpcReq<{
ok: true;
entry: { sendPolicy?: string };
}>(ws, "sessions.patch", { key: "main", sendPolicy: "deny" });
}>(ws, "sessions.patch", { key: "agent:main:main", sendPolicy: "deny" });
expect(sendPolicyPatched.ok).toBe(true);
expect(sendPolicyPatched.payload?.entry.sendPolicy).toBe("deny");
@@ -148,7 +157,9 @@ describe("gateway server sessions", () => {
}>;
}>(ws, "sessions.list", {});
expect(list2.ok).toBe(true);
const main2 = list2.payload?.sessions.find((s) => s.key === "main");
const main2 = list2.payload?.sessions.find(
(s) => s.key === "agent:main:main",
);
expect(main2?.thinkingLevel).toBe("medium");
expect(main2?.verboseLevel).toBeUndefined();
expect(main2?.sendPolicy).toBe("deny");
@@ -158,23 +169,26 @@ describe("gateway server sessions", () => {
}>(ws, "sessions.list", {
includeGlobal: true,
includeUnknown: true,
spawnedBy: "main",
spawnedBy: "agent:main:main",
});
expect(spawnedOnly.ok).toBe(true);
expect(spawnedOnly.payload?.sessions.map((s) => s.key)).toEqual([
"subagent:one",
"agent:main:subagent:one",
]);
const spawnedPatched = await rpcReq<{
ok: true;
entry: { spawnedBy?: string };
}>(ws, "sessions.patch", { key: "subagent:two", spawnedBy: "main" });
}>(ws, "sessions.patch", {
key: "agent:main:subagent:two",
spawnedBy: "agent:main:main",
});
expect(spawnedPatched.ok).toBe(true);
expect(spawnedPatched.payload?.entry.spawnedBy).toBe("main");
expect(spawnedPatched.payload?.entry.spawnedBy).toBe("agent:main:main");
const spawnedPatchedInvalidKey = await rpcReq(ws, "sessions.patch", {
key: "main",
spawnedBy: "main",
key: "agent:main:main",
spawnedBy: "agent:main:main",
});
expect(spawnedPatchedInvalidKey.ok).toBe(false);
@@ -183,7 +197,10 @@ describe("gateway server sessions", () => {
const modelPatched = await rpcReq<{
ok: true;
entry: { modelOverride?: string; providerOverride?: string };
}>(ws, "sessions.patch", { key: "main", model: "openai/gpt-test-a" });
}>(ws, "sessions.patch", {
key: "agent:main:main",
model: "openai/gpt-test-a",
});
expect(modelPatched.ok).toBe(true);
expect(modelPatched.payload?.entry.modelOverride).toBe("gpt-test-a");
expect(modelPatched.payload?.entry.providerOverride).toBe("openai");
@@ -191,7 +208,7 @@ describe("gateway server sessions", () => {
const compacted = await rpcReq<{ ok: true; compacted: boolean }>(
ws,
"sessions.compact",
{ key: "main", maxLines: 3 },
{ key: "agent:main:main", maxLines: 3 },
);
expect(compacted.ok).toBe(true);
expect(compacted.payload?.compacted).toBe(true);
@@ -209,7 +226,7 @@ describe("gateway server sessions", () => {
const deleted = await rpcReq<{ ok: true; deleted: boolean }>(
ws,
"sessions.delete",
{ key: "discord:group:dev" },
{ key: "agent:main:discord:group:dev" },
);
expect(deleted.ok).toBe(true);
expect(deleted.payload?.deleted).toBe(true);
@@ -219,7 +236,7 @@ describe("gateway server sessions", () => {
expect(listAfterDelete.ok).toBe(true);
expect(
listAfterDelete.payload?.sessions.some(
(s) => s.key === "discord:group:dev",
(s) => s.key === "agent:main:discord:group:dev",
),
).toBe(false);
const filesAfterDelete = await fs.readdir(dir);
@@ -231,13 +248,13 @@ describe("gateway server sessions", () => {
ok: true;
key: string;
entry: { sessionId: string };
}>(ws, "sessions.reset", { key: "main" });
}>(ws, "sessions.reset", { key: "agent:main:main" });
expect(reset.ok).toBe(true);
expect(reset.payload?.key).toBe("main");
expect(reset.payload?.key).toBe("agent:main:main");
expect(reset.payload?.entry.sessionId).not.toBe("sess-main");
const badThinking = await rpcReq(ws, "sessions.patch", {
key: "main",
key: "agent:main:main",
thinkingLevel: "banana",
});
expect(badThinking.ok).toBe(false);

View File

@@ -482,7 +482,7 @@ export async function startGatewayServer(
wakeMode: "now" | "next-heartbeat";
sessionKey: string;
deliver: boolean;
channel:
provider:
| "last"
| "whatsapp"
| "telegram"
@@ -514,7 +514,7 @@ export async function startGatewayServer(
thinking: value.thinking,
timeoutSeconds: value.timeoutSeconds,
deliver: value.deliver,
channel: value.channel,
provider: value.provider,
to: value.to,
},
state: { nextRunAtMs: now },

View File

@@ -15,7 +15,7 @@ describe("gateway session utils", () => {
test("parseGroupKey handles group prefixes", () => {
expect(parseGroupKey("group:abc")).toEqual({ id: "abc" });
expect(parseGroupKey("discord:group:dev")).toEqual({
surface: "discord",
provider: "discord",
kind: "group",
id: "dev",
});

View File

@@ -9,12 +9,19 @@ import {
} from "../agents/defaults.js";
import { resolveConfiguredModelRef } from "../agents/model-selection.js";
import { type ClawdbotConfig, loadConfig } from "../config/config.js";
import { resolveStateDir } from "../config/paths.js";
import {
buildGroupDisplayName,
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveSessionTranscriptPath,
resolveStorePath,
type SessionEntry,
} from "../config/sessions.js";
import {
normalizeAgentId,
parseAgentSessionKey,
} from "../routing/session-key.js";
export type GatewaySessionsDefaults = {
model: string | null;
@@ -25,7 +32,7 @@ export type GatewaySessionRow = {
key: string;
kind: "direct" | "group" | "global" | "unknown";
displayName?: string;
surface?: string;
provider?: string;
subject?: string;
room?: string;
space?: string;
@@ -43,8 +50,9 @@ export type GatewaySessionRow = {
totalTokens?: number;
model?: string;
contextTokens?: number;
lastChannel?: SessionEntry["lastChannel"];
lastProvider?: SessionEntry["lastProvider"];
lastTo?: string;
lastAccountId?: string;
};
export type SessionsListResult = {
@@ -90,12 +98,16 @@ export function readSessionMessages(
export function resolveSessionTranscriptCandidates(
sessionId: string,
storePath: string | undefined,
agentId?: string,
): string[] {
const candidates: string[] = [];
if (storePath) {
const dir = path.dirname(storePath);
candidates.push(path.join(dir, `${sessionId}.jsonl`));
}
if (agentId) {
candidates.push(resolveSessionTranscriptPath(sessionId, agentId));
}
candidates.push(
path.join(os.homedir(), ".clawdbot", "sessions", `${sessionId}.jsonl`),
);
@@ -136,11 +148,12 @@ export function capArrayByJsonBytes<T>(
export function loadSessionEntry(sessionKey: string) {
const cfg = loadConfig();
const sessionCfg = cfg.session;
const storePath = sessionCfg?.store
? resolveStorePath(sessionCfg.store)
: resolveStorePath(undefined);
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const storePath = resolveStorePath(sessionCfg?.store, { agentId });
const store = loadSessionStore(storePath);
const entry = store[sessionKey];
const parsed = parseAgentSessionKey(sessionKey);
const legacyKey = parsed?.rest;
const entry = store[sessionKey] ?? (legacyKey ? store[legacyKey] : undefined);
return { cfg, storePath, store, entry };
}
@@ -163,22 +176,167 @@ export function classifySessionKey(
export function parseGroupKey(
key: string,
): { surface?: string; kind?: "group" | "channel"; id?: string } | null {
if (key.startsWith("group:")) {
const raw = key.slice("group:".length);
): { provider?: string; kind?: "group" | "channel"; id?: string } | null {
const agentParsed = parseAgentSessionKey(key);
const rawKey = agentParsed?.rest ?? key;
if (rawKey.startsWith("group:")) {
const raw = rawKey.slice("group:".length);
return raw ? { id: raw } : null;
}
const parts = key.split(":").filter(Boolean);
const parts = rawKey.split(":").filter(Boolean);
if (parts.length >= 3) {
const [surface, kind, ...rest] = parts;
const [provider, kind, ...rest] = parts;
if (kind === "group" || kind === "channel") {
const id = rest.join(":");
return { surface, kind, id };
return { provider, kind, id };
}
}
return null;
}
function isStorePathTemplate(store?: string): boolean {
return typeof store === "string" && store.includes("{agentId}");
}
function listExistingAgentIdsFromDisk(): string[] {
const root = resolveStateDir();
const agentsDir = path.join(root, "agents");
try {
const entries = fs.readdirSync(agentsDir, { withFileTypes: true });
return entries
.filter((entry) => entry.isDirectory())
.map((entry) => normalizeAgentId(entry.name))
.filter(Boolean);
} catch {
return [];
}
}
function listConfiguredAgentIds(cfg: ClawdbotConfig): string[] {
const ids = new Set<string>();
const defaultId = normalizeAgentId(cfg.routing?.defaultAgentId);
ids.add(defaultId);
const agents = cfg.routing?.agents;
if (agents && typeof agents === "object") {
for (const id of Object.keys(agents)) ids.add(normalizeAgentId(id));
}
for (const id of listExistingAgentIdsFromDisk()) ids.add(id);
const sorted = Array.from(ids).filter(Boolean);
sorted.sort((a, b) => a.localeCompare(b));
if (sorted.includes(defaultId)) {
return [defaultId, ...sorted.filter((id) => id !== defaultId)];
}
return sorted;
}
function canonicalizeSessionKeyForAgent(agentId: string, key: string): string {
if (key === "global" || key === "unknown") return key;
if (key.startsWith("agent:")) return key;
return `agent:${normalizeAgentId(agentId)}:${key}`;
}
function canonicalizeSpawnedByForAgent(
agentId: string,
spawnedBy?: string,
): string | undefined {
const raw = spawnedBy?.trim();
if (!raw) return undefined;
if (raw === "global" || raw === "unknown") return raw;
if (raw.startsWith("agent:")) return raw;
return `agent:${normalizeAgentId(agentId)}:${raw}`;
}
export function resolveGatewaySessionStoreTarget(params: {
cfg: ClawdbotConfig;
key: string;
}): {
agentId: string;
storePath: string;
canonicalKey: string;
storeKeys: string[];
} {
const key = params.key.trim();
const agentId = resolveAgentIdFromSessionKey(key);
const storeConfig = params.cfg.session?.store;
const storePath = resolveStorePath(storeConfig, { agentId });
if (key === "global" || key === "unknown") {
return { agentId, storePath, canonicalKey: key, storeKeys: [key] };
}
const parsed = parseAgentSessionKey(key);
if (parsed) {
return {
agentId,
storePath,
canonicalKey: key,
storeKeys: [key, parsed.rest],
};
}
if (key.startsWith("subagent:")) {
const canonical = canonicalizeSessionKeyForAgent(agentId, key);
return {
agentId,
storePath,
canonicalKey: canonical,
storeKeys: [canonical, key],
};
}
const canonical = canonicalizeSessionKeyForAgent(agentId, key);
return {
agentId,
storePath,
canonicalKey: canonical,
storeKeys: [canonical, key],
};
}
export function loadCombinedSessionStoreForGateway(cfg: ClawdbotConfig): {
storePath: string;
store: Record<string, SessionEntry>;
} {
const storeConfig = cfg.session?.store;
if (storeConfig && !isStorePathTemplate(storeConfig)) {
const storePath = resolveStorePath(storeConfig);
const defaultAgentId = normalizeAgentId(cfg.routing?.defaultAgentId);
const store = loadSessionStore(storePath);
const combined: Record<string, SessionEntry> = {};
for (const [key, entry] of Object.entries(store)) {
const canonicalKey = canonicalizeSessionKeyForAgent(defaultAgentId, key);
combined[canonicalKey] = {
...entry,
spawnedBy: canonicalizeSpawnedByForAgent(
defaultAgentId,
entry.spawnedBy,
),
};
}
return { storePath, store: combined };
}
const agentIds = listConfiguredAgentIds(cfg);
const combined: Record<string, SessionEntry> = {};
for (const agentId of agentIds) {
const storePath = resolveStorePath(storeConfig, { agentId });
const store = loadSessionStore(storePath);
for (const [key, entry] of Object.entries(store)) {
const canonicalKey = canonicalizeSessionKeyForAgent(agentId, key);
combined[canonicalKey] = {
...entry,
spawnedBy: canonicalizeSpawnedByForAgent(agentId, entry.spawnedBy),
};
}
}
const storePath =
typeof storeConfig === "string" && storeConfig.trim()
? storeConfig.trim()
: "(multiple)";
return { storePath, store: combined };
}
export function getSessionDefaults(
cfg: ClawdbotConfig,
): GatewaySessionsDefaults {
@@ -251,16 +409,16 @@ export function listSessionsFromStore(params: {
const output = entry?.outputTokens ?? 0;
const total = entry?.totalTokens ?? input + output;
const parsed = parseGroupKey(key);
const surface = entry?.surface ?? parsed?.surface;
const provider = entry?.provider ?? parsed?.provider;
const subject = entry?.subject;
const room = entry?.room;
const space = entry?.space;
const id = parsed?.id;
const displayName =
entry?.displayName ??
(surface
(provider
? buildGroupDisplayName({
surface,
provider,
subject,
room,
space,
@@ -272,7 +430,7 @@ export function listSessionsFromStore(params: {
key,
kind: classifySessionKey(key, entry),
displayName,
surface,
provider,
subject,
room,
space,
@@ -290,8 +448,9 @@ export function listSessionsFromStore(params: {
totalTokens: total,
model: entry?.model,
contextTokens: entry?.contextTokens,
lastChannel: entry?.lastChannel,
lastProvider: entry?.lastProvider,
lastTo: entry?.lastTo,
lastAccountId: entry?.lastAccountId,
} satisfies GatewaySessionRow;
})
.sort((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0));