refactor: centralize slack threading helpers

This commit is contained in:
Peter Steinberger
2026-01-09 16:01:47 +00:00
parent d0b06b4334
commit d099dabf37
8 changed files with 141 additions and 63 deletions

View File

@@ -50,7 +50,7 @@ import {
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
import {
createReplyToModeFilter,
createReplyToModeFilterForChannel,
resolveReplyToMode,
} from "./reply-threading.js";
import { incrementCompactionCount } from "./session-updates.js";
@@ -260,9 +260,10 @@ export async function runReplyAgent(params: {
followupRun.run.config,
replyToChannel,
);
const applyReplyToMode = createReplyToModeFilter(replyToMode, {
allowTagsWhenOff: replyToChannel === "slack",
});
const applyReplyToMode = createReplyToModeFilterForChannel(
replyToMode,
replyToChannel,
);
const cfg = followupRun.run.config;
if (shouldSteer && isStreaming) {
@@ -718,7 +719,8 @@ export async function runReplyAgent(params: {
const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({
payloads: sanitizedPayloads,
applyReplyToMode,
replyToMode,
replyToChannel,
currentMessageId: sessionCtx.MessageSid,
})
.map((payload) => {

View File

@@ -19,10 +19,7 @@ import {
filterMessagingToolDuplicates,
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
import {
createReplyToModeFilter,
resolveReplyToMode,
} from "./reply-threading.js";
import { resolveReplyToMode } from "./reply-threading.js";
import { isRoutableChannel, routeReply } from "./route-reply.js";
import { incrementCompactionCount } from "./session-updates.js";
import type { TypingController } from "./typing.js";
@@ -195,14 +192,12 @@ export function createFollowupRunner(params: {
(queued.run.messageProvider?.toLowerCase() as
| OriginatingChannelType
| undefined);
const applyReplyToMode = createReplyToModeFilter(
resolveReplyToMode(queued.run.config, replyToChannel),
{ allowTagsWhenOff: replyToChannel === "slack" },
);
const replyToMode = resolveReplyToMode(queued.run.config, replyToChannel);
const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({
payloads: sanitizedPayloads,
applyReplyToMode,
replyToMode,
replyToChannel,
});
const dedupedPayloads = filterMessagingToolDuplicates({

View File

@@ -1,9 +1,10 @@
import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js";
import type { MessagingToolSend } from "../../agents/pi-embedded-runner.js";
import type { ReplyToMode } from "../../config/types.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import { extractReplyToTag } from "./reply-tags.js";
export type ReplyToModeFilter = (payload: ReplyPayload) => ReplyPayload;
import { createReplyToModeFilterForChannel } from "./reply-threading.js";
export function applyReplyTagsToPayload(
payload: ReplyPayload,
@@ -32,10 +33,15 @@ export function isRenderablePayload(payload: ReplyPayload): boolean {
export function applyReplyThreading(params: {
payloads: ReplyPayload[];
applyReplyToMode: ReplyToModeFilter;
replyToMode: ReplyToMode;
replyToChannel?: OriginatingChannelType;
currentMessageId?: string;
}): ReplyPayload[] {
const { payloads, applyReplyToMode, currentMessageId } = params;
const { payloads, replyToMode, replyToChannel, currentMessageId } = params;
const applyReplyToMode = createReplyToModeFilterForChannel(
replyToMode,
replyToChannel,
);
return payloads
.map((payload) => applyReplyTagsToPayload(payload, currentMessageId))
.filter(isRenderablePayload)

View File

@@ -38,3 +38,12 @@ export function createReplyToModeFilter(
return payload;
};
}
export function createReplyToModeFilterForChannel(
mode: ReplyToMode,
channel?: OriginatingChannelType,
) {
return createReplyToModeFilter(mode, {
allowTagsWhenOff: channel === "slack",
});
}

View File

@@ -58,7 +58,13 @@ import type { RuntimeEnv } from "../runtime.js";
import { resolveSlackAccount } from "./accounts.js";
import { reactSlackMessage } from "./actions.js";
import { sendMessageSlack } from "./send.js";
import { resolveSlackThreadTargets } from "./threading.js";
import { resolveSlackAppToken, resolveSlackBotToken } from "./token.js";
import type {
SlackAppMentionEvent,
SlackFile,
SlackMessageEvent,
} from "./types.js";
export type MonitorSlackOpts = {
botToken?: string;
@@ -71,45 +77,6 @@ export type MonitorSlackOpts = {
slashCommand?: SlackSlashCommandConfig;
};
type SlackFile = {
id?: string;
name?: string;
mimetype?: string;
size?: number;
url_private?: string;
url_private_download?: string;
};
type SlackMessageEvent = {
type: "message";
user?: string;
bot_id?: string;
subtype?: string;
username?: string;
text?: string;
ts?: string;
thread_ts?: string;
event_ts?: string;
parent_user_id?: string;
channel: string;
channel_type?: "im" | "mpim" | "channel" | "group";
files?: SlackFile[];
};
type SlackAppMentionEvent = {
type: "app_mention";
user?: string;
bot_id?: string;
username?: string;
text?: string;
ts?: string;
thread_ts?: string;
event_ts?: string;
parent_user_id?: string;
channel: string;
channel_type?: "im" | "mpim" | "channel" | "group";
};
type SlackReactionEvent = {
type: "reaction_added" | "reaction_removed";
user?: string;
@@ -1102,12 +1069,10 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
);
}
const incomingThreadTs = message.thread_ts;
const eventTs = message.event_ts;
const messageTs = message.ts ?? eventTs;
const replyThreadTs =
incomingThreadTs ?? (replyToMode === "all" ? messageTs : undefined);
const statusThreadTs = replyThreadTs ?? messageTs;
const { replyThreadTs, statusThreadTs } = resolveSlackThreadTargets({
message,
replyToMode,
});
let didSetStatus = false;
const onReplyStart = async () => {
didSetStatus = true;

View File

@@ -0,0 +1,48 @@
import { describe, expect, it } from "vitest";
import { resolveSlackThreadTargets } from "./threading.js";
describe("resolveSlackThreadTargets", () => {
it("threads replies when message is already threaded", () => {
const { replyThreadTs, statusThreadTs } = resolveSlackThreadTargets({
replyToMode: "off",
message: {
type: "message",
channel: "C1",
ts: "123",
thread_ts: "456",
},
});
expect(replyThreadTs).toBe("456");
expect(statusThreadTs).toBe("456");
});
it("threads top-level replies when mode is all", () => {
const { replyThreadTs, statusThreadTs } = resolveSlackThreadTargets({
replyToMode: "all",
message: {
type: "message",
channel: "C1",
ts: "123",
},
});
expect(replyThreadTs).toBe("123");
expect(statusThreadTs).toBe("123");
});
it("keeps status threading even when reply threading is off", () => {
const { replyThreadTs, statusThreadTs } = resolveSlackThreadTargets({
replyToMode: "off",
message: {
type: "message",
channel: "C1",
ts: "123",
},
});
expect(replyThreadTs).toBeUndefined();
expect(statusThreadTs).toBe("123");
});
});

15
src/slack/threading.ts Normal file
View File

@@ -0,0 +1,15 @@
import type { ReplyToMode } from "../config/types.js";
import type { SlackAppMentionEvent, SlackMessageEvent } from "./types.js";
export function resolveSlackThreadTargets(params: {
message: SlackMessageEvent | SlackAppMentionEvent;
replyToMode: ReplyToMode;
}) {
const incomingThreadTs = params.message.thread_ts;
const eventTs = params.message.event_ts;
const messageTs = params.message.ts ?? eventTs;
const replyThreadTs =
incomingThreadTs ?? (params.replyToMode === "all" ? messageTs : undefined);
const statusThreadTs = replyThreadTs ?? messageTs;
return { replyThreadTs, statusThreadTs };
}

38
src/slack/types.ts Normal file
View File

@@ -0,0 +1,38 @@
export type SlackFile = {
id?: string;
name?: string;
mimetype?: string;
size?: number;
url_private?: string;
url_private_download?: string;
};
export type SlackMessageEvent = {
type: "message";
user?: string;
bot_id?: string;
subtype?: string;
username?: string;
text?: string;
ts?: string;
thread_ts?: string;
event_ts?: string;
parent_user_id?: string;
channel: string;
channel_type?: "im" | "mpim" | "channel" | "group";
files?: SlackFile[];
};
export type SlackAppMentionEvent = {
type: "app_mention";
user?: string;
bot_id?: string;
username?: string;
text?: string;
ts?: string;
thread_ts?: string;
event_ts?: string;
parent_user_id?: string;
channel: string;
channel_type?: "im" | "mpim" | "channel" | "group";
};