Gmail watcher: start when gateway (re)starts
This commit is contained in:
committed by
Peter Steinberger
parent
11c7e05f43
commit
ca9b0dbc88
@@ -174,6 +174,10 @@ import {
|
|||||||
type HookMappingResolved,
|
type HookMappingResolved,
|
||||||
resolveHookMappings,
|
resolveHookMappings,
|
||||||
} from "./hooks-mapping.js";
|
} from "./hooks-mapping.js";
|
||||||
|
import {
|
||||||
|
startGmailWatcher,
|
||||||
|
stopGmailWatcher,
|
||||||
|
} from "../hooks/gmail-watcher.js";
|
||||||
|
|
||||||
ensureClawdisCliOnPath();
|
ensureClawdisCliOnPath();
|
||||||
|
|
||||||
@@ -6859,6 +6863,20 @@ export async function startGatewayServer(
|
|||||||
logBrowser.error(`server failed to start: ${String(err)}`);
|
logBrowser.error(`server failed to start: ${String(err)}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start Gmail watcher if configured (hooks.gmail.account).
|
||||||
|
if (process.env.CLAWDIS_SKIP_GMAIL_WATCHER !== "1") {
|
||||||
|
try {
|
||||||
|
const gmailResult = await startGmailWatcher(cfgAtStart);
|
||||||
|
if (gmailResult.started) {
|
||||||
|
logHooks.info("gmail watcher started");
|
||||||
|
} else if (gmailResult.reason && gmailResult.reason !== "hooks not enabled" && gmailResult.reason !== "no gmail account configured") {
|
||||||
|
logHooks.warn(`gmail watcher not started: ${gmailResult.reason}`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
logHooks.error(`gmail watcher failed to start: ${String(err)}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Launch configured providers (WhatsApp Web, Discord, Telegram) so gateway replies via the
|
// Launch configured providers (WhatsApp Web, Discord, Telegram) so gateway replies via the
|
||||||
// surface the message came from. Tests can opt out via CLAWDIS_SKIP_PROVIDERS.
|
// surface the message came from. Tests can opt out via CLAWDIS_SKIP_PROVIDERS.
|
||||||
if (process.env.CLAWDIS_SKIP_PROVIDERS !== "1") {
|
if (process.env.CLAWDIS_SKIP_PROVIDERS !== "1") {
|
||||||
@@ -6917,6 +6935,7 @@ export async function startGatewayServer(
|
|||||||
await stopDiscordProvider();
|
await stopDiscordProvider();
|
||||||
await stopSignalProvider();
|
await stopSignalProvider();
|
||||||
await stopIMessageProvider();
|
await stopIMessageProvider();
|
||||||
|
await stopGmailWatcher();
|
||||||
cron.stop();
|
cron.stop();
|
||||||
heartbeatRunner.stop();
|
heartbeatRunner.stop();
|
||||||
broadcast("shutdown", {
|
broadcast("shutdown", {
|
||||||
|
|||||||
215
src/hooks/gmail-watcher.ts
Normal file
215
src/hooks/gmail-watcher.ts
Normal file
@@ -0,0 +1,215 @@
|
|||||||
|
/**
|
||||||
|
* Gmail Watcher Service
|
||||||
|
*
|
||||||
|
* Automatically starts `gog gmail watch serve` when the gateway starts,
|
||||||
|
* if hooks.gmail is configured with an account.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { spawn, type ChildProcess } from "node:child_process";
|
||||||
|
import { hasBinary } from "../agents/skills.js";
|
||||||
|
import type { ClawdisConfig } from "../config/config.js";
|
||||||
|
import { createSubsystemLogger } from "../logging.js";
|
||||||
|
import { runCommandWithTimeout } from "../process/exec.js";
|
||||||
|
import {
|
||||||
|
buildGogWatchServeArgs,
|
||||||
|
buildGogWatchStartArgs,
|
||||||
|
resolveGmailHookRuntimeConfig,
|
||||||
|
type GmailHookRuntimeConfig,
|
||||||
|
} from "./gmail.js";
|
||||||
|
import { ensureTailscaleEndpoint } from "./gmail-setup-utils.js";
|
||||||
|
|
||||||
|
const log = createSubsystemLogger("gmail-watcher");
|
||||||
|
|
||||||
|
let watcherProcess: ChildProcess | null = null;
|
||||||
|
let renewInterval: ReturnType<typeof setInterval> | null = null;
|
||||||
|
let shuttingDown = false;
|
||||||
|
let currentConfig: GmailHookRuntimeConfig | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if gog binary is available
|
||||||
|
*/
|
||||||
|
function isGogAvailable(): boolean {
|
||||||
|
return hasBinary("gog");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the Gmail watch (registers with Gmail API)
|
||||||
|
*/
|
||||||
|
async function startGmailWatch(
|
||||||
|
cfg: Pick<GmailHookRuntimeConfig, "account" | "label" | "topic">,
|
||||||
|
): Promise<boolean> {
|
||||||
|
const args = ["gog", ...buildGogWatchStartArgs(cfg)];
|
||||||
|
try {
|
||||||
|
const result = await runCommandWithTimeout(args, { timeoutMs: 120_000 });
|
||||||
|
if (result.code !== 0) {
|
||||||
|
const message = result.stderr || result.stdout || "gog watch start failed";
|
||||||
|
log.error(`watch start failed: ${message}`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
log.info(`watch started for ${cfg.account}`);
|
||||||
|
return true;
|
||||||
|
} catch (err) {
|
||||||
|
log.error(`watch start error: ${String(err)}`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Spawn the gog gmail watch serve process
|
||||||
|
*/
|
||||||
|
function spawnGogServe(cfg: GmailHookRuntimeConfig): ChildProcess {
|
||||||
|
const args = buildGogWatchServeArgs(cfg);
|
||||||
|
log.info(`starting gog ${args.join(" ")}`);
|
||||||
|
|
||||||
|
const child = spawn("gog", args, {
|
||||||
|
stdio: ["ignore", "pipe", "pipe"],
|
||||||
|
detached: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
child.stdout?.on("data", (data: Buffer) => {
|
||||||
|
const line = data.toString().trim();
|
||||||
|
if (line) log.info(`[gog] ${line}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
child.stderr?.on("data", (data: Buffer) => {
|
||||||
|
const line = data.toString().trim();
|
||||||
|
if (line) log.warn(`[gog] ${line}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
child.on("error", (err) => {
|
||||||
|
log.error(`gog process error: ${String(err)}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
child.on("exit", (code, signal) => {
|
||||||
|
if (shuttingDown) return;
|
||||||
|
log.warn(`gog exited (code=${code}, signal=${signal}); restarting in 5s`);
|
||||||
|
watcherProcess = null;
|
||||||
|
setTimeout(() => {
|
||||||
|
if (shuttingDown || !currentConfig) return;
|
||||||
|
watcherProcess = spawnGogServe(currentConfig);
|
||||||
|
}, 5000);
|
||||||
|
});
|
||||||
|
|
||||||
|
return child;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type GmailWatcherStartResult = {
|
||||||
|
started: boolean;
|
||||||
|
reason?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the Gmail watcher service.
|
||||||
|
* Called automatically by the gateway if hooks.gmail is configured.
|
||||||
|
*/
|
||||||
|
export async function startGmailWatcher(
|
||||||
|
cfg: ClawdisConfig,
|
||||||
|
): Promise<GmailWatcherStartResult> {
|
||||||
|
// Check if gmail hooks are configured
|
||||||
|
if (!cfg.hooks?.enabled) {
|
||||||
|
return { started: false, reason: "hooks not enabled" };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!cfg.hooks?.gmail?.account) {
|
||||||
|
return { started: false, reason: "no gmail account configured" };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if gog is available
|
||||||
|
const gogAvailable = isGogAvailable();
|
||||||
|
if (!gogAvailable) {
|
||||||
|
return { started: false, reason: "gog binary not found" };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve the full runtime config
|
||||||
|
const resolved = resolveGmailHookRuntimeConfig(cfg, {});
|
||||||
|
if (!resolved.ok) {
|
||||||
|
return { started: false, reason: resolved.error };
|
||||||
|
}
|
||||||
|
|
||||||
|
const runtimeConfig = resolved.value;
|
||||||
|
currentConfig = runtimeConfig;
|
||||||
|
|
||||||
|
// Set up Tailscale endpoint if needed
|
||||||
|
if (runtimeConfig.tailscale.mode !== "off") {
|
||||||
|
try {
|
||||||
|
await ensureTailscaleEndpoint({
|
||||||
|
mode: runtimeConfig.tailscale.mode,
|
||||||
|
path: runtimeConfig.tailscale.path,
|
||||||
|
port: runtimeConfig.serve.port,
|
||||||
|
});
|
||||||
|
log.info(
|
||||||
|
`tailscale ${runtimeConfig.tailscale.mode} configured for port ${runtimeConfig.serve.port}`,
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
log.error(`tailscale setup failed: ${String(err)}`);
|
||||||
|
return { started: false, reason: `tailscale setup failed: ${String(err)}` };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the Gmail watch (register with Gmail API)
|
||||||
|
const watchStarted = await startGmailWatch(runtimeConfig);
|
||||||
|
if (!watchStarted) {
|
||||||
|
log.warn("gmail watch start failed, but continuing with serve");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawn the gog serve process
|
||||||
|
shuttingDown = false;
|
||||||
|
watcherProcess = spawnGogServe(runtimeConfig);
|
||||||
|
|
||||||
|
// Set up renewal interval
|
||||||
|
const renewMs = runtimeConfig.renewEveryMinutes * 60_000;
|
||||||
|
renewInterval = setInterval(() => {
|
||||||
|
if (shuttingDown) return;
|
||||||
|
void startGmailWatch(runtimeConfig);
|
||||||
|
}, renewMs);
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
`gmail watcher started for ${runtimeConfig.account} (renew every ${runtimeConfig.renewEveryMinutes}m)`,
|
||||||
|
);
|
||||||
|
|
||||||
|
return { started: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the Gmail watcher service.
|
||||||
|
*/
|
||||||
|
export async function stopGmailWatcher(): Promise<void> {
|
||||||
|
shuttingDown = true;
|
||||||
|
|
||||||
|
if (renewInterval) {
|
||||||
|
clearInterval(renewInterval);
|
||||||
|
renewInterval = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (watcherProcess) {
|
||||||
|
log.info("stopping gmail watcher");
|
||||||
|
watcherProcess.kill("SIGTERM");
|
||||||
|
|
||||||
|
// Wait a bit for graceful shutdown
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
if (watcherProcess) {
|
||||||
|
watcherProcess.kill("SIGKILL");
|
||||||
|
}
|
||||||
|
resolve();
|
||||||
|
}, 3000);
|
||||||
|
|
||||||
|
watcherProcess?.on("exit", () => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
watcherProcess = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
currentConfig = null;
|
||||||
|
log.info("gmail watcher stopped");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the Gmail watcher is running.
|
||||||
|
*/
|
||||||
|
export function isGmailWatcherRunning(): boolean {
|
||||||
|
return watcherProcess !== null && !shuttingDown;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user