diff --git a/CHANGELOG.md b/CHANGELOG.md index fe76316a2..7a9093af3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ - Telegram: support forum topics with topic-isolated sessions and message_thread_id routing. Thanks @HazAT, @nachoiacovino, @RandyVentures for PR #321/#333/#334. - Telegram: add draft streaming via `sendMessageDraft` with `telegram.streamMode`, plus `/reasoning stream` for draft-only reasoning. - Telegram: honor `/activation` session mode for group mention gating and clarify group activation docs. Thanks @julianengel for PR #377. +- Telegram: isolate forum topic transcripts per thread and validate Gemini turn ordering in multi-topic sessions. Thanks @hsrvc for PR #407. - iMessage: ignore disconnect errors during shutdown (avoid unhandled promise rejections). Thanks @antons for PR #359. - Messages: stop defaulting ack reactions to ๐Ÿ‘€ when identity emoji is missing. - Auto-reply: require slash for control commands to avoid false triggers in normal text. diff --git a/docs/concepts/session.md b/docs/concepts/session.md index 43b818801..311015bea 100644 --- a/docs/concepts/session.md +++ b/docs/concepts/session.md @@ -16,7 +16,7 @@ All session state is **owned by the gateway** (the โ€œmasterโ€ Clawdbot). UI cl ## Where state lives - On the **gateway host**: - Store file: `~/.clawdbot/agents//sessions/sessions.json` (per agent). - - Transcripts: `~/.clawdbot/agents//sessions/.jsonl` (one file per session id). +- Transcripts: `~/.clawdbot/agents//sessions/.jsonl` (Telegram topic sessions use `.../-topic-.jsonl`). - The store is a map `sessionKey -> { sessionId, updatedAt, ... }`. Deleting entries is safe; they are recreated on demand. - Group entries may include `displayName`, `provider`, `subject`, `room`, and `space` to label sessions in UIs. - Clawdbot does **not** read legacy Pi/Tau session folders. diff --git a/scripts/config-lock.sh b/scripts/config-lock.sh deleted file mode 100755 index c1182563d..000000000 --- a/scripts/config-lock.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/bin/bash -# ============================================================================= -# Config Lock: Makes clawdbot.json immutable to prevent any writes -# Usage: config-lock.sh [lock|unlock|status] -# ============================================================================= - -# Source unified environment -source "$(dirname "$0")/env.sh" - -lock_config() { - chflags uchg "$CONFIG" - log "๐Ÿ”’ Config LOCKED - write access disabled." -} - -unlock_config() { - chflags nouchg "$CONFIG" - log "๐Ÿ”“ Config UNLOCKED - write access enabled." -} - -check_status() { - if config_is_locked; then - echo "๐Ÿ”’ Config is LOCKED (immutable)" - return 0 - else - echo "๐Ÿ”“ Config is UNLOCKED (writable)" - return 1 - fi -} - -case "${1:-status}" in - lock) - lock_config - ;; - unlock) - unlock_config - ;; - status) - check_status - ;; - *) - echo "Usage: $0 [lock|unlock|status]" - echo " lock - Make config immutable (no writes allowed)" - echo " unlock - Allow writes (for manual edits)" - echo " status - Show current lock status" - ;; -esac diff --git a/scripts/config-watchdog.sh b/scripts/config-watchdog.sh deleted file mode 100755 index d4c50b306..000000000 --- a/scripts/config-watchdog.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/bash -# ============================================================================= -# Config Watchdog: Detects unauthorized changes to model config -# Restores if changed (backup protection if config unlocked) -# ============================================================================= - -# Source unified environment -source "$(dirname "$0")/env.sh" - -EXPECTED_PRIMARY="antigravity/gemini-3-pro-low" -EXPECTED_FALLBACKS='["antigravity/claude-sonnet-4-5","antigravity/gemini-3-flash","antigravity/gemini-3-pro-high","antigravity/claude-opus-4-5","antigravity/claude-sonnet-4-5-thinking","antigravity/claude-opus-4-5-thinking"]' - -log "Config watchdog check..." - -# If config is locked, just verify and exit -if config_is_locked; then - log "โœ… Config is LOCKED (immutable) - no changes possible." - exit 0 -fi - -# Config is unlocked - check for tampering -log "โš ๏ธ Config is UNLOCKED - checking for unauthorized changes..." - -CURRENT_PRIMARY=$(jq -r '.agent.model.primary' "$CONFIG" 2>/dev/null) -CURRENT_FALLBACKS=$(jq -c '.agent.model.fallbacks' "$CONFIG" 2>/dev/null) - -CHANGED=false - -if [ "$CURRENT_PRIMARY" != "$EXPECTED_PRIMARY" ]; then - log "โš ๏ธ PRIMARY CHANGED: $CURRENT_PRIMARY โ†’ $EXPECTED_PRIMARY" - CHANGED=true -fi - -if [ "$CURRENT_FALLBACKS" != "$EXPECTED_FALLBACKS" ]; then - log "โš ๏ธ FALLBACKS CHANGED!" - CHANGED=true -fi - -if [ "$CHANGED" = true ]; then - log "๐Ÿ”ง RESTORING CONFIG..." - jq --arg primary "$EXPECTED_PRIMARY" \ - --argjson fallbacks "$EXPECTED_FALLBACKS" \ - '.agent.model.primary = $primary | .agent.model.fallbacks = $fallbacks' \ - "$CONFIG" > "${CONFIG}.tmp" && mv "${CONFIG}.tmp" "$CONFIG" - - if [ $? -eq 0 ]; then - log "โœ… Config restored. Re-locking..." - "$SCRIPTS_DIR/config-lock.sh" lock - else - log "โŒ Failed to restore config!" - fi -else - log "โœ… Config OK - re-locking..." - "$SCRIPTS_DIR/config-lock.sh" lock -fi diff --git a/scripts/env.sh b/scripts/env.sh deleted file mode 100755 index b024762b3..000000000 --- a/scripts/env.sh +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/bash -# ============================================================================= -# Unified environment for all clawdbot scripts -# Source this at the top of every script: source "$(dirname "$0")/env.sh" -# ============================================================================= - -# Comprehensive PATH for cron environment -export PATH="/usr/sbin:/usr/bin:/bin:/opt/homebrew/bin:$HOME/.bun/bin:/usr/local/bin:$PATH" - -# Core directories -export CLAWDBOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 2>/dev/null && pwd)" -export SCRIPTS_DIR="$CLAWDBOT_DIR/scripts" -export CONFIG="$HOME/.clawdbot/clawdbot.json" -export LOG_DIR="$HOME/.clawdbot/logs" - -# Gateway settings -export PORT=18789 - -# Ensure log directory exists -mkdir -p "$LOG_DIR" 2>/dev/null - -# Helper: Check if config is locked -config_is_locked() { - ls -lO "$CONFIG" 2>/dev/null | grep -q "uchg" -} - -# Helper: Log with timestamp -log() { - echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" -} diff --git a/scripts/keep-alive.sh b/scripts/keep-alive.sh deleted file mode 100755 index a7848ace9..000000000 --- a/scripts/keep-alive.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash -# ============================================================================= -# Keep-Alive: Ensures clawdbot gateway is always running -# Runs via cron every 2 minutes -# ============================================================================= - -# Source unified environment -source "$(dirname "$0")/env.sh" - -log "Checking clawdbot status..." - -# Check if gateway is running (port check) -if lsof -i :$PORT > /dev/null 2>&1; then - # Additional health check via HTTP - if curl -sf "http://127.0.0.1:$PORT/health" > /dev/null 2>&1; then - log "โœ… Status: ONLINE (Port $PORT active, health OK)" - else - log "โš ๏ธ Status: DEGRADED (Port $PORT active, but health check failed)" - fi - exit 0 -else - log "โŒ Status: OFFLINE (Port $PORT closed). Initiating restart..." - "$SCRIPTS_DIR/models.sh" restart - log "Restart command executed." -fi diff --git a/scripts/models.sh b/scripts/models.sh deleted file mode 100755 index 508ed076c..000000000 --- a/scripts/models.sh +++ /dev/null @@ -1,82 +0,0 @@ -#!/bin/bash -# ============================================================================= -# Models: Gateway management and model config display -# Usage: ./scripts/models.sh [edit|restart|show] -# ============================================================================= - -# Source unified environment -source "$(dirname "$0")/env.sh" - -wait_for_port() { - local port=$1 - for i in {1..10}; do - if ! lsof -i :$port > /dev/null 2>&1; then - return 0 - fi - echo "Waiting for port $port to clear... ($i/10)" - sleep 1 - done - return 1 -} - -restart_gateway() { - log "Restarting gateway..." - - # Try graceful kill first - pkill -f "bun.*gateway --port $PORT" 2>/dev/null - pkill -f "node.*gateway.*$PORT" 2>/dev/null - pkill -f "tsx.*gateway.*$PORT" 2>/dev/null - - if ! wait_for_port $PORT; then - log "Port $PORT still in use. Forcing cleanup..." - lsof -ti :$PORT | xargs kill -9 2>/dev/null - sleep 1 - fi - - # Start gateway in background - cd "$CLAWDBOT_DIR" && pnpm clawdbot gateway --port $PORT & - - # Verify start - sleep 3 - if lsof -i :$PORT > /dev/null 2>&1; then - log "โœ… Gateway restarted successfully on port $PORT." - - # Auto-lock config after successful restart - "$SCRIPTS_DIR/config-lock.sh" lock - return 0 - else - log "โŒ Gateway failed to start. Check logs." - return 1 - fi -} - -case "${1:-show}" in - edit) - # Unlock config for editing - if config_is_locked; then - "$SCRIPTS_DIR/config-lock.sh" unlock - fi - - ${EDITOR:-nano} "$CONFIG" - echo "Config saved." - restart_gateway - ;; - restart) - restart_gateway - ;; - show) - echo "=== Model Priority ===" - echo "Primary: $(jq -r '.agent.model.primary' "$CONFIG")" - echo "" - echo "Fallbacks:" - jq -r '.agent.model.fallbacks[]' "$CONFIG" | nl - echo "" - echo "Config Lock: $(config_is_locked && echo '๐Ÿ”’ LOCKED' || echo '๐Ÿ”“ UNLOCKED')" - ;; - *) - echo "Usage: $0 [edit|restart|show]" - echo " show - Display current model priority (default)" - echo " edit - Edit config and restart gateway" - echo " restart - Just restart gateway" - ;; -esac diff --git a/src/agents/pi-embedded-helpers.ts b/src/agents/pi-embedded-helpers.ts index 5fb8fe68f..ad2ac3704 100644 --- a/src/agents/pi-embedded-helpers.ts +++ b/src/agents/pi-embedded-helpers.ts @@ -278,14 +278,12 @@ export function pickFallbackThinkingLevel(params: { * Gemini requires strict alternating userโ†’assistantโ†’toolโ†’user pattern. * This function: * 1. Detects consecutive messages from the same role - * 2. Merges consecutive assistant/tool messages together + * 2. Merges consecutive assistant messages together * 3. Preserves metadata (usage, stopReason, etc.) * * This prevents the "function call turn comes immediately after a user turn or after a function response turn" error. */ -export function validateGeminiTurns( - messages: AgentMessage[], -): AgentMessage[] { +export function validateGeminiTurns(messages: AgentMessage[]): AgentMessage[] { if (!Array.isArray(messages) || messages.length === 0) { return messages; } @@ -299,9 +297,7 @@ export function validateGeminiTurns( continue; } - const msgRole = (msg as { role?: unknown }).role as - | string - | undefined; + const msgRole = (msg as { role?: unknown }).role as string | undefined; if (!msgRole) { result.push(msg); continue; diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index c3ac0dbbf..ec15416f1 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -334,7 +334,6 @@ const EMBEDDED_RUN_WAITERS = new Map>(); type SessionManagerCacheEntry = { sessionFile: string; loadedAt: number; - lastAccessAt: number; }; const SESSION_MANAGER_CACHE = new Map(); @@ -362,7 +361,6 @@ function trackSessionManagerAccess(sessionFile: string): void { SESSION_MANAGER_CACHE.set(sessionFile, { sessionFile, loadedAt: now, - lastAccessAt: now, }); } @@ -380,9 +378,14 @@ async function prewarmSessionFile(sessionFile: string): Promise { if (isSessionManagerCached(sessionFile)) return; try { - // Touch the file to bring it into OS page cache - // This is much faster than letting SessionManager.open() do it cold - await fs.stat(sessionFile); + // Read a small chunk to encourage OS page cache warmup. + const handle = await fs.open(sessionFile, "r"); + try { + const buffer = Buffer.alloc(4096); + await handle.read(buffer, 0, buffer.length, 0); + } finally { + await handle.close(); + } trackSessionManagerAccess(sessionFile); } catch { // File doesn't exist yet, SessionManager will create it diff --git a/src/config/sessions.cache.test.ts b/src/config/sessions.cache.test.ts index addd68c1e..697a605b8 100644 --- a/src/config/sessions.cache.test.ts +++ b/src/config/sessions.cache.test.ts @@ -1,12 +1,12 @@ -import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import fs from "node:fs"; -import path from "node:path"; import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { - loadSessionStore, - saveSessionStore, clearSessionStoreCacheForTest, + loadSessionStore, type SessionEntry, + saveSessionStore, } from "./sessions.js"; describe("Session Store Cache", () => { @@ -52,7 +52,7 @@ describe("Session Store Cache", () => { expect(loaded).toEqual(testStore); }); - it("should cache session store on first load", async () => { + it("should cache session store on first load when file is unchanged", async () => { const testStore: Record = { "session:1": { sessionId: "id-1", @@ -63,26 +63,20 @@ describe("Session Store Cache", () => { await saveSessionStore(storePath, testStore); + const readSpy = vi.spyOn(fs, "readFileSync"); + // First load - from disk const loaded1 = loadSessionStore(storePath); expect(loaded1).toEqual(testStore); - // Modify file on disk - const modifiedStore: Record = { - "session:2": { - sessionId: "id-2", - updatedAt: Date.now(), - displayName: "Test Session 2", - }, - }; - fs.writeFileSync(storePath, JSON.stringify(modifiedStore, null, 2)); - - // Second load - should still return cached data (not the modified file) + // Second load - should return cached data (no extra disk read) const loaded2 = loadSessionStore(storePath); - expect(loaded2).toEqual(testStore); // Should be original, not modified + expect(loaded2).toEqual(testStore); + expect(readSpy).toHaveBeenCalledTimes(1); + readSpy.mockRestore(); }); - it("should cache multiple calls to the same store path", async () => { + it("should refresh cache when store file changes on disk", async () => { const testStore: Record = { "session:1": { sessionId: "id-1", @@ -98,12 +92,16 @@ describe("Session Store Cache", () => { expect(loaded1).toEqual(testStore); // Modify file on disk while cache is valid - fs.writeFileSync(storePath, JSON.stringify({ "session:99": { sessionId: "id-99", updatedAt: Date.now() } }, null, 2)); + const modifiedStore: Record = { + "session:99": { sessionId: "id-99", updatedAt: Date.now() }, + }; + fs.writeFileSync(storePath, JSON.stringify(modifiedStore, null, 2)); + const bump = new Date(Date.now() + 2000); + fs.utimesSync(storePath, bump, bump); - // Second load - should still return original cached data + // Second load - should return the updated store const loaded2 = loadSessionStore(storePath); - expect(loaded2).toEqual(testStore); - expect(loaded2).not.toHaveProperty("session:99"); + expect(loaded2).toEqual(modifiedStore); }); it("should invalidate cache on write", async () => { diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index c7529eaf1..0a62e50cc 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -8,6 +8,7 @@ import { deriveSessionKey, loadSessionStore, resolveSessionKey, + resolveSessionTranscriptPath, resolveSessionTranscriptsDir, updateLastRoute, } from "./sessions.js"; @@ -147,4 +148,21 @@ describe("sessions", () => { ); expect(dir).toBe("/legacy/state/agents/main/sessions"); }); + + it("includes topic ids in session transcript filenames", () => { + const prev = process.env.CLAWDBOT_STATE_DIR; + process.env.CLAWDBOT_STATE_DIR = "/custom/state"; + try { + const sessionFile = resolveSessionTranscriptPath("sess-1", "main", 123); + expect(sessionFile).toBe( + "/custom/state/agents/main/sessions/sess-1-topic-123.jsonl", + ); + } finally { + if (prev === undefined) { + delete process.env.CLAWDBOT_STATE_DIR; + } else { + process.env.CLAWDBOT_STATE_DIR = prev; + } + } + }); }); diff --git a/src/config/sessions.ts b/src/config/sessions.ts index 5ddf95534..6f30474c0 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -24,6 +24,7 @@ type SessionStoreCacheEntry = { store: Record; loadedAt: number; storePath: string; + mtimeMs?: number; }; const SESSION_STORE_CACHE = new Map(); @@ -52,6 +53,14 @@ function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean { return now - entry.loadedAt <= ttl; } +function getSessionStoreMtimeMs(storePath: string): number | undefined { + try { + return fs.statSync(storePath).mtimeMs; + } catch { + return undefined; + } +} + function invalidateSessionStoreCache(storePath: string): void { SESSION_STORE_CACHE.delete(storePath); } @@ -180,19 +189,22 @@ export function resolveSessionTranscriptPath( agentId?: string, topicId?: number, ): string { - const fileName = topicId !== undefined ? `${sessionId}-topic-${topicId}.jsonl` : `${sessionId}.jsonl`; + const fileName = + topicId !== undefined + ? `${sessionId}-topic-${topicId}.jsonl` + : `${sessionId}.jsonl`; return path.join(resolveAgentSessionsDir(agentId), fileName); } export function resolveSessionFilePath( sessionId: string, entry?: SessionEntry, - opts?: { agentId?: string }, + opts?: { agentId?: string; topicId?: number }, ): string { const candidate = entry?.sessionFile?.trim(); return candidate ? candidate - : resolveSessionTranscriptPath(sessionId, opts?.agentId); + : resolveSessionTranscriptPath(sessionId, opts?.agentId, opts?.topicId); } export function resolveStorePath(store?: string, opts?: { agentId?: string }) { @@ -390,19 +402,25 @@ export function loadSessionStore( if (isSessionStoreCacheEnabled()) { const cached = SESSION_STORE_CACHE.get(storePath); if (cached && isSessionStoreCacheValid(cached)) { - // Return a shallow copy to prevent external mutations affecting cache - return { ...cached.store }; + const currentMtimeMs = getSessionStoreMtimeMs(storePath); + if (currentMtimeMs === cached.mtimeMs) { + // Return a shallow copy to prevent external mutations affecting cache + return { ...cached.store }; + } + invalidateSessionStoreCache(storePath); } } // Cache miss or disabled - load from disk let store: Record = {}; + let mtimeMs = getSessionStoreMtimeMs(storePath); try { const raw = fs.readFileSync(storePath, "utf-8"); const parsed = JSON5.parse(raw); if (parsed && typeof parsed === "object") { store = parsed as Record; } + mtimeMs = getSessionStoreMtimeMs(storePath) ?? mtimeMs; } catch { // ignore missing/invalid store; we'll recreate it } @@ -413,6 +431,7 @@ export function loadSessionStore( store: { ...store }, // Store a copy to prevent external mutations loadedAt: Date.now(), storePath, + mtimeMs, }); } diff --git a/src/config/types.ts b/src/config/types.ts index 4602c1fcd..e625a5914 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -31,15 +31,6 @@ export type SessionSendPolicyConfig = { rules?: SessionSendPolicyRule[]; }; -export type SessionCacheConfig = { - /** Enable session store caching (default: true). Set to false to disable. */ - enabled?: boolean; - /** Session store cache TTL in milliseconds (default: 45000 = 45s). Set to 0 to disable. */ - storeTtlMs?: number; - /** SessionManager cache TTL in milliseconds (default: 45000 = 45s). Set to 0 to disable. */ - managerTtlMs?: number; -}; - export type SessionConfig = { scope?: SessionScope; resetTriggers?: string[]; @@ -50,8 +41,6 @@ export type SessionConfig = { typingMode?: TypingMode; mainKey?: string; sendPolicy?: SessionSendPolicyConfig; - /** Session caching configuration. */ - cache?: SessionCacheConfig; agentToAgent?: { /** Max ping-pong turns between requester/target (0โ€“5). Default: 5. */ maxPingPongTurns?: number;