diff --git a/extensions/tlon/README.md b/extensions/tlon/README.md new file mode 100644 index 000000000..0fd7fd8da --- /dev/null +++ b/extensions/tlon/README.md @@ -0,0 +1,828 @@ +# Clawdbot Tlon/Urbit Integration + +Complete documentation for integrating Clawdbot with Tlon Messenger (built on Urbit). + +## Overview + +This extension enables Clawdbot to: +- Monitor and respond to direct messages on Tlon Messenger +- Monitor and respond to group channel messages when mentioned +- Auto-discover available group channels +- Use per-conversation subscriptions for reliable message delivery +- **Automatic AI model fallback** - Seamlessly switches from Anthropic to OpenAI when rate limited (see [FALLBACK.md](./FALLBACK.md)) + +**Ship:** ~sitrul-nacwyl +**Test User:** ~malmur-halmex + +## Architecture + +### Files + +- **`index.js`** - Plugin entry point, registers the Tlon channel adapter +- **`monitor.js`** - Core monitoring logic, handles incoming messages and AI dispatch +- **`urbit-sse-client.js`** - Custom SSE client for Urbit HTTP API +- **`core-bridge.js`** - Dynamic loader for clawdbot core modules +- **`package.json`** - Plugin package definition +- **`FALLBACK.md`** - AI model fallback system documentation + +### How It Works + +1. **Authentication**: Uses ship name + code to authenticate via `/~/login` endpoint +2. **Channel Creation**: Creates Tlon Messenger channel via PUT to `/~/channel/{uid}` +3. **Activation**: Sends "helm-hi" poke to activate channel (required!) +4. **Subscriptions**: + - **DMs**: Individual subscriptions to `/dm/{ship}` for each conversation + - **Groups**: Individual subscriptions to `/{channelNest}` for each channel +5. **SSE Stream**: Opens server-sent events stream for real-time updates +6. **Auto-Reconnection**: Automatically reconnects if SSE stream dies + - Exponential backoff (1s to 30s delays) + - Up to 10 reconnection attempts + - Generates new channel ID on each attempt +7. **Auto-Discovery**: Queries `/groups-ui/v6/init.json` to find all available channels +8. **Dynamic Refresh**: Polls every 2 minutes for new conversations/channels +9. **Message Processing**: When bot is mentioned, routes to AI via clawdbot core +10. **AI Fallback**: Automatically switches providers when rate limited + - Primary: Anthropic Claude Sonnet 4.5 + - Fallbacks: OpenAI GPT-4o, GPT-4 Turbo + - Automatic cooldown management + - See [FALLBACK.md](./FALLBACK.md) for details + +## Configuration + +### 1. Install Dependencies + +```bash +cd ~/.clawdbot/extensions/tlon +npm install +``` + +### 2. Configure Credentials + +Edit `~/.clawdbot/clawdbot.json`: + +```json +{ + "channels": { + "tlon": { + "enabled": true, + "ship": "your-ship-name", + "code": "your-ship-code", + "url": "https://your-ship-name.tlon.network", + "showModelSignature": false, + "dmAllowlist": ["~friend-ship-1", "~friend-ship-2"], + "defaultAuthorizedShips": ["~malmur-halmex"], + "authorization": { + "channelRules": { + "chat/~host-ship/channel-name": { + "mode": "open", + "allowedShips": [] + }, + "chat/~another-host/private-channel": { + "mode": "restricted", + "allowedShips": ["~malmur-halmex", "~sitrul-nacwyl"] + } + } + } + } + } +} +``` + +**Configuration Options:** +- `enabled` - Enable/disable the Tlon channel (default: `false`) +- `ship` - Your Urbit ship name (required) +- `code` - Your ship's login code (required) +- `url` - Your ship's URL (required) +- `showModelSignature` - Append model name to responses (default: `false`) + - When enabled, adds `[Generated by Claude Sonnet 4.5]` to the end of each response + - Useful for transparency about which AI model generated the response +- `dmAllowlist` - Ships allowed to send DMs (optional) + - If omitted or empty, all DMs are accepted (default behavior) + - Ship names can include or omit the `~` prefix + - Example: `["~trusted-friend", "~another-ship"]` + - Blocked DMs are logged for visibility +- `defaultAuthorizedShips` - Ships authorized in new/unconfigured channels (default: `["~malmur-halmex"]`) + - New channels default to `restricted` mode using these ships +- `authorization` - Per-channel access control (optional) + - `channelRules` - Map of channel nest to authorization rules + - `mode`: `"open"` (all ships) or `"restricted"` (allowedShips only) + - `allowedShips`: Array of authorized ships (only for `restricted` mode) + +**For localhost development:** +```json +"url": "http://localhost:8080" +``` + +**For Tlon-hosted ships:** +```json +"url": "https://{ship-name}.tlon.network" +``` + +### 3. Set Environment Variable + +The monitor needs to find clawdbot's core modules. Set the environment variable: + +```bash +export CLAWDBOT_ROOT=/opt/homebrew/lib/node_modules/clawdbot +``` + +Or if clawdbot is installed elsewhere: +```bash +export CLAWDBOT_ROOT=$(dirname $(dirname $(readlink -f $(which clawdbot)))) +``` + +**Make it permanent** (add to `~/.zshrc` or `~/.bashrc`): +```bash +echo 'export CLAWDBOT_ROOT=/opt/homebrew/lib/node_modules/clawdbot' >> ~/.zshrc +``` + +### 4. Configure AI Authentication + +The bot needs API credentials to generate responses. + +**Option A: Use Claude Code CLI credentials** +```bash +clawdbot agents add main +# Select "Use Claude Code CLI credentials" +``` + +**Option B: Use Anthropic API key** +```bash +clawdbot agents add main +# Enter your API key from console.anthropic.com +``` + +### 5. Start the Gateway + +```bash +CLAWDBOT_ROOT=/opt/homebrew/lib/node_modules/clawdbot clawdbot gateway +``` + +Or create a launch script: + +```bash +cat > ~/start-clawdbot.sh << 'EOF' +#!/bin/bash +export CLAWDBOT_ROOT=/opt/homebrew/lib/node_modules/clawdbot +clawdbot gateway +EOF +chmod +x ~/start-clawdbot.sh +``` + +## Usage + +### Testing + +1. Send a DM from another ship to ~sitrul-nacwyl +2. Mention the bot: `~sitrul-nacwyl hello there!` +3. Bot should respond with AI-generated reply + +### Monitoring Logs + +Check gateway logs: +```bash +tail -f /tmp/clawdbot/clawdbot-$(date +%Y-%m-%d).log +``` + +Look for these indicators: +- `[tlon] Successfully authenticated to https://...` +- `[tlon] Auto-discovered N chat channel(s)` +- `[tlon] Connected! All subscriptions active` +- `[tlon] Received DM from ~ship: "..." (mentioned: true)` +- `[tlon] Dispatching to AI for ~ship (DM)` +- `[tlon] Delivered AI reply to ~ship` + +### Group Channels + +The bot automatically discovers and subscribes to all group channels using **delta-based discovery** for efficiency. + +**How Auto-Discovery Works:** +1. **On startup:** Fetches changes from the last 5 days via `/groups-ui/v5/changes/~YYYY.M.D..20.19.51..9b9d.json` +2. **Periodic refresh:** Checks for new channels every 2 minutes +3. **Smart caching:** Only fetches deltas, not full state each time + +**Benefits:** +- Reduced bandwidth usage +- Faster startup (especially for ships with many groups) +- Automatically picks up new channels you join +- Context of recent group activity + +**Manual Configuration:** + +To disable auto-discovery and use specific channels: + +```json +{ + "channels": { + "tlon": { + "enabled": true, + "ship": "your-ship-name", + "code": "your-ship-code", + "url": "https://your-ship-name.tlon.network", + "autoDiscoverChannels": false, + "groupChannels": [ + "chat/~host-ship/channel-name", + "chat/~another-host/another-channel" + ] + } + } +} +``` + +### Model Signatures + +The bot can append the AI model name to each response for transparency. Enable this feature in your config: + +```json +{ + "channels": { + "tlon": { + "enabled": true, + "ship": "your-ship-name", + "code": "your-ship-code", + "url": "https://your-ship-name.tlon.network", + "showModelSignature": true + } + } +} +``` + +**Example output with signature enabled:** +``` +User: ~sitrul-nacwyl explain quantum computing +Bot: Quantum computing uses quantum mechanics principles like superposition + and entanglement to perform calculations... + + [Generated by Claude Sonnet 4.5] +``` + +**Supported model formats:** +- `Claude Opus 4.5` +- `Claude Sonnet 4.5` +- `GPT-4o` +- `GPT-4 Turbo` +- `Gemini 2.0 Flash` + +When using the [AI fallback system](./FALLBACK.md), signatures automatically reflect which model generated the response (e.g., if Anthropic is rate limited and OpenAI is used, the signature will show `GPT-4o`). + +### Channel History Summarization + +The bot can summarize recent channel activity when asked. This is useful for catching up on conversations you missed. + +**Trigger phrases:** +- `~bot-ship summarize this channel` +- `~bot-ship what did I miss?` +- `~bot-ship catch me up` +- `~bot-ship tldr` +- `~bot-ship channel summary` + +**Example:** +``` +User: ~sitrul-nacwyl what did I miss? +Bot: Here's a summary of the last 50 messages: + +Main topics discussed: +1. Discussion about Urbit networking (Ames protocol) +2. Planning for next week's developer meetup +3. Bug reports for the new UI update + +Key decisions: +- Meetup scheduled for Thursday at 3pm EST +- Priority on fixing the scrolling issue + +Notable participants: ~malmur-halmex, ~bolbex-fogdys +``` + +**How it works:** +- Fetches the last 50 messages from the channel +- Sends them to the AI for summarization +- Returns a concise summary with main topics, decisions, and action items + +### Thread Support + +The bot automatically maintains context in threaded conversations. When you mention the bot in a reply thread, it will respond within that thread instead of posting to the main channel. + +**Example:** +``` +Main channel post: + User A: ~sitrul-nacwyl what's the capital of France? + Bot: Paris is the capital of France. + └─ User B (in thread): ~sitrul-nacwyl and what's its population? + └─ Bot (in thread): Paris has a population of approximately 2.2 million... +``` + +**Benefits:** +- Keeps conversations organized +- Reduces noise in main channel +- Maintains conversation context within threads + +**Technical Details:** +The bot handles both top-level posts and thread replies with different data structures: +- Top-level posts: `response.post.r-post.set.essay` +- Thread replies: `response.post.r-post.reply.r-reply.set.memo` + +When replying in a thread, the bot uses the `parent-id` from the incoming message to ensure the reply stays within the same thread. + +**Note:** Thread support is automatic - no configuration needed. + +### Link Summarization + +The bot can fetch and summarize web content when you share links. + +**Example:** +``` +User: ~sitrul-nacwyl can you summarize this https://example.com/article +Bot: This article discusses... [summary of the content] +``` + +**How it works:** +- Bot extracts URLs from rich text messages (including inline links) +- Fetches the web page content +- Summarizes using the WebFetch tool + +### Channel Authorization + +Control which ships can invoke the bot in specific group channels. **New channels default to `restricted` mode** for security. + +#### Default Behavior + +**DMs:** Always open (no restrictions) +**Group Channels:** Restricted by default, only ships in `defaultAuthorizedShips` can invoke the bot + +#### Configuration + +```json +{ + "channels": { + "tlon": { + "enabled": true, + "ship": "sitrul-nacwyl", + "code": "your-code", + "url": "https://sitrul-nacwyl.tlon.network", + "defaultAuthorizedShips": ["~malmur-halmex"], + "authorization": { + "channelRules": { + "chat/~bitpyx-dildus/core": { + "mode": "open" + }, + "chat/~nocsyx-lassul/bongtable": { + "mode": "restricted", + "allowedShips": ["~malmur-halmex", "~sitrul-nacwyl"] + } + } + } + } + } +} +``` + +#### Authorization Modes + +**`open`** - Any ship can invoke the bot when mentioned +- Good for public channels +- No `allowedShips` needed + +**`restricted`** (default) - Only specific ships can invoke the bot +- Good for private/work channels +- Requires `allowedShips` list +- New channels use `defaultAuthorizedShips` if no rule exists + +#### Examples + +**Make a channel public:** +```json +"chat/~bitpyx-dildus/core": { + "mode": "open" +} +``` + +**Restrict to specific users:** +```json +"chat/~nocsyx-lassul/bongtable": { + "mode": "restricted", + "allowedShips": ["~malmur-halmex"] +} +``` + +**New channel (no config):** +- Mode: `restricted` (safe default) +- Allowed ships: `defaultAuthorizedShips` (e.g., `["~malmur-halmex"]`) + +#### Behavior + +**Authorized mention:** +``` +~malmur-halmex: ~sitrul-nacwyl tell me about quantum computing +Bot: [Responds with answer] +``` + +**Unauthorized mention (silently ignored):** +``` +~other-ship: ~sitrul-nacwyl tell me about quantum computing +Bot: [No response, logs show access denied] +``` + +**Check logs:** +```bash +tail -f /tmp/tlon-fallback.log | grep "Access" +``` + +You'll see: +``` +[tlon] ✅ Access granted: ~malmur-halmex in chat/~host/channel (authorized user) +[tlon] ⛔ Access denied: ~other-ship in chat/~host/channel (restricted, allowed: ~malmur-halmex) +``` + +## Technical Deep Dive + +### Urbit HTTP API Flow + +1. **Login** (POST `/~/login`) + - Sends `password={code}` + - Returns authentication cookie in `set-cookie` header + +2. **Channel Creation** (PUT `/~/channel/{channelId}`) + - Channel ID format: `{timestamp}-{random}` + - Body: array of subscription objects + - Response: 204 No Content + +3. **Channel Activation** (PUT `/~/channel/{channelId}`) + - **Critical:** Must send helm-hi poke BEFORE opening SSE stream + - Poke structure: + ```json + { + "id": timestamp, + "action": "poke", + "ship": "sitrul-nacwyl", + "app": "hood", + "mark": "helm-hi", + "json": "Opening API channel" + } + ``` + +4. **SSE Stream** (GET `/~/channel/{channelId}`) + - Headers: `Accept: text/event-stream` + - Returns Server-Sent Events + - Format: + ``` + id: {event-id} + data: {json-payload} + + ``` + +### Subscription Paths + +#### DMs (Chat App) +- **Path:** `/dm/{ship}` +- **App:** `chat` +- **Event Format:** + ```json + { + "id": "~ship/timestamp", + "whom": "~other-ship", + "response": { + "add": { + "memo": { + "author": "~sender-ship", + "sent": 1768742460781, + "content": [ + { + "inline": [ + "text", + {"ship": "~mentioned-ship"}, + "more text", + {"break": null} + ] + } + ] + } + } + } + } + ``` + +#### Group Channels (Channels App) +- **Path:** `/{channelNest}` +- **Channel Nest Format:** `chat/~host-ship/channel-name` +- **App:** `channels` +- **Event Format:** + ```json + { + "response": { + "post": { + "id": "message-id", + "r-post": { + "set": { + "essay": { + "author": "~sender-ship", + "sent": 1768742460781, + "kind": "/chat", + "content": [...] + } + } + } + } + } + } + ``` + +### Text Extraction + +Message content uses inline format with mixed types: +- Strings: plain text +- Objects with `ship`: mentions (e.g., `{"ship": "~sitrul-nacwyl"}`) +- Objects with `break`: line breaks (e.g., `{"break": null}`) + +Example: +```json +{ + "inline": [ + "Hey ", + {"ship": "~sitrul-nacwyl"}, + " how are you?", + {"break": null}, + "This is a new line" + ] +} +``` + +Extracts to: `"Hey ~sitrul-nacwyl how are you?\nThis is a new line"` + +### Mention Detection + +Simple includes check (case-insensitive): +```javascript +const normalizedBotShip = botShipName.startsWith("~") + ? botShipName + : `~${botShipName}`; +return messageText.toLowerCase().includes(normalizedBotShip.toLowerCase()); +``` + +Note: Word boundaries (`\b`) don't work with `~` character. + +## Troubleshooting + +### Issue: "Cannot read properties of undefined (reading 'href')" + +**Cause:** Some clawdbot dependencies (axios, Slack SDK) expect browser globals + +**Fix:** Window.location polyfill is already added to monitor.js (lines 1-18) + +### Issue: "Unable to resolve Clawdbot root" + +**Cause:** core-bridge.js can't find clawdbot installation + +**Fix:** Set `CLAWDBOT_ROOT` environment variable: +```bash +export CLAWDBOT_ROOT=/opt/homebrew/lib/node_modules/clawdbot +``` + +### Issue: SSE Stream Returns 403 Forbidden + +**Cause:** Trying to open SSE stream without activating channel first + +**Fix:** Send helm-hi poke before opening stream (urbit-sse-client.js handles this) + +### Issue: No Events Received After Subscribing + +**Cause:** Wrong subscription path or app name + +**Fix:** +- DMs: Use `/dm/{ship}` with `app: "chat"` +- Groups: Use `/{channelNest}` with `app: "channels"` + +### Issue: Messages Show "[object Object]" + +**Cause:** Not handling inline content objects properly + +**Fix:** Text extraction handles mentions and breaks (monitor.js `extractMessageText()`) + +### Issue: Bot Not Detecting Mentions + +**Cause:** Message doesn't contain bot's ship name + +**Debug:** +```bash +tail -f /tmp/clawdbot/clawdbot-*.log | grep "mentioned:" +``` + +Should show: +``` +[tlon] Received DM from ~malmur-halmex: "~sitrul-nacwyl hello..." (mentioned: true) +``` + +### Issue: "No API key found for provider 'anthropic'" + +**Cause:** AI authentication not configured + +**Fix:** Run `clawdbot agents add main` and configure credentials + +### Issue: Gateway Port Already in Use + +**Fix:** +```bash +# Stop existing instance +clawdbot daemon stop + +# Or force kill +lsof -ti:18789 | xargs kill -9 +``` + +### Issue: Bot Stops Responding (SSE Disconnection) + +**Cause:** Urbit SSE stream disconnected (sent "quit" event or stream ended) + +**Symptoms:** +- Logs show: `[SSE] Received event: {"id":X,"response":"quit"}` +- No more incoming SSE events +- Bot appears online but doesn't respond to mentions + +**Fix:** The bot now **automatically reconnects**! Look for these log messages: +``` +[SSE] Stream ended, attempting reconnection... +[SSE] Reconnection attempt 1/10 in 1000ms... +[SSE] Reconnecting with new channel ID: xxx-yyy +[SSE] Reconnection successful! +``` + +**Manual restart if needed:** +```bash +kill $(pgrep -f "clawdbot gateway") +CLAWDBOT_ROOT=/opt/homebrew/lib/node_modules/clawdbot clawdbot gateway +``` + +**Configuration options** (in urbit-sse-client.js constructor): +```javascript +new UrbitSSEClient(url, cookie, { + autoReconnect: true, // Default: true + maxReconnectAttempts: 10, // Default: 10 + reconnectDelay: 1000, // Initial delay: 1s + maxReconnectDelay: 30000, // Max delay: 30s + onReconnect: async (client) => { + // Optional callback for resubscription logic + } +}) +``` + +## Development Notes + +### Testing Without Clawdbot + +You can test the Urbit API directly: + +```javascript +import { UrbitSSEClient } from "./urbit-sse-client.js"; + +const api = new UrbitSSEClient( + "https://sitrul-nacwyl.tlon.network", + "your-cookie-here" +); + +// Subscribe to DMs +await api.subscribe({ + app: "chat", + path: "/dm/malmur-halmex", + event: (data) => console.log("DM:", data), + err: (e) => console.error("Error:", e), + quit: () => console.log("Quit") +}); + +// Connect +await api.connect(); + +// Send a DM +await api.poke({ + app: "chat", + mark: "chat-dm-action", + json: { + ship: "~malmur-halmex", + diff: { + id: `~sitrul-nacwyl/${Date.now()}`, + delta: { + add: { + memo: { + content: [{ inline: ["Hello!"] }], + author: "~sitrul-nacwyl", + sent: Date.now() + }, + kind: null, + time: null + } + } + } + } +}); +``` + +### Debugging SSE Events + +Enable verbose logging in urbit-sse-client.js: + +```javascript +// Line 169-171 +if (parsed.response !== "subscribe" && parsed.response !== "poke") { + console.log("[SSE] Received event:", JSON.stringify(parsed).substring(0, 500)); +} +``` + +Remove the condition to see all events: +```javascript +console.log("[SSE] Received event:", JSON.stringify(parsed).substring(0, 500)); +``` + +### Channel Nest Format + +Format: `{type}/{host-ship}/{channel-name}` + +Examples: +- `chat/~bitpyx-dildus/core` +- `chat/~malmur-halmex/v3aedb3s` +- `chat/~sitrul-nacwyl/tm-wayfinding-group-chat` + +Parse with: +```javascript +const match = channelNest.match(/^([^/]+)\/([^/]+)\/(.+)$/); +const [, type, hostShip, channelName] = match; +``` + +### Auto-Discovery Endpoint + +Query: `GET /~/scry/groups-ui/v6/init.json` + +Response structure: +```json +{ + "groups": { + "group-id": { + "channels": { + "chat/~host/name": { ... }, + "diary/~host/name": { ... } + } + } + } +} +``` + +Filter for chat channels only: +```javascript +if (channelNest.startsWith("chat/")) { + channels.push(channelNest); +} +``` + +## Implementation Timeline + +### Major Milestones + +1. ✅ Plugin structure and registration +2. ✅ Authentication and cookie management +3. ✅ Channel creation and activation (helm-hi poke) +4. ✅ SSE stream connection +5. ✅ DM subscription and event parsing +6. ✅ Group channel support +7. ✅ Auto-discovery of channels +8. ✅ Per-conversation subscriptions +9. ✅ Text extraction (mentions and breaks) +10. ✅ Mention detection +11. ✅ Node.js polyfills (window.location) +12. ✅ Core module integration +13. ⏳ API authentication (user needs to configure) + +### Key Discoveries + +- **Helm-hi requirement:** Must send helm-hi poke before opening SSE stream +- **Subscription paths:** Frontend uses `/v3` globally, but individual `/dm/{ship}` and `/{channelNest}` paths work better +- **Event formats:** V3 API uses `essay` and `memo` structures (not older `writs` format) +- **Inline content:** Mixed array of strings and objects (mentions, breaks) +- **Tilde handling:** Ship mentions already include `~` prefix +- **Word boundaries:** `\b` regex doesn't work with `~` character +- **Browser globals:** axios and Slack SDK need window.location polyfill +- **Module resolution:** Need CLAWDBOT_ROOT for dynamic imports + +## Resources + +- **Tlon Apps GitHub:** https://github.com/tloncorp/tlon-apps +- **Urbit HTTP API:** @urbit/http-api package +- **Tlon Frontend Code:** `/tmp/tlon-apps/packages/shared/src/api/chatApi.ts` +- **Clawdbot Docs:** https://docs.clawd.bot/ +- **Anthropic Provider:** https://docs.clawd.bot/providers/anthropic + +## Future Enhancements + +- [ ] Support for message reactions +- [ ] Support for message editing/deletion +- [ ] Support for attachments/images +- [ ] Typing indicators +- [ ] Read receipts +- [ ] Message threading +- [ ] Channel-specific bot personas +- [ ] Rate limiting +- [ ] Message queuing for offline ships +- [ ] Metrics and monitoring + +## Credits + +Built for integrating Clawdbot with Tlon messenger. + +**Developer:** Claude (Sonnet 4.5) +**Platform:** Tlon Messenger built on Urbit diff --git a/extensions/tlon/clawdbot.plugin.json b/extensions/tlon/clawdbot.plugin.json new file mode 100644 index 000000000..85e1aaa8c --- /dev/null +++ b/extensions/tlon/clawdbot.plugin.json @@ -0,0 +1,11 @@ +{ + "id": "tlon", + "channels": [ + "tlon" + ], + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": {} + } +} diff --git a/extensions/tlon/index.ts b/extensions/tlon/index.ts new file mode 100644 index 000000000..52b82e9dd --- /dev/null +++ b/extensions/tlon/index.ts @@ -0,0 +1,16 @@ +import type { ClawdbotPluginApi } from "clawdbot/plugin-sdk"; +import { emptyPluginConfigSchema } from "clawdbot/plugin-sdk"; + +import { tlonPlugin } from "./src/channel.js"; + +const plugin = { + id: "tlon", + name: "Tlon", + description: "Tlon/Urbit channel plugin", + configSchema: emptyPluginConfigSchema(), + register(api: ClawdbotPluginApi) { + api.registerChannel({ plugin: tlonPlugin }); + }, +}; + +export default plugin; diff --git a/extensions/tlon/package.json b/extensions/tlon/package.json new file mode 100644 index 000000000..c11d45c97 --- /dev/null +++ b/extensions/tlon/package.json @@ -0,0 +1,16 @@ +{ + "name": "@clawdbot/tlon", + "version": "2026.1.22", + "type": "module", + "description": "Clawdbot Tlon/Urbit channel plugin", + "clawdbot": { + "extensions": [ + "./index.ts" + ] + }, + "dependencies": { + "@urbit/http-api": "^3.0.0", + "@urbit/aura": "^2.0.0", + "eventsource": "^2.0.2" + } +} diff --git a/extensions/tlon/src/channel.js b/extensions/tlon/src/channel.js new file mode 100644 index 000000000..c1974f91b --- /dev/null +++ b/extensions/tlon/src/channel.js @@ -0,0 +1,360 @@ +import { Urbit } from "@urbit/http-api"; +import { unixToDa, formatUd } from "@urbit/aura"; + +// Polyfill minimal browser globals needed by @urbit/http-api in Node +if (typeof global.window === "undefined") { + global.window = { fetch: global.fetch }; +} +if (typeof global.document === "undefined") { + global.document = { + hidden: true, + addEventListener() {}, + removeEventListener() {}, + }; +} + +// Patch Urbit.prototype.connect for HTTP authentication +const { connect } = Urbit.prototype; +Urbit.prototype.connect = async function patchedConnect() { + const resp = await fetch(`${this.url}/~/login`, { + method: "POST", + body: `password=${this.code}`, + credentials: "include", + }); + + if (resp.status >= 400) { + throw new Error("Login failed with status " + resp.status); + } + + const cookie = resp.headers.get("set-cookie"); + if (cookie) { + const match = /urbauth-~([\w-]+)/.exec(cookie); + if (!this.nodeId && match) { + this.nodeId = match[1]; + } + this.cookie = cookie; + } + await this.getShipName(); + await this.getOurName(); +}; + +/** + * Tlon/Urbit channel plugin for Clawdbot + */ +export const tlonPlugin = { + id: "tlon", + meta: { + id: "tlon", + label: "Tlon", + selectionLabel: "Tlon/Urbit", + docsPath: "/channels/tlon", + docsLabel: "tlon", + blurb: "Decentralized messaging on Urbit", + aliases: ["urbit"], + order: 90, + }, + capabilities: { + chatTypes: ["direct", "group"], + media: false, + }, + reload: { configPrefixes: ["channels.tlon"] }, + config: { + listAccountIds: (cfg) => { + const base = cfg.channels?.tlon; + if (!base) return []; + const accounts = base.accounts || {}; + return [ + ...(base.ship ? ["default"] : []), + ...Object.keys(accounts), + ]; + }, + resolveAccount: (cfg, accountId) => { + const base = cfg.channels?.tlon; + if (!base) { + return { + accountId: accountId || "default", + name: null, + enabled: false, + configured: false, + ship: null, + url: null, + code: null, + }; + } + + const useDefault = !accountId || accountId === "default"; + const account = useDefault ? base : base.accounts?.[accountId]; + + return { + accountId: accountId || "default", + name: account?.name || null, + enabled: account?.enabled !== false, + configured: Boolean(account?.ship && account?.code && account?.url), + ship: account?.ship || null, + url: account?.url || null, + code: account?.code || null, + groupChannels: account?.groupChannels || [], + dmAllowlist: account?.dmAllowlist || [], + notebookChannel: account?.notebookChannel || null, + }; + }, + defaultAccountId: () => "default", + setAccountEnabled: ({ cfg, accountId, enabled }) => { + const useDefault = !accountId || accountId === "default"; + + if (useDefault) { + return { + ...cfg, + channels: { + ...cfg.channels, + tlon: { + ...cfg.channels?.tlon, + enabled, + }, + }, + }; + } + + return { + ...cfg, + channels: { + ...cfg.channels, + tlon: { + ...cfg.channels?.tlon, + accounts: { + ...cfg.channels?.tlon?.accounts, + [accountId]: { + ...cfg.channels?.tlon?.accounts?.[accountId], + enabled, + }, + }, + }, + }, + }; + }, + deleteAccount: ({ cfg, accountId }) => { + const useDefault = !accountId || accountId === "default"; + + if (useDefault) { + const { ship, code, url, name, ...rest } = cfg.channels?.tlon || {}; + return { + ...cfg, + channels: { + ...cfg.channels, + tlon: rest, + }, + }; + } + + const { [accountId]: removed, ...remainingAccounts } = + cfg.channels?.tlon?.accounts || {}; + return { + ...cfg, + channels: { + ...cfg.channels, + tlon: { + ...cfg.channels?.tlon, + accounts: remainingAccounts, + }, + }, + }; + }, + isConfigured: (account) => account.configured, + describeAccount: (account) => ({ + accountId: account.accountId, + name: account.name, + enabled: account.enabled, + configured: account.configured, + ship: account.ship, + url: account.url, + }), + }, + messaging: { + normalizeTarget: (target) => { + // Normalize Urbit ship names + const trimmed = target.trim(); + if (!trimmed.startsWith("~")) { + return `~${trimmed}`; + } + return trimmed; + }, + targetResolver: { + looksLikeId: (target) => { + return /^~?[a-z-]+$/.test(target); + }, + hint: "~sampel-palnet or sampel-palnet", + }, + }, + outbound: { + deliveryMode: "direct", + chunker: (text, limit) => [text], // No chunking for now + textChunkLimit: 10000, + sendText: async ({ cfg, to, text, accountId }) => { + const account = tlonPlugin.config.resolveAccount(cfg, accountId); + + if (!account.configured) { + throw new Error("Tlon account not configured"); + } + + // Authenticate with Urbit + const api = await Urbit.authenticate({ + ship: account.ship.replace(/^~/, ""), + url: account.url, + code: account.code, + verbose: false, + }); + + try { + // Normalize ship name for sending + const toShip = to.startsWith("~") ? to : `~${to}`; + const fromShip = account.ship.startsWith("~") + ? account.ship + : `~${account.ship}`; + + // Construct message in Tlon format + const story = [{ inline: [text] }]; + const sentAt = Date.now(); + const idUd = formatUd(unixToDa(sentAt).toString()); + const id = `${fromShip}/${idUd}`; + + const delta = { + add: { + memo: { + content: story, + author: fromShip, + sent: sentAt, + }, + kind: null, + time: null, + }, + }; + + const action = { + ship: toShip, + diff: { id, delta }, + }; + + // Send via poke + await api.poke({ + app: "chat", + mark: "chat-dm-action", + json: action, + }); + + return { + channel: "tlon", + success: true, + messageId: id, + }; + } finally { + // Clean up connection + try { + await api.delete(); + } catch (e) { + // Ignore cleanup errors + } + } + }, + sendMedia: async ({ cfg, to, text, mediaUrl, accountId }) => { + // TODO: Tlon/Urbit doesn't support media attachments yet + // For now, send the caption text and include media URL in the message + const messageText = mediaUrl + ? `${text}\n\n[Media: ${mediaUrl}]` + : text; + + // Reuse sendText implementation + return await tlonPlugin.outbound.sendText({ + cfg, + to, + text: messageText, + accountId, + }); + }, + }, + status: { + defaultRuntime: { + accountId: "default", + running: false, + lastStartAt: null, + lastStopAt: null, + lastError: null, + }, + collectStatusIssues: (accounts) => { + return accounts.flatMap((account) => { + if (!account.configured) { + return [{ + channel: "tlon", + accountId: account.accountId, + kind: "config", + message: "Account not configured (missing ship, code, or url)", + }]; + } + return []; + }); + }, + buildChannelSummary: ({ snapshot }) => ({ + configured: snapshot.configured ?? false, + ship: snapshot.ship ?? null, + url: snapshot.url ?? null, + }), + probeAccount: async ({ account }) => { + if (!account.configured) { + return { ok: false, error: "Not configured" }; + } + + try { + const api = await Urbit.authenticate({ + ship: account.ship.replace(/^~/, ""), + url: account.url, + code: account.code, + verbose: false, + }); + + try { + await api.getOurName(); + return { ok: true }; + } finally { + await api.delete(); + } + } catch (error) { + return { ok: false, error: error.message }; + } + }, + buildAccountSnapshot: ({ account, runtime, probe }) => ({ + accountId: account.accountId, + name: account.name, + enabled: account.enabled, + configured: account.configured, + ship: account.ship, + url: account.url, + probe, + }), + }, + gateway: { + startAccount: async (ctx) => { + const account = ctx.account; + ctx.setStatus({ + accountId: account.accountId, + ship: account.ship, + url: account.url, + }); + ctx.log?.info( + `[${account.accountId}] starting Tlon provider for ${account.ship}` + ); + + // Lazy import to avoid circular dependencies + const { monitorTlonProvider } = await import("./monitor.js"); + + return monitorTlonProvider({ + account, + accountId: account.accountId, + cfg: ctx.cfg, + runtime: ctx.runtime, + abortSignal: ctx.abortSignal, + }); + }, + }, +}; + +// Export tlonPlugin for use by index.ts +export { tlonPlugin }; diff --git a/extensions/tlon/src/core-bridge.js b/extensions/tlon/src/core-bridge.js new file mode 100644 index 000000000..634ef3dd8 --- /dev/null +++ b/extensions/tlon/src/core-bridge.js @@ -0,0 +1,100 @@ +import fs from "node:fs"; +import path from "node:path"; +import { fileURLToPath, pathToFileURL } from "node:url"; + +let coreRootCache = null; +let coreDepsPromise = null; + +function findPackageRoot(startDir, name) { + let dir = startDir; + for (;;) { + const pkgPath = path.join(dir, "package.json"); + try { + if (fs.existsSync(pkgPath)) { + const raw = fs.readFileSync(pkgPath, "utf8"); + const pkg = JSON.parse(raw); + if (pkg.name === name) return dir; + } + } catch { + // ignore parse errors + } + const parent = path.dirname(dir); + if (parent === dir) return null; + dir = parent; + } +} + +function resolveClawdbotRoot() { + if (coreRootCache) return coreRootCache; + const override = process.env.CLAWDBOT_ROOT?.trim(); + if (override) { + coreRootCache = override; + return override; + } + + const candidates = new Set(); + if (process.argv[1]) { + candidates.add(path.dirname(process.argv[1])); + } + candidates.add(process.cwd()); + try { + const urlPath = fileURLToPath(import.meta.url); + candidates.add(path.dirname(urlPath)); + } catch { + // ignore + } + + for (const start of candidates) { + const found = findPackageRoot(start, "clawdbot"); + if (found) { + coreRootCache = found; + return found; + } + } + + throw new Error( + "Unable to resolve Clawdbot root. Set CLAWDBOT_ROOT to the package root.", + ); +} + +async function importCoreModule(relativePath) { + const root = resolveClawdbotRoot(); + const distPath = path.join(root, "dist", relativePath); + if (!fs.existsSync(distPath)) { + throw new Error( + `Missing core module at ${distPath}. Run \`pnpm build\` or install the official package.`, + ); + } + return await import(pathToFileURL(distPath).href); +} + +export async function loadCoreChannelDeps() { + if (coreDepsPromise) return coreDepsPromise; + + coreDepsPromise = (async () => { + const [ + chunk, + envelope, + dispatcher, + routing, + inboundContext, + ] = await Promise.all([ + importCoreModule("auto-reply/chunk.js"), + importCoreModule("auto-reply/envelope.js"), + importCoreModule("auto-reply/reply/provider-dispatcher.js"), + importCoreModule("routing/resolve-route.js"), + importCoreModule("auto-reply/reply/inbound-context.js"), + ]); + + return { + chunkMarkdownText: chunk.chunkMarkdownText, + formatAgentEnvelope: envelope.formatAgentEnvelope, + dispatchReplyWithBufferedBlockDispatcher: + dispatcher.dispatchReplyWithBufferedBlockDispatcher, + resolveAgentRoute: routing.resolveAgentRoute, + finalizeInboundContext: inboundContext.finalizeInboundContext, + }; + })(); + + return coreDepsPromise; +} diff --git a/extensions/tlon/src/monitor.js b/extensions/tlon/src/monitor.js new file mode 100644 index 000000000..8cfcf54ea --- /dev/null +++ b/extensions/tlon/src/monitor.js @@ -0,0 +1,1572 @@ +// Polyfill window.location for Node.js environment +// Required because some clawdbot dependencies (axios, Slack SDK) expect browser globals +if (typeof global.window === "undefined") { + global.window = {}; +} +if (!global.window.location) { + global.window.location = { + href: "http://localhost", + origin: "http://localhost", + protocol: "http:", + host: "localhost", + hostname: "localhost", + port: "", + pathname: "/", + search: "", + hash: "", + }; +} + +import { unixToDa, formatUd } from "@urbit/aura"; +import { UrbitSSEClient } from "./urbit-sse-client.js"; +import { loadCoreChannelDeps } from "./core-bridge.js"; + +console.log("[tlon] ====== monitor.js v2 loaded with action.post.reply structure ======"); + +/** + * Formats model name for display in signature + * Converts "anthropic/claude-sonnet-4-5" to "Claude Sonnet 4.5" + */ +function formatModelName(modelString) { + if (!modelString) return "AI"; + + // Remove provider prefix (e.g., "anthropic/", "openai/") + const modelName = modelString.includes("/") + ? modelString.split("/")[1] + : modelString; + + // Convert common model names to friendly format + const modelMappings = { + "claude-opus-4-5": "Claude Opus 4.5", + "claude-sonnet-4-5": "Claude Sonnet 4.5", + "claude-sonnet-3-5": "Claude Sonnet 3.5", + "gpt-4o": "GPT-4o", + "gpt-4-turbo": "GPT-4 Turbo", + "gpt-4": "GPT-4", + "gemini-2.0-flash": "Gemini 2.0 Flash", + "gemini-pro": "Gemini Pro", + }; + + return modelMappings[modelName] || modelName + .replace(/-/g, " ") + .split(" ") + .map(word => word.charAt(0).toUpperCase() + word.slice(1)) + .join(" "); +} + +/** + * Authenticate and get cookie + */ +async function authenticate(url, code) { + const resp = await fetch(`${url}/~/login`, { + method: "POST", + headers: { "Content-Type": "application/x-www-form-urlencoded" }, + body: `password=${code}`, + }); + + if (!resp.ok) { + throw new Error(`Login failed with status ${resp.status}`); + } + + // Read and discard the token body + await resp.text(); + + // Extract cookie + const cookie = resp.headers.get("set-cookie"); + if (!cookie) { + throw new Error("No authentication cookie received"); + } + + return cookie; +} + +/** + * Sends a direct message via Urbit + */ +async function sendDm(api, fromShip, toShip, text) { + const story = [{ inline: [text] }]; + const sentAt = Date.now(); + const idUd = formatUd(unixToDa(sentAt).toString()); + const id = `${fromShip}/${idUd}`; + + const delta = { + add: { + memo: { + content: story, + author: fromShip, + sent: sentAt, + }, + kind: null, + time: null, + }, + }; + + const action = { + ship: toShip, + diff: { id, delta }, + }; + + await api.poke({ + app: "chat", + mark: "chat-dm-action", + json: action, + }); + + return { channel: "tlon", success: true, messageId: id }; +} + +/** + * Format a numeric ID with dots every 3 digits (Urbit @ud format) + * Example: "170141184507780357587090523864791252992" -> "170.141.184.507.780.357.587.090.523.864.791.252.992" + */ +function formatUdId(id) { + if (!id) return id; + const idStr = String(id); + // Insert dots every 3 characters from the left + return idStr.replace(/\B(?=(\d{3})+(?!\d))/g, '.'); +} + +/** + * Sends a message to a group channel + * @param {string} replyTo - Optional parent post ID for threading + */ +async function sendGroupMessage(api, fromShip, hostShip, channelName, text, replyTo = null, runtime = null) { + const story = [{ inline: [text] }]; + const sentAt = Date.now(); + + // Format reply ID with dots for Urbit @ud format + const formattedReplyTo = replyTo ? formatUdId(replyTo) : null; + + const action = { + channel: { + nest: `chat/${hostShip}/${channelName}`, + action: formattedReplyTo ? { + // Reply action for threading (wraps reply in post like official client) + post: { + reply: { + id: formattedReplyTo, + action: { + add: { + content: story, + author: fromShip, + sent: sentAt, + } + } + } + } + } : { + // Regular post action + post: { + add: { + content: story, + author: fromShip, + sent: sentAt, + kind: "/chat", + blob: null, + meta: null, + }, + }, + }, + }, + }; + + runtime?.log?.(`[tlon] 📤 Sending message: replyTo=${replyTo} (formatted: ${formattedReplyTo}), text="${text.substring(0, 100)}...", nest=chat/${hostShip}/${channelName}`); + runtime?.log?.(`[tlon] 📤 Action type: ${formattedReplyTo ? 'REPLY (thread)' : 'POST (main channel)'}`); + runtime?.log?.(`[tlon] 📤 Full action structure: ${JSON.stringify(action, null, 2)}`); + + try { + const pokeResult = await api.poke({ + app: "channels", + mark: "channel-action-1", + json: action, + }); + + runtime?.log?.(`[tlon] 📤 Poke succeeded: ${JSON.stringify(pokeResult)}`); + return { channel: "tlon", success: true, messageId: `${fromShip}/${sentAt}` }; + } catch (error) { + runtime?.error?.(`[tlon] 📤 Poke FAILED: ${error.message}`); + runtime?.error?.(`[tlon] 📤 Error details: ${JSON.stringify(error)}`); + throw error; + } +} + +/** + * Checks if the bot's ship is mentioned in a message + */ +function isBotMentioned(messageText, botShipName) { + if (!messageText || !botShipName) return false; + + // Normalize bot ship name (ensure it has ~) + const normalizedBotShip = botShipName.startsWith("~") + ? botShipName + : `~${botShipName}`; + + // Escape special regex characters + const escapedShip = normalizedBotShip.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + + // Check for mention - ship name should be at start, after whitespace, or standalone + const mentionPattern = new RegExp(`(^|\\s)${escapedShip}(?=\\s|$)`, "i"); + return mentionPattern.test(messageText); +} + +/** + * Parses commands related to notebook operations + * @param {string} messageText - The message to parse + * @returns {Object|null} Command info or null if no command detected + */ +function parseNotebookCommand(messageText) { + const text = messageText.toLowerCase().trim(); + + // Save to notebook patterns + const savePatterns = [ + /save (?:this|that) to (?:my )?notes?/i, + /save to (?:my )?notes?/i, + /save to notebook/i, + /add to (?:my )?diary/i, + /save (?:this|that) to (?:my )?diary/i, + /save to (?:my )?diary/i, + /save (?:this|that)/i, + ]; + + for (const pattern of savePatterns) { + if (pattern.test(text)) { + return { + type: "save_to_notebook", + title: extractTitle(messageText), + }; + } + } + + // List notebook patterns + const listPatterns = [ + /(?:list|show) (?:my )?(?:notes?|notebook|diary)/i, + /what(?:'s| is) in (?:my )?(?:notes?|notebook|diary)/i, + /check (?:my )?(?:notes?|notebook|diary)/i, + ]; + + for (const pattern of listPatterns) { + if (pattern.test(text)) { + return { + type: "list_notebook", + }; + } + } + + return null; +} + +/** + * Extracts a title from a save command + * @param {string} text - The message text + * @returns {string|null} Extracted title or null + */ +function extractTitle(text) { + // Try to extract title from "as [title]" or "with title [title]" + const asMatch = /(?:as|with title)\s+["']([^"']+)["']/i.exec(text); + if (asMatch) return asMatch[1]; + + const asMatch2 = /(?:as|with title)\s+(.+?)(?:\.|$)/i.exec(text); + if (asMatch2) return asMatch2[1].trim(); + + return null; +} + +/** + * Sends a post to an Urbit diary channel + * @param {Object} api - Authenticated Urbit API instance + * @param {Object} account - Account configuration + * @param {string} diaryChannel - Diary channel in format "diary/~host/channel-id" + * @param {string} title - Post title + * @param {string} content - Post content + * @returns {Promise<{essayId: string, sentAt: number}>} + */ +async function sendDiaryPost(api, account, diaryChannel, title, content) { + // Parse channel format: "diary/~host/channel-id" + const match = /^diary\/~?([a-z-]+)\/([a-z0-9]+)$/i.exec(diaryChannel); + + if (!match) { + throw new Error(`Invalid diary channel format: ${diaryChannel}. Expected: diary/~host/channel-id`); + } + + const host = match[1]; + const channelId = match[2]; + const nest = `diary/~${host}/${channelId}`; + + // Construct essay (diary entry) format + const sentAt = Date.now(); + const idUd = formatUd(unixToDa(sentAt).toString()); + const fromShip = account.ship.startsWith("~") ? account.ship : `~${account.ship}`; + const essayId = `${fromShip}/${idUd}`; + + const action = { + channel: { + nest, + action: { + post: { + add: { + content: [{ inline: [content] }], + sent: sentAt, + kind: "/diary", + author: fromShip, + blob: null, + meta: { + title: title || "Saved Note", + image: "", + description: "", + cover: "", + }, + }, + }, + }, + }, + }; + + await api.poke({ + app: "channels", + mark: "channel-action-1", + json: action, + }); + + return { essayId, sentAt }; +} + +/** + * Fetches diary entries from an Urbit diary channel + * @param {Object} api - Authenticated Urbit API instance + * @param {string} diaryChannel - Diary channel in format "diary/~host/channel-id" + * @param {number} limit - Maximum number of entries to fetch (default: 10) + * @returns {Promise} Array of diary entries with { id, title, content, author, sent } + */ +async function fetchDiaryEntries(api, diaryChannel, limit = 10) { + // Parse channel format: "diary/~host/channel-id" + const match = /^diary\/~?([a-z-]+)\/([a-z0-9]+)$/i.exec(diaryChannel); + + if (!match) { + throw new Error(`Invalid diary channel format: ${diaryChannel}. Expected: diary/~host/channel-id`); + } + + const host = match[1]; + const channelId = match[2]; + const nest = `diary/~${host}/${channelId}`; + + try { + // Scry the diary channel for posts + const response = await api.scry({ + app: "channels", + path: `/channel/${nest}/posts/newest/${limit}`, + }); + + if (!response || !response.posts) { + return []; + } + + // Extract and format diary entries + const entries = Object.entries(response.posts).map(([id, post]) => { + const essay = post.essay || {}; + + // Extract text content from prose blocks + let content = ""; + if (essay.content && Array.isArray(essay.content)) { + content = essay.content + .map((block) => { + if (block.block?.prose?.inline) { + return block.block.prose.inline.join(""); + } + return ""; + }) + .join("\n"); + } + + return { + id, + title: essay.title || "Untitled", + content, + author: essay.author || "unknown", + sent: essay.sent || 0, + }; + }); + + // Sort by sent time (newest first) + return entries.sort((a, b) => b.sent - a.sent); + } catch (error) { + console.error(`[tlon] Error fetching diary entries from ${nest}:`, error); + throw error; + } +} + +/** + * Checks if a ship is allowed to send DMs to the bot + */ +function isDmAllowed(senderShip, account) { + // If dmAllowlist is not configured or empty, allow all + if (!account.dmAllowlist || !Array.isArray(account.dmAllowlist) || account.dmAllowlist.length === 0) { + return true; + } + + // Normalize ship names for comparison (ensure ~ prefix) + const normalizedSender = senderShip.startsWith("~") + ? senderShip + : `~${senderShip}`; + + const normalizedAllowlist = account.dmAllowlist + .map((ship) => ship.startsWith("~") ? ship : `~${ship}`); + + // Check if sender is in allowlist + return normalizedAllowlist.includes(normalizedSender); +} + +/** + * Extracts text content from Tlon message structure + */ +function extractMessageText(content) { + if (!content || !Array.isArray(content)) return ""; + + return content + .map((block) => { + if (block.inline && Array.isArray(block.inline)) { + return block.inline + .map((item) => { + if (typeof item === "string") return item; + if (item && typeof item === "object") { + if (item.ship) return item.ship; // Ship mention + if (item.break !== undefined) return "\n"; // Line break + if (item.link && item.link.href) return item.link.href; // URL link + // Skip other objects (images, etc.) + } + return ""; + }) + .join(""); + } + return ""; + }) + .join("\n") + .trim(); +} + +/** + * Parses a channel nest identifier + * Format: chat/~host-ship/channel-name + */ +function parseChannelNest(nest) { + if (!nest) return null; + const parts = nest.split("/"); + if (parts.length !== 3 || parts[0] !== "chat") return null; + + return { + hostShip: parts[1], + channelName: parts[2], + }; +} + +/** + * Message cache for channel history (for faster access) + * Structure: Map> + */ +const messageCache = new Map(); +const MAX_CACHED_MESSAGES = 100; + +/** + * Adds a message to the cache + */ +function cacheMessage(channelNest, message) { + if (!messageCache.has(channelNest)) { + messageCache.set(channelNest, []); + } + + const cache = messageCache.get(channelNest); + cache.unshift(message); // Add to front (most recent) + + // Keep only last MAX_CACHED_MESSAGES + if (cache.length > MAX_CACHED_MESSAGES) { + cache.pop(); + } +} + +/** + * Fetches channel history from Urbit via scry + * Format: /channels/v4//posts/newest//outline.json + * Returns pagination object: { newest, posts: {...}, total, newer, older } + */ +async function fetchChannelHistory(api, channelNest, count = 50, runtime) { + try { + const scryPath = `/channels/v4/${channelNest}/posts/newest/${count}/outline.json`; + runtime?.log?.(`[tlon] Fetching history: ${scryPath}`); + + const data = await api.scry(scryPath); + runtime?.log?.(`[tlon] Scry returned data type: ${Array.isArray(data) ? 'array' : typeof data}, keys: ${typeof data === 'object' ? Object.keys(data).slice(0, 5).join(', ') : 'N/A'}`); + + if (!data) { + runtime?.log?.(`[tlon] Data is null`); + return []; + } + + // Extract posts from pagination object + let posts = []; + if (Array.isArray(data)) { + // Direct array of posts + posts = data; + } else if (data.posts && typeof data.posts === 'object') { + // Pagination object with posts property (keyed by ID) + posts = Object.values(data.posts); + runtime?.log?.(`[tlon] Extracted ${posts.length} posts from pagination object`); + } else if (typeof data === 'object') { + // Fallback: treat as keyed object + posts = Object.values(data); + } + + runtime?.log?.(`[tlon] Processing ${posts.length} posts`); + + // Extract posts from outline format + const messages = posts.map(item => { + // Handle both post and r-post structures + const essay = item.essay || item['r-post']?.set?.essay; + const seal = item.seal || item['r-post']?.set?.seal; + + return { + author: essay?.author || 'unknown', + content: extractMessageText(essay?.content || []), + timestamp: essay?.sent || Date.now(), + id: seal?.id, + }; + }).filter(msg => msg.content); // Filter out empty messages + + runtime?.log?.(`[tlon] Extracted ${messages.length} messages from history`); + return messages; + } catch (error) { + runtime?.log?.(`[tlon] Error fetching channel history: ${error.message}`); + console.error(`[tlon] Error fetching channel history: ${error.message}`, error.stack); + return []; + } +} + +/** + * Gets recent channel history (tries cache first, then scry) + */ +async function getChannelHistory(api, channelNest, count = 50, runtime) { + // Try cache first for speed + const cache = messageCache.get(channelNest) || []; + if (cache.length >= count) { + runtime?.log?.(`[tlon] Using cached messages (${cache.length} available)`); + return cache.slice(0, count); + } + + runtime?.log?.(`[tlon] Cache has ${cache.length} messages, need ${count}, fetching from scry...`); + // Fall back to scry for full history + return await fetchChannelHistory(api, channelNest, count, runtime); +} + +/** + * Detects if a message is a summarization request + */ +function isSummarizationRequest(messageText) { + const patterns = [ + /summarize\s+(this\s+)?(channel|chat|conversation)/i, + /what\s+did\s+i\s+miss/i, + /catch\s+me\s+up/i, + /channel\s+summary/i, + /tldr/i, + ]; + return patterns.some(pattern => pattern.test(messageText)); +} + +/** + * Formats a date for the groups-ui changes endpoint + * Format: ~YYYY.M.D..HH.MM.SS..XXXX (only date changes, time/hex stay constant) + */ +function formatChangesDate(daysAgo = 5) { + const now = new Date(); + const targetDate = new Date(now - (daysAgo * 24 * 60 * 60 * 1000)); + const year = targetDate.getFullYear(); + const month = targetDate.getMonth() + 1; + const day = targetDate.getDate(); + // Keep time and hex constant as per Urbit convention + return `~${year}.${month}.${day}..20.19.51..9b9d`; +} + +/** + * Fetches changes from groups-ui since a specific date + * Returns delta data that can be used to efficiently discover new channels + */ +async function fetchGroupChanges(api, runtime, daysAgo = 5) { + try { + const changeDate = formatChangesDate(daysAgo); + runtime.log?.(`[tlon] Fetching group changes since ${daysAgo} days ago (${changeDate})...`); + + const changes = await api.scry(`/groups-ui/v5/changes/${changeDate}.json`); + + if (changes) { + runtime.log?.(`[tlon] Successfully fetched changes data`); + return changes; + } + + return null; + } catch (error) { + runtime.log?.(`[tlon] Failed to fetch changes (falling back to full init): ${error.message}`); + return null; + } +} + +/** + * Fetches all channels the ship has access to + * Returns an array of channel nest identifiers (e.g., "chat/~host-ship/channel-name") + * Tries changes endpoint first for efficiency, falls back to full init + */ +async function fetchAllChannels(api, runtime) { + try { + runtime.log?.(`[tlon] Attempting auto-discovery of group channels...`); + + // Try delta-based changes first (more efficient) + const changes = await fetchGroupChanges(api, runtime, 5); + + let initData; + if (changes) { + // We got changes, but still need to extract channel info + // For now, fall back to full init since changes format varies + runtime.log?.(`[tlon] Changes data received, using full init for channel extraction`); + initData = await api.scry("/groups-ui/v6/init.json"); + } else { + // No changes data, use full init + initData = await api.scry("/groups-ui/v6/init.json"); + } + + const channels = []; + + // Extract chat channels from the groups data structure + if (initData && initData.groups) { + for (const [groupKey, groupData] of Object.entries(initData.groups)) { + if (groupData.channels) { + for (const channelNest of Object.keys(groupData.channels)) { + // Only include chat channels (not diary, heap, etc.) + if (channelNest.startsWith("chat/")) { + channels.push(channelNest); + } + } + } + } + } + + if (channels.length > 0) { + runtime.log?.(`[tlon] Auto-discovered ${channels.length} chat channel(s)`); + runtime.log?.(`[tlon] Channels: ${channels.slice(0, 5).join(", ")}${channels.length > 5 ? "..." : ""}`); + } else { + runtime.log?.(`[tlon] No chat channels found via auto-discovery`); + runtime.log?.(`[tlon] Add channels manually to config: channels.tlon.groupChannels`); + } + + return channels; + } catch (error) { + runtime.log?.(`[tlon] Auto-discovery failed: ${error.message}`); + runtime.log?.(`[tlon] To monitor group channels, add them to config: channels.tlon.groupChannels`); + runtime.log?.(`[tlon] Example: ["chat/~host-ship/channel-name"]`); + return []; + } +} + +/** + * Monitors Tlon/Urbit for incoming DMs and group messages + */ +export async function monitorTlonProvider(opts = {}) { + const runtime = opts.runtime ?? { + log: console.log, + error: console.error, + }; + + const account = opts.account; + if (!account) { + throw new Error("Tlon account configuration required"); + } + + runtime.log?.(`[tlon] Account config: ${JSON.stringify({ + showModelSignature: account.showModelSignature, + ship: account.ship, + hasCode: !!account.code, + hasUrl: !!account.url + })}`); + + const botShipName = account.ship.startsWith("~") + ? account.ship + : `~${account.ship}`; + + runtime.log?.(`[tlon] Starting monitor for ${botShipName}`); + + // Authenticate with Urbit + let api; + let cookie; + try { + runtime.log?.(`[tlon] Attempting authentication to ${account.url}...`); + runtime.log?.(`[tlon] Ship: ${account.ship.replace(/^~/, "")}`); + + cookie = await authenticate(account.url, account.code); + runtime.log?.(`[tlon] Successfully authenticated to ${account.url}`); + + // Create custom SSE client + api = new UrbitSSEClient(account.url, cookie); + } catch (error) { + runtime.error?.(`[tlon] Failed to authenticate: ${error.message}`); + throw error; + } + + // Get list of group channels to monitor + let groupChannels = []; + + // Try auto-discovery first (unless explicitly disabled) + if (account.autoDiscoverChannels !== false) { + try { + const discoveredChannels = await fetchAllChannels(api, runtime); + if (discoveredChannels.length > 0) { + groupChannels = discoveredChannels; + runtime.log?.(`[tlon] Auto-discovered ${groupChannels.length} channel(s)`); + } + } catch (error) { + runtime.error?.(`[tlon] Auto-discovery failed: ${error.message}`); + } + } + + // Fall back to manual config if auto-discovery didn't find anything + if (groupChannels.length === 0 && account.groupChannels && account.groupChannels.length > 0) { + groupChannels = account.groupChannels; + runtime.log?.(`[tlon] Using manual groupChannels config: ${groupChannels.join(", ")}`); + } + + if (groupChannels.length > 0) { + runtime.log?.( + `[tlon] Monitoring ${groupChannels.length} group channel(s): ${groupChannels.join(", ")}` + ); + } else { + runtime.log?.(`[tlon] No group channels to monitor (DMs only)`); + } + + // Keep track of processed message IDs to avoid duplicates + const processedMessages = new Set(); + + /** + * Handler for incoming DM messages + */ + const handleIncomingDM = async (update) => { + try { + runtime.log?.(`[tlon] DM handler called with update: ${JSON.stringify(update).substring(0, 200)}`); + + // Handle new DM event format: response.add.memo or response.reply.delta.add.memo (for threads) + let memo = update?.response?.add?.memo; + let parentId = null; + let replyId = null; + + // Check if this is a thread reply + if (!memo && update?.response?.reply) { + memo = update?.response?.reply?.delta?.add?.memo; + parentId = update.id; // The parent post ID + replyId = update?.response?.reply?.id; // The reply message ID + runtime.log?.(`[tlon] Thread reply detected, parent: ${parentId}, reply: ${replyId}`); + } + + if (!memo) { + runtime.log?.(`[tlon] DM update has no memo in response.add or response.reply`); + return; + } + + const messageId = replyId || update.id; + if (processedMessages.has(messageId)) return; + processedMessages.add(messageId); + + const senderShip = memo.author?.startsWith("~") + ? memo.author + : `~${memo.author}`; + + const messageText = extractMessageText(memo.content); + if (!messageText) return; + + // Determine which user's DM cache to use (the other party, not the bot) + const otherParty = senderShip === botShipName ? update.whom : senderShip; + const dmCacheKey = `dm/${otherParty}`; + + // Cache all DM messages (including bot's own) for history retrieval + if (!messageCache.has(dmCacheKey)) { + messageCache.set(dmCacheKey, []); + } + const cache = messageCache.get(dmCacheKey); + cache.unshift({ + id: messageId, + author: senderShip, + content: messageText, + timestamp: memo.sent || Date.now(), + }); + // Keep only last 50 messages + if (cache.length > 50) { + cache.length = 50; + } + + // Don't respond to our own messages + if (senderShip === botShipName) return; + + // Check DM access control + if (!isDmAllowed(senderShip, account)) { + runtime.log?.( + `[tlon] Blocked DM from ${senderShip}: not in allowed list` + ); + return; + } + + runtime.log?.( + `[tlon] Received DM from ${senderShip}: "${messageText.slice(0, 50)}..."${parentId ? ' (thread reply)' : ''}` + ); + + // All DMs are processed (no mention check needed) + + await processMessage({ + messageId, + senderShip, + messageText, + isGroup: false, + timestamp: memo.sent || Date.now(), + parentId, // Pass parentId for thread replies + }); + } catch (error) { + runtime.error?.(`[tlon] Error handling DM: ${error.message}`); + } + }; + + /** + * Handler for incoming group channel messages + */ + const handleIncomingGroupMessage = (channelNest) => async (update) => { + try { + runtime.log?.(`[tlon] Group handler called for ${channelNest} with update: ${JSON.stringify(update).substring(0, 200)}`); + const parsed = parseChannelNest(channelNest); + if (!parsed) return; + + const { hostShip, channelName } = parsed; + + // Handle both top-level posts and thread replies + // Top-level: response.post.r-post.set.essay + // Thread reply: response.post.r-post.reply.r-reply.set.memo + const essay = update?.response?.post?.["r-post"]?.set?.essay; + const memo = update?.response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.memo; + + if (!essay && !memo) { + runtime.log?.(`[tlon] Group update has neither essay nor memo`); + return; + } + + // Use memo for thread replies, essay for top-level posts + const content = memo || essay; + const isThreadReply = !!memo; + + // For thread replies, use the reply ID, not the parent post ID + const messageId = isThreadReply + ? update.response.post["r-post"]?.reply?.id + : update.response.post.id; + + if (processedMessages.has(messageId)) { + runtime.log?.(`[tlon] Skipping duplicate message ${messageId}`); + return; + } + processedMessages.add(messageId); + + const senderShip = content.author?.startsWith("~") + ? content.author + : `~${content.author}`; + + // Don't respond to our own messages + if (senderShip === botShipName) return; + + const messageText = extractMessageText(content.content); + if (!messageText) return; + + // Cache this message for history/summarization + cacheMessage(channelNest, { + author: senderShip, + content: messageText, + timestamp: content.sent || Date.now(), + id: messageId, + }); + + // Check if bot is mentioned + const mentioned = isBotMentioned(messageText, botShipName); + + runtime.log?.( + `[tlon] Received group message in ${channelNest} from ${senderShip}: "${messageText.slice(0, 50)}..." (mentioned: ${mentioned})` + ); + + // Only process if bot is mentioned + if (!mentioned) return; + + // Check channel authorization + const tlonConfig = opts.cfg?.channels?.tlon; + const authorization = tlonConfig?.authorization || {}; + const channelRules = authorization.channelRules || {}; + const defaultAuthorizedShips = tlonConfig?.defaultAuthorizedShips || ["~malmur-halmex"]; + + // Get channel rule or use default (restricted) + const channelRule = channelRules[channelNest]; + const mode = channelRule?.mode || "restricted"; // Default to restricted + const allowedShips = channelRule?.allowedShips || defaultAuthorizedShips; + + // Normalize sender ship (ensure it has ~) + const normalizedSender = senderShip.startsWith("~") ? senderShip : `~${senderShip}`; + + // Check authorization for restricted channels + if (mode === "restricted") { + const isAuthorized = allowedShips.some(ship => { + const normalizedAllowed = ship.startsWith("~") ? ship : `~${ship}`; + return normalizedAllowed === normalizedSender; + }); + + if (!isAuthorized) { + runtime.log?.( + `[tlon] ⛔ Access denied: ${normalizedSender} in ${channelNest} (restricted, allowed: ${allowedShips.join(", ")})` + ); + return; + } + + runtime.log?.( + `[tlon] ✅ Access granted: ${normalizedSender} in ${channelNest} (authorized user)` + ); + } else { + runtime.log?.( + `[tlon] ✅ Access granted: ${normalizedSender} in ${channelNest} (open channel)` + ); + } + + // Extract seal data for thread support + // For thread replies, seal is in a different location + const seal = isThreadReply + ? update?.response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.seal + : update?.response?.post?.["r-post"]?.set?.seal; + + // For thread replies, all messages in the thread share the same parent-id + // We reply to the parent-id to keep our message in the same thread + const parentId = seal?.["parent-id"] || seal?.parent || null; + const postType = update?.response?.post?.["r-post"]?.set?.type; + + runtime.log?.( + `[tlon] Message type: ${isThreadReply ? "thread reply" : "top-level post"}, parentId: ${parentId}, messageId: ${seal?.id}` + ); + + await processMessage({ + messageId, + senderShip, + messageText, + isGroup: true, + groupChannel: channelNest, + groupName: `${hostShip}/${channelName}`, + timestamp: content.sent || Date.now(), + parentId, // Reply to parent-id to stay in the thread + postType, + seal, + }); + } catch (error) { + runtime.error?.( + `[tlon] Error handling group message in ${channelNest}: ${error.message}` + ); + } + }; + + // Load core channel deps + const deps = await loadCoreChannelDeps(); + + /** + * Process a message and generate AI response + */ + const processMessage = async (params) => { + let { + messageId, + senderShip, + messageText, + isGroup, + groupChannel, + groupName, + timestamp, + parentId, // Parent post ID to reply to (for threading) + postType, + seal, + } = params; + + runtime.log?.(`[tlon] processMessage called for ${senderShip}, isGroup: ${isGroup}, message: "${messageText.substring(0, 50)}"`); + + // Check if this is a summarization request + if (isGroup && isSummarizationRequest(messageText)) { + runtime.log?.(`[tlon] Detected summarization request in ${groupChannel}`); + try { + const history = await getChannelHistory(api, groupChannel, 50, runtime); + if (history.length === 0) { + const noHistoryMsg = "I couldn't fetch any messages for this channel. It might be empty or there might be a permissions issue."; + if (isGroup) { + const parsed = parseChannelNest(groupChannel); + if (parsed) { + await sendGroupMessage( + api, + botShipName, + parsed.hostShip, + parsed.channelName, + noHistoryMsg, + null, + runtime + ); + } + } else { + await sendDm(api, botShipName, senderShip, noHistoryMsg); + } + return; + } + + // Format history for AI + const historyText = history + .map(msg => `[${new Date(msg.timestamp).toLocaleString()}] ${msg.author}: ${msg.content}`) + .join("\n"); + + const summaryPrompt = `Please summarize this channel conversation (${history.length} recent messages):\n\n${historyText}\n\nProvide a concise summary highlighting:\n1. Main topics discussed\n2. Key decisions or conclusions\n3. Action items if any\n4. Notable participants`; + + // Override message text with summary prompt + messageText = summaryPrompt; + runtime.log?.(`[tlon] Generating summary for ${history.length} messages`); + } catch (error) { + runtime.error?.(`[tlon] Error generating summary: ${error.message}`); + const errorMsg = `Sorry, I encountered an error while fetching the channel history: ${error.message}`; + if (isGroup) { + const parsed = parseChannelNest(groupChannel); + if (parsed) { + await sendGroupMessage( + api, + botShipName, + parsed.hostShip, + parsed.channelName, + errorMsg, + null, + runtime + ); + } + } else { + await sendDm(api, botShipName, senderShip, errorMsg); + } + return; + } + } + + // Check if this is a notebook command + const notebookCommand = parseNotebookCommand(messageText); + if (notebookCommand) { + runtime.log?.(`[tlon] Detected notebook command: ${notebookCommand.type}`); + + // Check if notebookChannel is configured + const notebookChannel = account.notebookChannel; + if (!notebookChannel) { + const errorMsg = "Notebook feature is not configured. Please add a 'notebookChannel' to your Tlon account config (e.g., diary/~malmur-halmex/v2u22f1d)."; + if (isGroup) { + const parsed = parseChannelNest(groupChannel); + if (parsed) { + await sendGroupMessage(api, botShipName, parsed.hostShip, parsed.channelName, errorMsg, parentId, runtime); + } + } else { + await sendDm(api, botShipName, senderShip, errorMsg); + } + return; + } + + // Handle save command + if (notebookCommand.type === "save_to_notebook") { + try { + let noteContent = null; + let noteTitle = notebookCommand.title; + + // If replying to a message (thread), save the parent message + if (parentId) { + runtime.log?.(`[tlon] Fetching parent message ${parentId} to save`); + + // For DMs, use messageCache directly since DM history scry isn't available + if (!isGroup) { + const dmCacheKey = `dm/${senderShip}`; + const cache = messageCache.get(dmCacheKey) || []; + const parentMsg = cache.find(msg => msg.id === parentId || msg.id.includes(parentId)); + + if (parentMsg) { + noteContent = parentMsg.content; + if (!noteTitle) { + // Generate title from first line or first 60 chars of content + const firstLine = noteContent.split('\n')[0]; + noteTitle = firstLine.length > 60 ? firstLine.substring(0, 60) + '...' : firstLine; + } + } else { + noteContent = "Could not find parent message in cache"; + noteTitle = noteTitle || "Note"; + } + } else { + const history = await getChannelHistory(api, groupChannel, 50, runtime); + const parentMsg = history.find(msg => msg.id === parentId || msg.id.includes(parentId)); + + if (parentMsg) { + noteContent = parentMsg.content; + if (!noteTitle) { + // Generate title from first line or first 60 chars of content + const firstLine = noteContent.split('\n')[0]; + noteTitle = firstLine.length > 60 ? firstLine.substring(0, 60) + '...' : firstLine; + } + } else { + noteContent = "Could not find parent message"; + noteTitle = noteTitle || "Note"; + } + } + } else { + // No parent - fetch last bot message + if (!isGroup) { + const dmCacheKey = `dm/${senderShip}`; + const cache = messageCache.get(dmCacheKey) || []; + const lastBotMsg = cache.find(msg => msg.author === botShipName); + + if (lastBotMsg) { + noteContent = lastBotMsg.content; + if (!noteTitle) { + // Generate title from first line or first 60 chars of content + const firstLine = noteContent.split('\n')[0]; + noteTitle = firstLine.length > 60 ? firstLine.substring(0, 60) + '...' : firstLine; + } + } else { + noteContent = "No recent bot message found in cache"; + noteTitle = noteTitle || "Note"; + } + } else { + const history = await getChannelHistory(api, groupChannel, 10, runtime); + const lastBotMsg = history.find(msg => msg.author === botShipName); + + if (lastBotMsg) { + noteContent = lastBotMsg.content; + if (!noteTitle) { + // Generate title from first line or first 60 chars of content + const firstLine = noteContent.split('\n')[0]; + noteTitle = firstLine.length > 60 ? firstLine.substring(0, 60) + '...' : firstLine; + } + } else { + noteContent = "No recent bot message found"; + noteTitle = noteTitle || "Note"; + } + } + } + + const { essayId, sentAt } = await sendDiaryPost( + api, + account, + notebookChannel, + noteTitle, + noteContent + ); + + const successMsg = `✓ Saved to notebook as "${noteTitle}"`; + runtime.log?.(`[tlon] Saved note ${essayId} to ${notebookChannel}`); + + if (isGroup) { + const parsed = parseChannelNest(groupChannel); + if (parsed) { + await sendGroupMessage(api, botShipName, parsed.hostShip, parsed.channelName, successMsg, parentId, runtime); + } + } else { + await sendDm(api, botShipName, senderShip, successMsg); + } + } catch (error) { + runtime.error?.(`[tlon] Error saving to notebook: ${error.message}`); + const errorMsg = `Failed to save to notebook: ${error.message}`; + if (isGroup) { + const parsed = parseChannelNest(groupChannel); + if (parsed) { + await sendGroupMessage(api, botShipName, parsed.hostShip, parsed.channelName, errorMsg, parentId, runtime); + } + } else { + await sendDm(api, botShipName, senderShip, errorMsg); + } + } + return; + } + + // Handle list command (placeholder for now) + if (notebookCommand.type === "list_notebook") { + const placeholderMsg = "List notebook handler not yet implemented."; + if (isGroup) { + const parsed = parseChannelNest(groupChannel); + if (parsed) { + await sendGroupMessage(api, botShipName, parsed.hostShip, parsed.channelName, placeholderMsg, parentId, runtime); + } + } else { + await sendDm(api, botShipName, senderShip, placeholderMsg); + } + return; + } + + return; // Don't send to AI for notebook commands + } + + try { + // Resolve agent route + const route = deps.resolveAgentRoute({ + cfg: opts.cfg, + channel: "tlon", + accountId: opts.accountId, + peer: { + kind: isGroup ? "group" : "dm", + id: isGroup ? groupChannel : senderShip, + }, + }); + + // Format message for AI + const fromLabel = isGroup + ? `${senderShip} in ${groupName}` + : senderShip; + + // Add Tlon identity context to help AI recognize when it's being addressed + // The AI knows itself as "bearclawd" but in Tlon it's addressed as the ship name + const identityNote = `[Note: In Tlon/Urbit, you are known as ${botShipName}. When users mention ${botShipName}, they are addressing you directly.]\n\n`; + const messageWithIdentity = identityNote + messageText; + + const body = deps.formatAgentEnvelope({ + channel: "Tlon", + from: fromLabel, + timestamp, + body: messageWithIdentity, + }); + + // Create inbound context + // For thread replies, append parent ID to session key to create separate conversation context + const sessionKeySuffix = parentId ? `:thread:${parentId}` : ''; + const finalSessionKey = `${route.sessionKey}${sessionKeySuffix}`; + + runtime.log?.( + `[tlon] 🔑 Session key construction: base="${route.sessionKey}", suffix="${sessionKeySuffix}", final="${finalSessionKey}"` + ); + + const ctxPayload = deps.finalizeInboundContext({ + Body: body, + RawBody: messageText, + CommandBody: messageText, + From: isGroup ? `tlon:group:${groupChannel}` : `tlon:${senderShip}`, + To: `tlon:${botShipName}`, + SessionKey: finalSessionKey, + AccountId: route.accountId, + ChatType: isGroup ? "group" : "direct", + ConversationLabel: fromLabel, + SenderName: senderShip, + SenderId: senderShip, + Provider: "tlon", + Surface: "tlon", + MessageSid: messageId, + OriginatingChannel: "tlon", + OriginatingTo: `tlon:${isGroup ? groupChannel : botShipName}`, + }); + + runtime.log?.( + `[tlon] 📋 Context payload keys: ${Object.keys(ctxPayload).join(', ')}` + ); + runtime.log?.( + `[tlon] 📋 Message body: "${body.substring(0, 100)}${body.length > 100 ? '...' : ''}"` + ); + + // Log transcript details + if (ctxPayload.Transcript && ctxPayload.Transcript.length > 0) { + runtime.log?.( + `[tlon] 📜 Transcript has ${ctxPayload.Transcript.length} message(s)` + ); + // Log last few messages for debugging + const recentMessages = ctxPayload.Transcript.slice(-3); + recentMessages.forEach((msg, idx) => { + runtime.log?.( + `[tlon] 📜 Transcript[-${3-idx}]: role=${msg.role}, content length=${JSON.stringify(msg.content).length}` + ); + }); + } else { + runtime.log?.( + `[tlon] 📜 Transcript is empty or missing` + ); + } + + // Log key fields that affect AI behavior + runtime.log?.( + `[tlon] 📝 BodyForAgent: "${ctxPayload.BodyForAgent?.substring(0, 100)}${(ctxPayload.BodyForAgent?.length || 0) > 100 ? '...' : ''}"` + ); + runtime.log?.( + `[tlon] 📝 ThreadStarterBody: "${ctxPayload.ThreadStarterBody?.substring(0, 100) || 'null'}${(ctxPayload.ThreadStarterBody?.length || 0) > 100 ? '...' : ''}"` + ); + runtime.log?.( + `[tlon] 📝 CommandAuthorized: ${ctxPayload.CommandAuthorized}` + ); + + // Dispatch to AI and get response + const dispatchStartTime = Date.now(); + runtime.log?.( + `[tlon] Dispatching to AI for ${senderShip} (${isGroup ? `group: ${groupName}` : 'DM'})` + ); + runtime.log?.( + `[tlon] 🚀 Dispatch details: sessionKey="${finalSessionKey}", isThreadReply=${!!parentId}, messageText="${messageText.substring(0, 50)}..."` + ); + + const dispatchResult = await deps.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: opts.cfg, + dispatcherOptions: { + deliver: async (payload) => { + runtime.log?.(`[tlon] 🎯 Deliver callback invoked! isThreadReply=${!!parentId}, parentId=${parentId}`); + const dispatchDuration = Date.now() - dispatchStartTime; + runtime.log?.(`[tlon] 📦 Payload keys: ${Object.keys(payload).join(', ')}, text length: ${payload.text?.length || 0}`); + let replyText = payload.text; + + if (!replyText) { + runtime.log?.(`[tlon] No reply text in AI response (took ${dispatchDuration}ms)`); + return; + } + + // Add model signature if enabled + const tlonConfig = opts.cfg?.channels?.tlon; + const showSignature = tlonConfig?.showModelSignature ?? false; + runtime.log?.(`[tlon] showModelSignature config: ${showSignature} (from cfg.channels.tlon)`); + runtime.log?.(`[tlon] Full payload keys: ${Object.keys(payload).join(', ')}`); + runtime.log?.(`[tlon] Full route keys: ${Object.keys(route).join(', ')}`); + runtime.log?.(`[tlon] opts.cfg.agents: ${JSON.stringify(opts.cfg?.agents?.defaults?.model)}`); + if (showSignature) { + const modelInfo = payload.metadata?.model || payload.model || route.model || opts.cfg?.agents?.defaults?.model?.primary; + runtime.log?.(`[tlon] Model info: ${JSON.stringify({ + payloadMetadataModel: payload.metadata?.model, + payloadModel: payload.model, + routeModel: route.model, + cfgModel: opts.cfg?.agents?.defaults?.model?.primary, + resolved: modelInfo + })}`); + if (modelInfo) { + const modelName = formatModelName(modelInfo); + runtime.log?.(`[tlon] Adding signature: ${modelName}`); + replyText = `${replyText}\n\n_[Generated by ${modelName}]_`; + } else { + runtime.log?.(`[tlon] No model info found, using fallback`); + replyText = `${replyText}\n\n_[Generated by AI]_`; + } + } + + runtime.log?.( + `[tlon] AI response received (took ${dispatchDuration}ms), sending to Tlon...` + ); + + // Debug delivery path + runtime.log?.(`[tlon] 🔍 Delivery debug: isGroup=${isGroup}, groupChannel=${groupChannel}, senderShip=${senderShip}, parentId=${parentId}`); + + // Send reply back to Tlon + if (isGroup) { + const parsed = parseChannelNest(groupChannel); + runtime.log?.(`[tlon] 🔍 Parsed channel nest: ${JSON.stringify(parsed)}`); + if (parsed) { + // Reply in thread if this message is part of a thread + if (parentId) { + runtime.log?.(`[tlon] Replying in thread (parent: ${parentId})`); + } + await sendGroupMessage( + api, + botShipName, + parsed.hostShip, + parsed.channelName, + replyText, + parentId, // Pass parentId to reply in the thread + runtime + ); + const threadInfo = parentId ? ` (in thread)` : ''; + runtime.log?.(`[tlon] Delivered AI reply to group ${groupName}${threadInfo}`); + } else { + runtime.log?.(`[tlon] ⚠️ Failed to parse channel nest: ${groupChannel}`); + } + } else { + await sendDm(api, botShipName, senderShip, replyText); + runtime.log?.(`[tlon] Delivered AI reply to ${senderShip}`); + } + }, + onError: (err, info) => { + const dispatchDuration = Date.now() - dispatchStartTime; + runtime.error?.( + `[tlon] ${info.kind} reply failed after ${dispatchDuration}ms: ${String(err)}` + ); + runtime.error?.(`[tlon] Error type: ${err?.constructor?.name || 'Unknown'}`); + runtime.error?.(`[tlon] Error details: ${JSON.stringify(info, null, 2)}`); + if (err?.stack) { + runtime.error?.(`[tlon] Stack trace: ${err.stack}`); + } + }, + }, + }); + + const totalDuration = Date.now() - dispatchStartTime; + runtime.log?.( + `[tlon] AI dispatch completed for ${senderShip} (total: ${totalDuration}ms), result keys: ${dispatchResult ? Object.keys(dispatchResult).join(', ') : 'null'}` + ); + runtime.log?.(`[tlon] Dispatch result: ${JSON.stringify(dispatchResult)}`); + } catch (error) { + runtime.error?.(`[tlon] Error processing message: ${error.message}`); + runtime.error?.(`[tlon] Stack trace: ${error.stack}`); + } + }; + + // Track currently subscribed channels for dynamic updates + const subscribedChannels = new Set(); // Start empty, add after successful subscription + const subscribedDMs = new Set(); + + /** + * Subscribe to a group channel + */ + async function subscribeToChannel(channelNest) { + if (subscribedChannels.has(channelNest)) { + return; // Already subscribed + } + + const parsed = parseChannelNest(channelNest); + if (!parsed) { + runtime.error?.( + `[tlon] Invalid channel format: ${channelNest} (expected: chat/~host-ship/channel-name)` + ); + return; + } + + try { + await api.subscribe({ + app: "channels", + path: `/${channelNest}`, + event: handleIncomingGroupMessage(channelNest), + err: (error) => { + runtime.error?.( + `[tlon] Group subscription error for ${channelNest}: ${error}` + ); + }, + quit: () => { + runtime.log?.(`[tlon] Group subscription ended for ${channelNest}`); + subscribedChannels.delete(channelNest); + }, + }); + subscribedChannels.add(channelNest); + runtime.log?.(`[tlon] Subscribed to group channel: ${channelNest}`); + } catch (error) { + runtime.error?.(`[tlon] Failed to subscribe to ${channelNest}: ${error.message}`); + } + } + + /** + * Subscribe to a DM conversation + */ + async function subscribeToDM(dmShip) { + if (subscribedDMs.has(dmShip)) { + return; // Already subscribed + } + + try { + await api.subscribe({ + app: "chat", + path: `/dm/${dmShip}`, + event: handleIncomingDM, + err: (error) => { + runtime.error?.(`[tlon] DM subscription error for ${dmShip}: ${error}`); + }, + quit: () => { + runtime.log?.(`[tlon] DM subscription ended for ${dmShip}`); + subscribedDMs.delete(dmShip); + }, + }); + subscribedDMs.add(dmShip); + runtime.log?.(`[tlon] Subscribed to DM with ${dmShip}`); + } catch (error) { + runtime.error?.(`[tlon] Failed to subscribe to DM with ${dmShip}: ${error.message}`); + } + } + + /** + * Discover and subscribe to new channels + */ + async function refreshChannelSubscriptions() { + try { + // Check for new DMs + const dmShips = await api.scry("/chat/dm.json"); + for (const dmShip of dmShips) { + await subscribeToDM(dmShip); + } + + // Check for new group channels (if auto-discovery is enabled) + if (account.autoDiscoverChannels !== false) { + const discoveredChannels = await fetchAllChannels(api, runtime); + + // Find truly new channels (not already subscribed) + const newChannels = discoveredChannels.filter(c => !subscribedChannels.has(c)); + + if (newChannels.length > 0) { + runtime.log?.(`[tlon] 🆕 Discovered ${newChannels.length} new channel(s):`); + newChannels.forEach(c => runtime.log?.(`[tlon] - ${c}`)); + } + + // Subscribe to all discovered channels (including new ones) + for (const channelNest of discoveredChannels) { + await subscribeToChannel(channelNest); + } + } + } catch (error) { + runtime.error?.(`[tlon] Channel refresh failed: ${error.message}`); + } + } + + // Subscribe to incoming messages + try { + runtime.log?.(`[tlon] Subscribing to updates...`); + + // Get list of DM ships and subscribe to each one + let dmShips = []; + try { + dmShips = await api.scry("/chat/dm.json"); + runtime.log?.(`[tlon] Found ${dmShips.length} DM conversation(s)`); + } catch (error) { + runtime.error?.(`[tlon] Failed to fetch DM list: ${error.message}`); + } + + // Subscribe to each DM individually + for (const dmShip of dmShips) { + await subscribeToDM(dmShip); + } + + // Subscribe to each group channel + for (const channelNest of groupChannels) { + await subscribeToChannel(channelNest); + } + + runtime.log?.(`[tlon] All subscriptions registered, connecting to SSE stream...`); + + // Connect to Urbit and start the SSE stream + await api.connect(); + + runtime.log?.(`[tlon] Connected! All subscriptions active`); + + // Start dynamic channel discovery (poll every 2 minutes) + const POLL_INTERVAL_MS = 2 * 60 * 1000; // 2 minutes + const pollInterval = setInterval(() => { + if (!opts.abortSignal?.aborted) { + runtime.log?.(`[tlon] Checking for new channels...`); + refreshChannelSubscriptions().catch((error) => { + runtime.error?.(`[tlon] Channel refresh error: ${error.message}`); + }); + } + }, POLL_INTERVAL_MS); + + runtime.log?.(`[tlon] Dynamic channel discovery enabled (checking every 2 minutes)`); + + // Keep the monitor running until aborted + if (opts.abortSignal) { + await new Promise((resolve) => { + opts.abortSignal.addEventListener("abort", () => { + clearInterval(pollInterval); + resolve(); + }, { + once: true, + }); + }); + } else { + // If no abort signal, wait indefinitely + await new Promise(() => {}); + } + } catch (error) { + if (opts.abortSignal?.aborted) { + runtime.log?.(`[tlon] Monitor stopped`); + return; + } + throw error; + } finally { + // Cleanup + try { + await api.close(); + } catch (e) { + runtime.error?.(`[tlon] Cleanup error: ${e.message}`); + } + } +} diff --git a/extensions/tlon/src/urbit-sse-client.js b/extensions/tlon/src/urbit-sse-client.js new file mode 100644 index 000000000..eb52c8573 --- /dev/null +++ b/extensions/tlon/src/urbit-sse-client.js @@ -0,0 +1,371 @@ +/** + * Custom SSE client for Urbit that works in Node.js + * Handles authentication cookies and streaming properly + */ + +import { Readable } from "stream"; + +export class UrbitSSEClient { + constructor(url, cookie, options = {}) { + this.url = url; + // Extract just the cookie value (first part before semicolon) + this.cookie = cookie.split(";")[0]; + this.channelId = `${Math.floor(Date.now() / 1000)}-${Math.random() + .toString(36) + .substring(2, 8)}`; + this.channelUrl = `${url}/~/channel/${this.channelId}`; + this.subscriptions = []; + this.eventHandlers = new Map(); + this.aborted = false; + this.streamController = null; + + // Reconnection settings + this.onReconnect = options.onReconnect || null; + this.autoReconnect = options.autoReconnect !== false; // Default true + this.reconnectAttempts = 0; + this.maxReconnectAttempts = options.maxReconnectAttempts || 10; + this.reconnectDelay = options.reconnectDelay || 1000; // Start at 1s + this.maxReconnectDelay = options.maxReconnectDelay || 30000; // Max 30s + this.isConnected = false; + } + + /** + * Subscribe to an Urbit path + */ + async subscribe({ app, path, event, err, quit }) { + const subId = this.subscriptions.length + 1; + + this.subscriptions.push({ + id: subId, + action: "subscribe", + ship: this.url.match(/\/\/([^.]+)/)[1].replace("~", ""), + app, + path, + }); + + // Store event handlers + this.eventHandlers.set(subId, { event, err, quit }); + + return subId; + } + + /** + * Create the channel and start listening for events + */ + async connect() { + // Create channel with all subscriptions + const createResp = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify(this.subscriptions), + }); + + if (!createResp.ok && createResp.status !== 204) { + throw new Error(`Channel creation failed: ${createResp.status}`); + } + + // Send helm-hi poke to activate the channel + // This is required before opening the SSE stream + const pokeResp = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify([ + { + id: Date.now(), + action: "poke", + ship: this.url.match(/\/\/([^.]+)/)[1].replace("~", ""), + app: "hood", + mark: "helm-hi", + json: "Opening API channel", + }, + ]), + }); + + if (!pokeResp.ok && pokeResp.status !== 204) { + throw new Error(`Channel activation failed: ${pokeResp.status}`); + } + + // Open SSE stream + await this.openStream(); + this.isConnected = true; + this.reconnectAttempts = 0; // Reset on successful connection + } + + /** + * Open the SSE stream and process events + */ + async openStream() { + const response = await fetch(this.channelUrl, { + method: "GET", + headers: { + Accept: "text/event-stream", + Cookie: this.cookie, + }, + }); + + if (!response.ok) { + throw new Error(`Stream connection failed: ${response.status}`); + } + + // Start processing the stream in the background (don't await) + this.processStream(response.body).catch((error) => { + if (!this.aborted) { + console.error("Stream error:", error); + // Notify all error handlers + for (const { err } of this.eventHandlers.values()) { + if (err) err(error); + } + } + }); + + // Stream is connected and running in background + // Return immediately so connect() can complete + } + + /** + * Process the SSE stream (runs in background) + */ + async processStream(body) { + const reader = body; + let buffer = ""; + + // Convert Web ReadableStream to Node Readable if needed + const stream = + reader instanceof ReadableStream ? Readable.fromWeb(reader) : reader; + + try { + for await (const chunk of stream) { + if (this.aborted) break; + + buffer += chunk.toString(); + + // Process complete SSE events + let eventEnd; + while ((eventEnd = buffer.indexOf("\n\n")) !== -1) { + const eventData = buffer.substring(0, eventEnd); + buffer = buffer.substring(eventEnd + 2); + + this.processEvent(eventData); + } + } + } finally { + // Stream ended (either normally or due to error) + if (!this.aborted && this.autoReconnect) { + this.isConnected = false; + console.log("[SSE] Stream ended, attempting reconnection..."); + await this.attemptReconnect(); + } + } + } + + /** + * Process a single SSE event + */ + processEvent(eventData) { + const lines = eventData.split("\n"); + let id = null; + let data = null; + + for (const line of lines) { + if (line.startsWith("id: ")) { + id = line.substring(4); + } else if (line.startsWith("data: ")) { + data = line.substring(6); + } + } + + if (!data) return; + + try { + const parsed = JSON.parse(data); + + // Handle quit events - subscription ended + if (parsed.response === "quit") { + console.log(`[SSE] Received quit event for subscription ${parsed.id}`); + const handlers = this.eventHandlers.get(parsed.id); + if (handlers && handlers.quit) { + handlers.quit(); + } + return; + } + + // Debug: Log received events (skip subscription confirmations) + if (parsed.response !== "subscribe" && parsed.response !== "poke") { + console.log("[SSE] Received event:", JSON.stringify(parsed).substring(0, 500)); + } + + // Route to appropriate handler based on subscription + if (parsed.id && this.eventHandlers.has(parsed.id)) { + const { event } = this.eventHandlers.get(parsed.id); + if (event && parsed.json) { + console.log(`[SSE] Calling handler for subscription ${parsed.id}`); + event(parsed.json); + } + } else if (parsed.json) { + // Try to match by response structure for events without specific ID + console.log(`[SSE] Broadcasting event to all handlers`); + for (const { event } of this.eventHandlers.values()) { + if (event) { + event(parsed.json); + } + } + } + } catch (error) { + console.error("Error parsing SSE event:", error); + } + } + + /** + * Send a poke to Urbit + */ + async poke({ app, mark, json }) { + const pokeId = Date.now(); + + const pokeData = { + id: pokeId, + action: "poke", + ship: this.url.match(/\/\/([^.]+)/)[1].replace("~", ""), + app, + mark, + json, + }; + + console.log(`[SSE] Sending poke to ${app}:`, JSON.stringify(pokeData).substring(0, 300)); + + const response = await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify([pokeData]), + }); + + console.log(`[SSE] Poke response status: ${response.status}`); + + if (!response.ok && response.status !== 204) { + const errorText = await response.text(); + console.log(`[SSE] Poke error body: ${errorText.substring(0, 500)}`); + throw new Error(`Poke failed: ${response.status} - ${errorText}`); + } + + return pokeId; + } + + /** + * Perform a scry (read-only query) to Urbit + */ + async scry(path) { + const scryUrl = `${this.url}/~/scry${path}`; + + const response = await fetch(scryUrl, { + method: "GET", + headers: { + Cookie: this.cookie, + }, + }); + + if (!response.ok) { + throw new Error(`Scry failed: ${response.status} for path ${path}`); + } + + return await response.json(); + } + + /** + * Attempt to reconnect with exponential backoff + */ + async attemptReconnect() { + if (this.aborted || !this.autoReconnect) { + console.log("[SSE] Reconnection aborted or disabled"); + return; + } + + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + console.error( + `[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Giving up.` + ); + return; + } + + this.reconnectAttempts++; + + // Calculate delay with exponential backoff + const delay = Math.min( + this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1), + this.maxReconnectDelay + ); + + console.log( + `[SSE] Reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms...` + ); + + await new Promise((resolve) => setTimeout(resolve, delay)); + + try { + // Generate new channel ID for reconnection + this.channelId = `${Math.floor(Date.now() / 1000)}-${Math.random() + .toString(36) + .substring(2, 8)}`; + this.channelUrl = `${this.url}/~/channel/${this.channelId}`; + + console.log(`[SSE] Reconnecting with new channel ID: ${this.channelId}`); + + // Call reconnect callback if provided + if (this.onReconnect) { + await this.onReconnect(this); + } + + // Reconnect + await this.connect(); + + console.log("[SSE] Reconnection successful!"); + } catch (error) { + console.error(`[SSE] Reconnection failed: ${error.message}`); + // Try again + await this.attemptReconnect(); + } + } + + /** + * Close the connection + */ + async close() { + this.aborted = true; + this.isConnected = false; + + try { + // Send unsubscribe for all subscriptions + const unsubscribes = this.subscriptions.map((sub) => ({ + id: sub.id, + action: "unsubscribe", + subscription: sub.id, + })); + + await fetch(this.channelUrl, { + method: "PUT", + headers: { + "Content-Type": "application/json", + Cookie: this.cookie, + }, + body: JSON.stringify(unsubscribes), + }); + + // Delete the channel + await fetch(this.channelUrl, { + method: "DELETE", + headers: { + Cookie: this.cookie, + }, + }); + } catch (error) { + console.error("Error closing channel:", error); + } + } +}