diff --git a/src/discord/monitor.slash.test.ts b/src/discord/monitor.slash.test.ts new file mode 100644 index 000000000..a2a8beba3 --- /dev/null +++ b/src/discord/monitor.slash.test.ts @@ -0,0 +1,83 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const dispatchMock = vi.fn(); + +vi.mock("@buape/carbon", () => ({ + ChannelType: { DM: "dm", GroupDM: "group" }, + MessageType: { + ChatInputCommand: 1, + ContextMenuCommand: 2, + Default: 0, + }, + Command: class {}, + Client: class {}, + MessageCreateListener: class {}, + MessageReactionAddListener: class {}, + MessageReactionRemoveListener: class {}, +})); + +vi.mock("../auto-reply/reply/dispatch-from-config.js", () => ({ + dispatchReplyFromConfig: (...args: unknown[]) => dispatchMock(...args), +})); + +beforeEach(() => { + dispatchMock.mockReset().mockImplementation(async ({ dispatcher }) => { + dispatcher.sendToolResult({ text: "tool update" }); + dispatcher.sendFinalReply({ text: "final reply" }); + return { queuedFinal: true, counts: { tool: 1, block: 0, final: 1 } }; + }); +}); + +describe("discord native commands", () => { + it( + "streams tool results for native slash commands", + { timeout: 10_000 }, + async () => { + const { ChannelType } = await import("@buape/carbon"); + const { createDiscordNativeCommand } = await import("./monitor.js"); + + const cfg = { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: "/tmp/clawd", + }, + }, + session: { store: "/tmp/clawdbot-sessions.json" }, + discord: { dm: { enabled: true, policy: "open" } }, + } as ReturnType; + + const command = createDiscordNativeCommand({ + command: { + name: "verbose", + description: "Toggle verbose mode.", + acceptsArgs: true, + }, + cfg, + discordConfig: cfg.discord, + accountId: "default", + sessionPrefix: "discord:slash", + ephemeralDefault: true, + }); + + const reply = vi.fn().mockResolvedValue(undefined); + const followUp = vi.fn().mockResolvedValue(undefined); + + await command.run({ + user: { id: "u1", username: "Ada", globalName: "Ada" }, + channel: { type: ChannelType.DM }, + guild: null, + rawData: { id: "i1" }, + options: { getString: vi.fn().mockReturnValue("on") }, + reply, + followUp, + }); + + expect(dispatchMock).toHaveBeenCalledTimes(1); + expect(reply).toHaveBeenCalledTimes(1); + expect(followUp).toHaveBeenCalledTimes(1); + expect(reply.mock.calls[0]?.[0]?.content).toContain("tool"); + expect(followUp.mock.calls[0]?.[0]?.content).toContain("final"); + }, + ); +}); diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 51573e191..2a4e2b821 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -51,7 +51,6 @@ import { createReplyDispatcherWithTyping, } from "../auto-reply/reply/reply-dispatcher.js"; import { createReplyReferencePlanner } from "../auto-reply/reply/reply-reference.js"; -import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { isNativeCommandsExplicitlyDisabled, @@ -1603,7 +1602,7 @@ async function handleDiscordReactionEvent(params: { } } -function createDiscordNativeCommand(params: { +export function createDiscordNativeCommand(params: { command: { name: string; description: string; @@ -1837,7 +1836,7 @@ function createDiscordNativeCommand(params: { responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) .responsePrefix, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), - deliver: async (payload, _info) => { + deliver: async (payload) => { await deliverDiscordInteractionReply({ interaction, payload, @@ -1849,24 +1848,23 @@ function createDiscordNativeCommand(params: { }); didReply = true; }, - onError: (err) => { - console.error(err); + onError: (err, info) => { + console.error(`discord slash ${info.kind} reply failed`, err); }, }); - const replyResult = await getReplyFromConfig( - ctxPayload, - { skillFilter: channelConfig?.skills }, + await dispatchReplyFromConfig({ + ctx: ctxPayload, cfg, - ); - const replies = replyResult - ? Array.isArray(replyResult) - ? replyResult - : [replyResult] - : []; - for (const reply of replies) { - dispatcher.sendFinalReply(reply); - } + dispatcher, + replyOptions: { + skillFilter: channelConfig?.skills, + disableBlockStreaming: + typeof discordConfig?.blockStreaming === "boolean" + ? !discordConfig.blockStreaming + : undefined, + }, + }); await dispatcher.waitForIdle(); } })(); diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index 7249d7ab9..e4e5c4834 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -34,9 +34,11 @@ import { buildMentionRegexes, matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; -import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; +import { + createReplyDispatcher, + createReplyDispatcherWithTyping, +} from "../auto-reply/reply/reply-dispatcher.js"; import { createReplyReferencePlanner } from "../auto-reply/reply/reply-reference.js"; -import { getReplyFromConfig } from "../auto-reply/reply.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { resolveNativeCommandsEnabled } from "../config/commands.js"; @@ -1921,23 +1923,39 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { OriginatingTo: `user:${command.user_id}`, }; - const replyResult = await getReplyFromConfig( - ctxPayload, - { skillFilter: channelConfig?.skills }, - cfg, - ); - const replies = replyResult - ? Array.isArray(replyResult) - ? replyResult - : [replyResult] - : []; - - await deliverSlackSlashReplies({ - replies, - respond, - ephemeral: slashCommand.ephemeral, - textLimit, + const dispatcher = createReplyDispatcher({ + responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) + .responsePrefix, + deliver: async (payload) => { + await deliverSlackSlashReplies({ + replies: [payload], + respond, + ephemeral: slashCommand.ephemeral, + textLimit, + }); + }, + onError: (err, info) => { + runtime.error?.( + danger(`slack slash ${info.kind} reply failed: ${String(err)}`), + ); + }, }); + + const { counts } = await dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { skillFilter: channelConfig?.skills }, + }); + await dispatcher.waitForIdle(); + if (counts.final + counts.tool + counts.block === 0) { + await deliverSlackSlashReplies({ + replies: [], + respond, + ephemeral: slashCommand.ephemeral, + textLimit, + }); + } } catch (err) { runtime.error?.(danger(`slack slash handler failed: ${String(err)}`)); await respond({ diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index 991075ec6..65caf1ebf 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -1871,6 +1871,49 @@ describe("createTelegramBot", () => { ); }); + it("streams tool summaries for native slash commands", async () => { + onSpy.mockReset(); + sendMessageSpy.mockReset(); + commandSpy.mockReset(); + const replySpy = replyModule.__replySpy as unknown as ReturnType< + typeof vi.fn + >; + replySpy.mockReset(); + replySpy.mockImplementation(async (_ctx, opts) => { + await opts?.onToolResult?.({ text: "tool update" }); + return { text: "final reply" }; + }); + + loadConfig.mockReturnValue({ + commands: { native: true }, + telegram: { + dmPolicy: "open", + allowFrom: ["*"], + }, + }); + + createTelegramBot({ token: "tok" }); + const verboseHandler = commandSpy.mock.calls.find( + (call) => call[0] === "verbose", + )?.[1] as ((ctx: Record) => Promise) | undefined; + if (!verboseHandler) throw new Error("verbose command handler missing"); + + await verboseHandler({ + message: { + chat: { id: 12345, type: "private" }, + from: { id: 12345, username: "testuser" }, + text: "/verbose on", + date: 1736380800, + message_id: 42, + }, + match: "on", + }); + + expect(sendMessageSpy).toHaveBeenCalledTimes(2); + expect(sendMessageSpy.mock.calls[0]?.[1]).toContain("tool update"); + expect(sendMessageSpy.mock.calls[1]?.[1]).toContain("final reply"); + }); + it("dedupes duplicate message updates by update_id", async () => { onSpy.mockReset(); const replySpy = replyModule.__replySpy as unknown as ReturnType< diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 98f3cc911..ea2f09ddf 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -31,7 +31,6 @@ import { matchesMentionPatterns, } from "../auto-reply/reply/mentions.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; -import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { isNativeCommandsExplicitlyDisabled, @@ -1128,25 +1127,41 @@ export function createTelegramBot(opts: TelegramBotOptions) { IsForum: isForum, }; - const replyResult = await getReplyFromConfig( - ctxPayload, - { skillFilter }, + const disableBlockStreaming = + typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : undefined; + + await dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, cfg, - ); - const replies = replyResult - ? Array.isArray(replyResult) - ? replyResult - : [replyResult] - : []; - await deliverReplies({ - replies, - chatId: String(chatId), - token: opts.token, - runtime, - bot, - replyToMode, - textLimit, - messageThreadId, + dispatcherOptions: { + responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId) + .responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + chatId: String(chatId), + token: opts.token, + runtime, + bot, + replyToMode, + textLimit, + messageThreadId, + }); + }, + onError: (err, info) => { + runtime.error?.( + danger( + `telegram slash ${info.kind} reply failed: ${String(err)}`, + ), + ); + }, + }, + replyOptions: { + skillFilter, + disableBlockStreaming, + }, }); }); }