feat: enhance message context with full ID support for replies and caching
- Updated message processing to include full message IDs alongside short IDs for better context resolution. - Improved reply handling by caching inbound messages, allowing for accurate sender and body resolution without exposing dropped content. - Adjusted tests to validate the new full ID properties and their integration into the message handling workflow.
This commit is contained in:
@@ -66,7 +66,7 @@ export const bluebubblesPlugin: ChannelPlugin<ResolvedBlueBubblesAccount> = {
|
|||||||
threading: {
|
threading: {
|
||||||
buildToolContext: ({ context, hasRepliedRef }) => ({
|
buildToolContext: ({ context, hasRepliedRef }) => ({
|
||||||
currentChannelId: context.To?.trim() || undefined,
|
currentChannelId: context.To?.trim() || undefined,
|
||||||
currentThreadTs: context.ReplyToId,
|
currentThreadTs: context.ReplyToIdFull ?? context.ReplyToId,
|
||||||
hasRepliedRef,
|
hasRepliedRef,
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -1243,6 +1243,7 @@ describe("BlueBubbles webhook monitor", () => {
|
|||||||
const callArgs = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[0][0];
|
const callArgs = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[0][0];
|
||||||
// ReplyToId uses short ID "1" (first cached message) for token savings
|
// ReplyToId uses short ID "1" (first cached message) for token savings
|
||||||
expect(callArgs.ctx.ReplyToId).toBe("1");
|
expect(callArgs.ctx.ReplyToId).toBe("1");
|
||||||
|
expect(callArgs.ctx.ReplyToIdFull).toBe("cache-msg-0");
|
||||||
expect(callArgs.ctx.ReplyToBody).toBe("original message (cached)");
|
expect(callArgs.ctx.ReplyToBody).toBe("original message (cached)");
|
||||||
expect(callArgs.ctx.ReplyToSender).toBe("+15550000000");
|
expect(callArgs.ctx.ReplyToSender).toBe("+15550000000");
|
||||||
// Body uses just the short ID (no sender) for token savings
|
// Body uses just the short ID (no sender) for token savings
|
||||||
@@ -1812,6 +1813,7 @@ describe("BlueBubbles webhook monitor", () => {
|
|||||||
const callArgs = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[0][0];
|
const callArgs = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[0][0];
|
||||||
// MessageSid should be short ID "1" instead of full UUID
|
// MessageSid should be short ID "1" instead of full UUID
|
||||||
expect(callArgs.ctx.MessageSid).toBe("1");
|
expect(callArgs.ctx.MessageSid).toBe("1");
|
||||||
|
expect(callArgs.ctx.MessageSidFull).toBe("msg-uuid-12345");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("resolves short ID back to UUID", async () => {
|
it("resolves short ID back to UUID", async () => {
|
||||||
|
|||||||
@@ -1110,11 +1110,10 @@ async function processMessage(
|
|||||||
const placeholder = buildMessagePlaceholder(message);
|
const placeholder = buildMessagePlaceholder(message);
|
||||||
const rawBody = text || placeholder;
|
const rawBody = text || placeholder;
|
||||||
|
|
||||||
// Cache messages (including fromMe) so later replies can resolve sender/body even when
|
|
||||||
// BlueBubbles webhook payloads omit nested reply metadata.
|
|
||||||
const cacheMessageId = message.messageId?.trim();
|
const cacheMessageId = message.messageId?.trim();
|
||||||
let messageShortId: string | undefined;
|
let messageShortId: string | undefined;
|
||||||
if (cacheMessageId) {
|
const cacheInboundMessage = () => {
|
||||||
|
if (!cacheMessageId) return;
|
||||||
const cacheEntry = rememberBlueBubblesReplyCache({
|
const cacheEntry = rememberBlueBubblesReplyCache({
|
||||||
accountId: account.accountId,
|
accountId: account.accountId,
|
||||||
messageId: cacheMessageId,
|
messageId: cacheMessageId,
|
||||||
@@ -1126,9 +1125,13 @@ async function processMessage(
|
|||||||
timestamp: message.timestamp ?? Date.now(),
|
timestamp: message.timestamp ?? Date.now(),
|
||||||
});
|
});
|
||||||
messageShortId = cacheEntry.shortId;
|
messageShortId = cacheEntry.shortId;
|
||||||
}
|
};
|
||||||
|
|
||||||
if (message.fromMe) return;
|
if (message.fromMe) {
|
||||||
|
// Cache from-me messages so reply context can resolve sender/body.
|
||||||
|
cacheInboundMessage();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!rawBody) {
|
if (!rawBody) {
|
||||||
logVerbose(core, runtime, `drop: empty text sender=${message.senderId}`);
|
logVerbose(core, runtime, `drop: empty text sender=${message.senderId}`);
|
||||||
@@ -1370,6 +1373,10 @@ async function processMessage(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cache allowed inbound messages so later replies can resolve sender/body without
|
||||||
|
// surfacing dropped content (allowlist/mention/command gating).
|
||||||
|
cacheInboundMessage();
|
||||||
|
|
||||||
const baseUrl = account.config.serverUrl?.trim();
|
const baseUrl = account.config.serverUrl?.trim();
|
||||||
const password = account.config.password?.trim();
|
const password = account.config.password?.trim();
|
||||||
const maxBytes =
|
const maxBytes =
|
||||||
@@ -1610,6 +1617,7 @@ async function processMessage(
|
|||||||
ConversationLabel: fromLabel,
|
ConversationLabel: fromLabel,
|
||||||
// Use short ID for token savings (agent can use this to reference the message)
|
// Use short ID for token savings (agent can use this to reference the message)
|
||||||
ReplyToId: replyToShortId || replyToId,
|
ReplyToId: replyToShortId || replyToId,
|
||||||
|
ReplyToIdFull: replyToId,
|
||||||
ReplyToBody: replyToBody,
|
ReplyToBody: replyToBody,
|
||||||
ReplyToSender: replyToSender,
|
ReplyToSender: replyToSender,
|
||||||
GroupSubject: groupSubject,
|
GroupSubject: groupSubject,
|
||||||
@@ -1620,6 +1628,7 @@ async function processMessage(
|
|||||||
Surface: "bluebubbles",
|
Surface: "bluebubbles",
|
||||||
// Use short ID for token savings (agent can use this to reference the message)
|
// Use short ID for token savings (agent can use this to reference the message)
|
||||||
MessageSid: messageShortId || message.messageId,
|
MessageSid: messageShortId || message.messageId,
|
||||||
|
MessageSidFull: message.messageId,
|
||||||
Timestamp: message.timestamp,
|
Timestamp: message.timestamp,
|
||||||
OriginatingChannel: "bluebubbles",
|
OriginatingChannel: "bluebubbles",
|
||||||
OriginatingTo: `bluebubbles:${outboundTarget}`,
|
OriginatingTo: `bluebubbles:${outboundTarget}`,
|
||||||
@@ -1634,6 +1643,11 @@ async function processMessage(
|
|||||||
cfg: config,
|
cfg: config,
|
||||||
dispatcherOptions: {
|
dispatcherOptions: {
|
||||||
deliver: async (payload) => {
|
deliver: async (payload) => {
|
||||||
|
const rawReplyToId = typeof payload.replyToId === "string" ? payload.replyToId.trim() : "";
|
||||||
|
// Resolve short ID (e.g., "5") to full UUID
|
||||||
|
const replyToMessageGuid = rawReplyToId
|
||||||
|
? resolveBlueBubblesMessageId(rawReplyToId)
|
||||||
|
: "";
|
||||||
const mediaList = payload.mediaUrls?.length
|
const mediaList = payload.mediaUrls?.length
|
||||||
? payload.mediaUrls
|
? payload.mediaUrls
|
||||||
: payload.mediaUrl
|
: payload.mediaUrl
|
||||||
@@ -1649,7 +1663,7 @@ async function processMessage(
|
|||||||
to: outboundTarget,
|
to: outboundTarget,
|
||||||
mediaUrl,
|
mediaUrl,
|
||||||
caption: caption ?? undefined,
|
caption: caption ?? undefined,
|
||||||
replyToId: payload.replyToId ?? null,
|
replyToId: replyToMessageGuid || null,
|
||||||
accountId: account.accountId,
|
accountId: account.accountId,
|
||||||
});
|
});
|
||||||
const cachedBody = (caption ?? "").trim() || "<media:attachment>";
|
const cachedBody = (caption ?? "").trim() || "<media:attachment>";
|
||||||
@@ -1668,12 +1682,6 @@ async function processMessage(
|
|||||||
if (!chunks.length && payload.text) chunks.push(payload.text);
|
if (!chunks.length && payload.text) chunks.push(payload.text);
|
||||||
if (!chunks.length) return;
|
if (!chunks.length) return;
|
||||||
for (const chunk of chunks) {
|
for (const chunk of chunks) {
|
||||||
const rawReplyToId =
|
|
||||||
typeof payload.replyToId === "string" ? payload.replyToId.trim() : "";
|
|
||||||
// Resolve short ID (e.g., "5") to full UUID
|
|
||||||
const replyToMessageGuid = rawReplyToId
|
|
||||||
? resolveBlueBubblesMessageId(rawReplyToId)
|
|
||||||
: "";
|
|
||||||
const result = await sendMessageBlueBubbles(outboundTarget, chunk, {
|
const result = await sendMessageBlueBubbles(outboundTarget, chunk, {
|
||||||
cfg: config,
|
cfg: config,
|
||||||
accountId: account.accountId,
|
accountId: account.accountId,
|
||||||
|
|||||||
@@ -299,6 +299,8 @@ export async function runAgentTurnWithFallback(params: {
|
|||||||
const { text, skip } = normalizeStreamingText(payload);
|
const { text, skip } = normalizeStreamingText(payload);
|
||||||
const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0;
|
const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0;
|
||||||
if (skip && !hasPayloadMedia) return;
|
if (skip && !hasPayloadMedia) return;
|
||||||
|
const currentMessageId =
|
||||||
|
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
|
||||||
const taggedPayload = applyReplyTagsToPayload(
|
const taggedPayload = applyReplyTagsToPayload(
|
||||||
{
|
{
|
||||||
text,
|
text,
|
||||||
@@ -308,12 +310,12 @@ export async function runAgentTurnWithFallback(params: {
|
|||||||
replyToTag: payload.replyToTag,
|
replyToTag: payload.replyToTag,
|
||||||
replyToCurrent: payload.replyToCurrent,
|
replyToCurrent: payload.replyToCurrent,
|
||||||
},
|
},
|
||||||
params.sessionCtx.MessageSid,
|
currentMessageId,
|
||||||
);
|
);
|
||||||
// Let through payloads with audioAsVoice flag even if empty (need to track it)
|
// Let through payloads with audioAsVoice flag even if empty (need to track it)
|
||||||
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return;
|
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) return;
|
||||||
const parsed = parseReplyDirectives(taggedPayload.text ?? "", {
|
const parsed = parseReplyDirectives(taggedPayload.text ?? "", {
|
||||||
currentMessageId: params.sessionCtx.MessageSid,
|
currentMessageId,
|
||||||
silentToken: SILENT_REPLY_TOKEN,
|
silentToken: SILENT_REPLY_TOKEN,
|
||||||
});
|
});
|
||||||
const cleaned = parsed.text || undefined;
|
const cleaned = parsed.text || undefined;
|
||||||
|
|||||||
@@ -377,7 +377,7 @@ export async function runReplyAgent(params: {
|
|||||||
directlySentBlockKeys,
|
directlySentBlockKeys,
|
||||||
replyToMode,
|
replyToMode,
|
||||||
replyToChannel,
|
replyToChannel,
|
||||||
currentMessageId: sessionCtx.MessageSid,
|
currentMessageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,
|
||||||
messageProvider: followupRun.run.messageProvider,
|
messageProvider: followupRun.run.messageProvider,
|
||||||
messagingToolSentTexts: runResult.messagingToolSentTexts,
|
messagingToolSentTexts: runResult.messagingToolSentTexts,
|
||||||
messagingToolSentTargets: runResult.messagingToolSentTargets,
|
messagingToolSentTargets: runResult.messagingToolSentTargets,
|
||||||
|
|||||||
@@ -350,7 +350,7 @@ export async function runPreparedReply(
|
|||||||
const authProfileIdSource = sessionEntry?.authProfileOverrideSource;
|
const authProfileIdSource = sessionEntry?.authProfileOverrideSource;
|
||||||
const followupRun = {
|
const followupRun = {
|
||||||
prompt: queuedBody,
|
prompt: queuedBody,
|
||||||
messageId: sessionCtx.MessageSid,
|
messageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,
|
||||||
summaryLine: baseBodyTrimmedRaw,
|
summaryLine: baseBodyTrimmedRaw,
|
||||||
enqueuedAt: Date.now(),
|
enqueuedAt: Date.now(),
|
||||||
// Originating channel for reply routing.
|
// Originating channel for reply routing.
|
||||||
|
|||||||
@@ -38,10 +38,14 @@ export type MsgContext = {
|
|||||||
AccountId?: string;
|
AccountId?: string;
|
||||||
ParentSessionKey?: string;
|
ParentSessionKey?: string;
|
||||||
MessageSid?: string;
|
MessageSid?: string;
|
||||||
|
/** Provider-specific full message id when MessageSid is a shortened alias. */
|
||||||
|
MessageSidFull?: string;
|
||||||
MessageSids?: string[];
|
MessageSids?: string[];
|
||||||
MessageSidFirst?: string;
|
MessageSidFirst?: string;
|
||||||
MessageSidLast?: string;
|
MessageSidLast?: string;
|
||||||
ReplyToId?: string;
|
ReplyToId?: string;
|
||||||
|
/** Provider-specific full reply-to id when ReplyToId is a shortened alias. */
|
||||||
|
ReplyToIdFull?: string;
|
||||||
ReplyToBody?: string;
|
ReplyToBody?: string;
|
||||||
ReplyToSender?: string;
|
ReplyToSender?: string;
|
||||||
ForwardedFrom?: string;
|
ForwardedFrom?: string;
|
||||||
|
|||||||
@@ -212,6 +212,7 @@ export type ChannelThreadingContext = {
|
|||||||
Channel?: string;
|
Channel?: string;
|
||||||
To?: string;
|
To?: string;
|
||||||
ReplyToId?: string;
|
ReplyToId?: string;
|
||||||
|
ReplyToIdFull?: string;
|
||||||
ThreadLabel?: string;
|
ThreadLabel?: string;
|
||||||
MessageThreadId?: string | number;
|
MessageThreadId?: string | number;
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user