fix: use telegram token file for sends and guard console EPIPE

This commit is contained in:
Josh Palmer
2025-12-24 18:18:48 +01:00
parent 1d8b47785c
commit 10eced9971
4 changed files with 77 additions and 45 deletions

View File

@@ -120,6 +120,7 @@ let testCronEnabled: boolean | undefined = false;
let testGatewayBind: "auto" | "lan" | "tailnet" | "loopback" | undefined;
let testGatewayAuth: Record<string, unknown> | undefined;
let testHooksConfig: Record<string, unknown> | undefined;
let testCanvasHostPort: number | undefined;
const sessionStoreSaveDelayMs = vi.hoisted(() => ({ value: 0 }));
vi.mock("../config/sessions.js", async () => {
const actual = await vi.importActual<typeof import("../config/sessions.js")>(
@@ -205,6 +206,12 @@ vi.mock("../config/config.js", () => {
if (testGatewayAuth) gateway.auth = testGatewayAuth;
return Object.keys(gateway).length > 0 ? gateway : undefined;
})(),
canvasHost: (() => {
const canvasHost: Record<string, unknown> = {};
if (typeof testCanvasHostPort === "number")
canvasHost.port = testCanvasHostPort;
return Object.keys(canvasHost).length > 0 ? canvasHost : undefined;
})(),
hooks: testHooksConfig,
cron: (() => {
const cron: Record<string, unknown> = {};
@@ -261,6 +268,7 @@ beforeEach(async () => {
testGatewayBind = undefined;
testGatewayAuth = undefined;
testHooksConfig = undefined;
testCanvasHostPort = undefined;
cronIsolatedRun.mockClear();
drainSystemEvents();
__resetModelCatalogCacheForTest();
@@ -1907,6 +1915,8 @@ describe("gateway server", () => {
process.env.CLAWDIS_GATEWAY_TOKEN = "secret";
testTailnetIPv4.value = "100.64.0.1";
testGatewayBind = "lan";
const canvasPort = await getFreePort();
testCanvasHostPort = canvasPort;
const port = await getFreePort();
const server = await startGatewayServer(port, {
@@ -1919,7 +1929,7 @@ describe("gateway server", () => {
await new Promise<void>((resolve) => ws.once("open", resolve));
const hello = await connectOk(ws, { token: "secret" });
expect(hello.canvasHostUrl).toBe(`http://100.64.0.1:18793`);
expect(hello.canvasHostUrl).toBe(`http://100.64.0.1:${canvasPort}`);
ws.close();
await server.close();

View File

@@ -287,6 +287,33 @@ const whatsappRuntimeEnv = runtimeForLogger(logWhatsApp);
const telegramRuntimeEnv = runtimeForLogger(logTelegram);
const discordRuntimeEnv = runtimeForLogger(logDiscord);
function loadTelegramToken(
config: ClawdisConfig,
opts: { logMissing?: boolean } = {},
): string {
if (process.env.TELEGRAM_BOT_TOKEN) {
return process.env.TELEGRAM_BOT_TOKEN.trim();
}
if (config.telegram?.tokenFile) {
const filePath = config.telegram.tokenFile;
if (!fs.existsSync(filePath)) {
if (opts.logMissing) {
logTelegram.warn(`telegram.tokenFile not found: ${filePath}`);
}
return "";
}
try {
return fs.readFileSync(filePath, "utf-8").trim();
} catch (err) {
if (opts.logMissing) {
logTelegram.warn(`telegram.tokenFile read failed: ${String(err)}`);
}
return "";
}
}
return config.telegram?.botToken?.trim() ?? "";
}
function resolveBonjourCliPath(): string | undefined {
const envPath = process.env.CLAWDIS_CLI_PATH?.trim();
if (envPath) return envPath;
@@ -1877,30 +1904,6 @@ export async function startGatewayServer(
};
};
/**
* Load telegram token with priority: env var > tokenFile > botToken.
* tokenFile supports secret managers (e.g., agenix).
*/
const loadTelegramToken = (cfg: ClawdisConfig): string => {
if (process.env.TELEGRAM_BOT_TOKEN) {
return process.env.TELEGRAM_BOT_TOKEN.trim();
}
if (cfg.telegram?.tokenFile) {
const filePath = cfg.telegram.tokenFile;
if (!fs.existsSync(filePath)) {
logTelegram.info(`telegram tokenFile not found: ${filePath}`);
return "";
}
try {
return fs.readFileSync(filePath, "utf-8").trim();
} catch (err) {
logTelegram.info(`failed to read telegram tokenFile: ${String(err)}`);
return "";
}
}
return cfg.telegram?.botToken?.trim() ?? "";
};
const startTelegramProvider = async () => {
if (telegramTask) return;
const cfg = loadConfig();
@@ -1913,7 +1916,7 @@ export async function startGatewayServer(
logTelegram.info("skipping provider start (telegram.enabled=false)");
return;
}
const telegramToken = loadTelegramToken(cfg);
const telegramToken = loadTelegramToken(cfg, { logMissing: true });
if (!telegramToken.trim()) {
telegramRuntime = {
...telegramRuntime,
@@ -5786,9 +5789,12 @@ export async function startGatewayServer(
const provider = (params.provider ?? "whatsapp").toLowerCase();
try {
if (provider === "telegram") {
const cfg = loadConfig();
const token = loadTelegramToken(cfg);
const result = await sendMessageTelegram(to, message, {
mediaUrl: params.mediaUrl,
verbose: isVerbose(),
token: token || undefined,
});
const payload = {
runId: idem,

View File

@@ -268,6 +268,10 @@ function shouldSuppressConsoleMessage(message: string): boolean {
);
}
function isEpipeError(err: unknown): boolean {
return Boolean((err as { code?: string })?.code === "EPIPE");
}
/**
* Route console.* calls through pino while still emitting to stdout/stderr.
* This keeps user-facing output unchanged but guarantees every console call is captured in log files.
@@ -321,9 +325,19 @@ export function enableConsoleCapture(): void {
level === "error" || level === "fatal" || level === "warn"
? process.stderr
: process.stderr; // in RPC/JSON mode, keep stdout clean
target.write(`${formatted}\n`);
try {
target.write(`${formatted}\n`);
} catch (err) {
if (isEpipeError(err)) return;
throw err;
}
} else {
orig.apply(console, args as []);
try {
orig.apply(console, args as []);
} catch (err) {
if (isEpipeError(err)) return;
throw err;
}
}
};

View File

@@ -318,23 +318,25 @@ describe("web auto-reply", () => {
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = vi.fn(async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
let resolveClose: (reason: unknown) => void = () => {};
const onClose = new Promise<unknown>((res) => {
resolveClose = res;
closeResolvers.push(res);
});
return {
close: vi.fn(),
onClose,
signalClose: (reason?: unknown) => resolveClose(reason),
};
});
const listenerFactory = vi.fn(
async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
let resolveClose: (reason: unknown) => void = () => {};
const onClose = new Promise<unknown>((res) => {
resolveClose = res;
closeResolvers.push(res);
});
return {
close: vi.fn(),
onClose,
signalClose: (reason?: unknown) => resolveClose(reason),
};
},
);
const runtime = {
log: vi.fn(),
error: vi.fn(),