fix: preserve restart routing + thread replies (#1337) (thanks @John-Rood)
Co-authored-by: John-Rood <John-Rood@users.noreply.github.com> Co-authored-by: Outdoor <outdoor@users.noreply.github.com>
This commit is contained in:
@@ -29,6 +29,7 @@ Docs: https://docs.clawd.bot
|
|||||||
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
|
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
|
||||||
- Doctor: clarify plugin auto-enable hint text in the startup banner.
|
- Doctor: clarify plugin auto-enable hint text in the startup banner.
|
||||||
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.
|
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.
|
||||||
|
- Gateway: preserve restart wake routing + thread replies across restarts. (#1337) — thanks @John-Rood.
|
||||||
- Gateway: reschedule per-agent heartbeats on config hot reload without restarting the runner.
|
- Gateway: reschedule per-agent heartbeats on config hot reload without restarting the runner.
|
||||||
- UI: keep config form enums typed, preserve empty strings, protect sensitive defaults, and deepen config search. (#1315) — thanks @MaudeBot.
|
- UI: keep config form enums typed, preserve empty strings, protect sensitive defaults, and deepen config search. (#1315) — thanks @MaudeBot.
|
||||||
- UI: preserve ordered list numbering in chat markdown. (#1341) — thanks @bradleypriest.
|
- UI: preserve ordered list numbering in chat markdown. (#1341) — thanks @bradleypriest.
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ import crypto from "node:crypto";
|
|||||||
import { Type } from "@sinclair/typebox";
|
import { Type } from "@sinclair/typebox";
|
||||||
|
|
||||||
import type { ClawdbotConfig } from "../../config/config.js";
|
import type { ClawdbotConfig } from "../../config/config.js";
|
||||||
|
import { loadConfig } from "../../config/io.js";
|
||||||
|
import { loadSessionStore, resolveStorePath } from "../../config/sessions.js";
|
||||||
import { scheduleGatewaySigusr1Restart } from "../../infra/restart.js";
|
import { scheduleGatewaySigusr1Restart } from "../../infra/restart.js";
|
||||||
import {
|
import {
|
||||||
formatDoctorNonInteractiveHint,
|
formatDoctorNonInteractiveHint,
|
||||||
@@ -77,11 +79,42 @@ export function createGatewayTool(opts?: {
|
|||||||
: undefined;
|
: undefined;
|
||||||
const note =
|
const note =
|
||||||
typeof params.note === "string" && params.note.trim() ? params.note.trim() : undefined;
|
typeof params.note === "string" && params.note.trim() ? params.note.trim() : undefined;
|
||||||
|
// Extract channel + threadId for routing after restart
|
||||||
|
let deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined;
|
||||||
|
let threadId: string | undefined;
|
||||||
|
if (sessionKey) {
|
||||||
|
const threadMarker = ":thread:";
|
||||||
|
const threadIndex = sessionKey.lastIndexOf(threadMarker);
|
||||||
|
const baseSessionKey = threadIndex === -1 ? sessionKey : sessionKey.slice(0, threadIndex);
|
||||||
|
const threadIdRaw =
|
||||||
|
threadIndex === -1 ? undefined : sessionKey.slice(threadIndex + threadMarker.length);
|
||||||
|
threadId = threadIdRaw?.trim() || undefined;
|
||||||
|
try {
|
||||||
|
const cfg = loadConfig();
|
||||||
|
const storePath = resolveStorePath(cfg.session?.store);
|
||||||
|
const store = loadSessionStore(storePath);
|
||||||
|
let entry = store[sessionKey];
|
||||||
|
if (!entry?.deliveryContext && threadIndex !== -1 && baseSessionKey) {
|
||||||
|
entry = store[baseSessionKey];
|
||||||
|
}
|
||||||
|
if (entry?.deliveryContext) {
|
||||||
|
deliveryContext = {
|
||||||
|
channel: entry.deliveryContext.channel,
|
||||||
|
to: entry.deliveryContext.to,
|
||||||
|
accountId: entry.deliveryContext.accountId,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// ignore: best-effort
|
||||||
|
}
|
||||||
|
}
|
||||||
const payload: RestartSentinelPayload = {
|
const payload: RestartSentinelPayload = {
|
||||||
kind: "restart",
|
kind: "restart",
|
||||||
status: "ok",
|
status: "ok",
|
||||||
ts: Date.now(),
|
ts: Date.now(),
|
||||||
sessionKey,
|
sessionKey,
|
||||||
|
deliveryContext,
|
||||||
|
threadId,
|
||||||
message: note ?? reason ?? null,
|
message: note ?? reason ?? null,
|
||||||
doctorHint: formatDoctorNonInteractiveHint(),
|
doctorHint: formatDoctorNonInteractiveHint(),
|
||||||
stats: {
|
stats: {
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import type { ChannelOutboundTargetMode } from "../../channels/plugins/types.js";
|
|
||||||
import type { ClientToolDefinition } from "../../agents/pi-embedded-runner/run/params.js";
|
import type { ClientToolDefinition } from "../../agents/pi-embedded-runner/run/params.js";
|
||||||
|
import type { ChannelOutboundTargetMode } from "../../channels/plugins/types.js";
|
||||||
|
|
||||||
/** Image content block for Claude API multimodal messages. */
|
/** Image content block for Claude API multimodal messages. */
|
||||||
export type ImageContent = {
|
export type ImageContent = {
|
||||||
|
|||||||
@@ -28,9 +28,30 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const threadMarker = ":thread:";
|
||||||
|
const threadIndex = sessionKey.lastIndexOf(threadMarker);
|
||||||
|
const baseSessionKey = threadIndex === -1 ? sessionKey : sessionKey.slice(0, threadIndex);
|
||||||
|
const threadIdRaw =
|
||||||
|
threadIndex === -1 ? undefined : sessionKey.slice(threadIndex + threadMarker.length);
|
||||||
|
const sessionThreadId = threadIdRaw?.trim() || undefined;
|
||||||
|
|
||||||
const { cfg, entry } = loadSessionEntry(sessionKey);
|
const { cfg, entry } = loadSessionEntry(sessionKey);
|
||||||
const parsedTarget = resolveAnnounceTargetFromKey(sessionKey);
|
const parsedTarget = resolveAnnounceTargetFromKey(baseSessionKey);
|
||||||
const origin = mergeDeliveryContext(deliveryContextFromSession(entry), parsedTarget ?? undefined);
|
|
||||||
|
// Prefer delivery context from sentinel (captured at restart) over session store
|
||||||
|
// Handles race condition where store wasn't flushed before restart
|
||||||
|
const sentinelContext = payload.deliveryContext;
|
||||||
|
let sessionDeliveryContext = deliveryContextFromSession(entry);
|
||||||
|
if (!sessionDeliveryContext && threadIndex !== -1 && baseSessionKey) {
|
||||||
|
const { entry: baseEntry } = loadSessionEntry(baseSessionKey);
|
||||||
|
sessionDeliveryContext = deliveryContextFromSession(baseEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
const origin = mergeDeliveryContext(
|
||||||
|
sentinelContext,
|
||||||
|
mergeDeliveryContext(sessionDeliveryContext, parsedTarget ?? undefined),
|
||||||
|
);
|
||||||
|
|
||||||
const channelRaw = origin?.channel;
|
const channelRaw = origin?.channel;
|
||||||
const channel = channelRaw ? normalizeChannelId(channelRaw) : null;
|
const channel = channelRaw ? normalizeChannelId(channelRaw) : null;
|
||||||
const to = origin?.to;
|
const to = origin?.to;
|
||||||
@@ -51,6 +72,11 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const threadId =
|
||||||
|
payload.threadId ??
|
||||||
|
sessionThreadId ??
|
||||||
|
(origin?.threadId != null ? String(origin.threadId) : undefined);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await agentCommand(
|
await agentCommand(
|
||||||
{
|
{
|
||||||
@@ -61,6 +87,7 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
|
|||||||
deliver: true,
|
deliver: true,
|
||||||
bestEffortDeliver: true,
|
bestEffortDeliver: true,
|
||||||
messageChannel: channel,
|
messageChannel: channel,
|
||||||
|
replyToId: threadId,
|
||||||
},
|
},
|
||||||
defaultRuntime,
|
defaultRuntime,
|
||||||
params.deps,
|
params.deps,
|
||||||
|
|||||||
@@ -33,6 +33,14 @@ export type RestartSentinelPayload = {
|
|||||||
status: "ok" | "error" | "skipped";
|
status: "ok" | "error" | "skipped";
|
||||||
ts: number;
|
ts: number;
|
||||||
sessionKey?: string;
|
sessionKey?: string;
|
||||||
|
/** Delivery context captured at restart time to ensure channel routing survives restart. */
|
||||||
|
deliveryContext?: {
|
||||||
|
channel?: string;
|
||||||
|
to?: string;
|
||||||
|
accountId?: string;
|
||||||
|
};
|
||||||
|
/** Thread ID for reply threading (e.g., Slack thread_ts). */
|
||||||
|
threadId?: string;
|
||||||
message?: string | null;
|
message?: string | null;
|
||||||
doctorHint?: string | null;
|
doctorHint?: string | null;
|
||||||
stats?: RestartSentinelStats | null;
|
stats?: RestartSentinelStats | null;
|
||||||
|
|||||||
Reference in New Issue
Block a user