diff --git a/README.md b/README.md index 54dacb9..e3661c2 100644 --- a/README.md +++ b/README.md @@ -11,13 +11,13 @@ Telegram DM bridge for pi. From git: ```bash -pi install git:github.com/badlogic/pi-telegram +pi install git:github.com/llblab/pi-telegram ``` Or for a single run: ```bash -pi -e git:github.com/badlogic/pi-telegram +pi -e git:github.com/llblab/pi-telegram ``` ## Configure @@ -123,6 +123,31 @@ That aborts the active pi turn. If you send more Telegram messages while pi is busy, they are queued and processed in order. +The pi status bar shows queued Telegram turns as compact previews, for example: + +```text ++3: [summarize this image…, write a shell script…, πŸ“Ž 2 attachments] +``` + +Each preview is limited to at most 5 words or 40 characters. + +### Reprioritize or discard queued messages + +While a message is still waiting in the queue: + +- React with πŸ‘ to move it into the priority block +- React with πŸ‘Ž to remove it from the queue + +Priority is stable: + +- The first liked queued message stays ahead of later liked messages +- Removing πŸ‘ sends the message back to its normal queue position +- Adding πŸ‘ again gives it a fresh priority position + +For media groups, a reaction on any message in the group applies to the whole queued turn. + +Message reactions depend on Telegram delivering `message_reaction` updates for your bot and chat type. + ## Streaming The extension streams assistant text previews back to Telegram while pi is generating. diff --git a/index.ts b/index.ts index 683c0ff..ca4dd07 100644 --- a/index.ts +++ b/index.ts @@ -114,11 +114,41 @@ interface TelegramCallbackQuery { data?: string; } +interface TelegramReactionTypeEmoji { + type: "emoji"; + emoji: string; +} + +interface TelegramReactionTypeCustomEmoji { + type: "custom_emoji"; + custom_emoji_id: string; +} + +interface TelegramReactionTypePaid { + type: "paid"; +} + +type TelegramReactionType = + | TelegramReactionTypeEmoji + | TelegramReactionTypeCustomEmoji + | TelegramReactionTypePaid; + +interface TelegramMessageReactionUpdated { + chat: TelegramChat; + message_id: number; + user?: TelegramUser; + actor_chat?: TelegramChat; + old_reaction: TelegramReactionType[]; + new_reaction: TelegramReactionType[]; + date: number; +} + interface TelegramUpdate { update_id: number; message?: TelegramMessage; edited_message?: TelegramMessage; callback_query?: TelegramCallbackQuery; + message_reaction?: TelegramMessageReactionUpdated; } interface TelegramGetFileResult { @@ -151,9 +181,13 @@ interface QueuedAttachment { interface PendingTelegramTurn { chatId: number; replyToMessageId: number; + sourceMessageIds: number[]; + queueOrder: number; + priorityReactionOrder?: number; queuedAttachments: QueuedAttachment[]; content: Array; historyText: string; + statusSummary: string; } type ActiveTelegramTurn = PendingTelegramTurn; @@ -521,6 +555,52 @@ function truncateTelegramButtonLabel(label: string, maxLength = 56): string { : `${label.slice(0, maxLength - 1)}…`; } +function truncateTelegramQueueSummary( + text: string, + maxWords = 4, + maxLength = 32, +): string { + const normalized = text.replace(/\s+/g, " ").trim(); + if (!normalized) return ""; + const words = normalized.split(" "); + let summary = words.slice(0, maxWords).join(" "); + if (summary.length === 0) summary = normalized; + if (summary.length > maxLength) { + summary = summary.slice(0, maxLength).trimEnd(); + } + return summary.length < normalized.length || words.length > maxWords + ? `${summary}…` + : summary; +} + +function formatTelegramTurnStatusSummary( + rawText: string, + files: DownloadedTelegramFile[], +): string { + const textSummary = truncateTelegramQueueSummary(rawText); + if (textSummary) return textSummary; + if (files.length === 1) { + const fileName = basename( + files[0]?.fileName || files[0]?.path || "attachment", + ); + return `πŸ“Ž ${truncateTelegramQueueSummary(fileName, 4, 32) || "attachment"}`; + } + if (files.length > 1) return `πŸ“Ž ${files.length} attachments`; + return "(empty message)"; +} + +function formatQueuedTelegramTurnsStatus(turns: PendingTelegramTurn[]): string { + if (turns.length === 0) return ""; + const previewCount = 4; + const summaries = turns + .slice(0, previewCount) + .map((turn) => turn.statusSummary) + .filter(Boolean); + if (summaries.length === 0) return ` +${turns.length}`; + const suffix = turns.length > summaries.length ? ", …" : ""; + return ` +${turns.length}: [${summaries.join(", ")}${suffix}]`; +} + function formatScopedModelButtonText( entry: ScopedTelegramModel, currentModel: Model | undefined, @@ -1087,6 +1167,8 @@ export default function (pi: ExtensionAPI) { let pollingController: AbortController | undefined; let pollingPromise: Promise | undefined; let queuedTelegramTurns: PendingTelegramTurn[] = []; + let nextQueuedTelegramTurnOrder = 0; + let nextPriorityReactionOrder = 0; let activeTelegramTurn: ActiveTelegramTurn | undefined; let typingInterval: ReturnType | undefined; let currentAbort: (() => void) | undefined; @@ -1140,10 +1222,10 @@ export default function (pi: ExtensionAPI) { return; } if (activeTelegramTurn || queuedTelegramTurns.length > 0) { - const queued = - queuedTelegramTurns.length > 0 - ? theme.fg("muted", ` +${queuedTelegramTurns.length} queued`) - : ""; + const queued = theme.fg( + "muted", + formatQueuedTelegramTurnsStatus(queuedTelegramTurns), + ); ctx.ui.setStatus( "telegram", `${label} ${theme.fg("accent", "processing")}${queued}`, @@ -2162,6 +2244,10 @@ export default function (pi: ExtensionAPI) { return messages.map(extractTelegramMessageText).find(Boolean) ?? ""; } + function collectTelegramMessageIds(messages: TelegramMessage[]): number[] { + return [...new Set(messages.map((message) => message.message_id))]; + } + function formatTelegramHistoryText( rawText: string, files: DownloadedTelegramFile[], @@ -2289,6 +2375,165 @@ export default function (pi: ExtensionAPI) { return downloaded; } + function isTelegramMessageIdList(value: unknown): value is number[] { + return ( + Array.isArray(value) && value.every((item) => Number.isInteger(item)) + ); + } + + function normalizeTelegramReactionEmoji(emoji: string): string { + return emoji.replace(/\uFE0F/g, ""); + } + + function collectTelegramReactionEmojis( + reactions: TelegramReactionType[], + ): Set { + return new Set( + reactions + .filter( + (reaction): reaction is TelegramReactionTypeEmoji => + reaction.type === "emoji", + ) + .map((reaction) => normalizeTelegramReactionEmoji(reaction.emoji)), + ); + } + + function compareQueuedTelegramTurns( + left: PendingTelegramTurn, + right: PendingTelegramTurn, + ): number { + const leftPriority = left.priorityReactionOrder ?? Number.POSITIVE_INFINITY; + const rightPriority = + right.priorityReactionOrder ?? Number.POSITIVE_INFINITY; + if (leftPriority !== rightPriority) return leftPriority - rightPriority; + return left.queueOrder - right.queueOrder; + } + + function reorderQueuedTelegramTurns(ctx: ExtensionContext): void { + queuedTelegramTurns.sort(compareQueuedTelegramTurns); + updateStatus(ctx); + } + + function extractDeletedTelegramMessageIds(update: TelegramUpdate): number[] { + const deletedBusinessMessageIds = ( + update as TelegramUpdate & { + deleted_business_messages?: { message_ids?: unknown }; + } + ).deleted_business_messages?.message_ids; + if (isTelegramMessageIdList(deletedBusinessMessageIds)) { + return deletedBusinessMessageIds; + } + const rawDeleteUpdate = update as TelegramUpdate & { + _: string; + messages?: unknown; + }; + if ( + rawDeleteUpdate._ === "updateDeleteMessages" && + isTelegramMessageIdList(rawDeleteUpdate.messages) + ) { + return rawDeleteUpdate.messages; + } + return []; + } + + function removePendingMediaGroupMessages(messageIds: number[]): void { + if (messageIds.length === 0 || mediaGroups.size === 0) return; + const deletedMessageIds = new Set(messageIds); + for (const [key, state] of mediaGroups.entries()) { + if ( + !state.messages.some((message) => + deletedMessageIds.has(message.message_id), + ) + ) { + continue; + } + if (state.flushTimer) clearTimeout(state.flushTimer); + mediaGroups.delete(key); + } + } + + function removeQueuedTelegramTurnsByMessageIds( + messageIds: number[], + ctx: ExtensionContext, + ): number { + if (messageIds.length === 0 || queuedTelegramTurns.length === 0) return 0; + const deletedMessageIds = new Set(messageIds); + const nextQueue = queuedTelegramTurns.filter( + (turn) => + !turn.sourceMessageIds.some((messageId) => + deletedMessageIds.has(messageId), + ), + ); + const removedCount = queuedTelegramTurns.length - nextQueue.length; + if (removedCount === 0) return 0; + queuedTelegramTurns = nextQueue; + updateStatus(ctx); + return removedCount; + } + + function clearQueuedTelegramTurnPriorityByMessageId( + messageId: number, + ctx: ExtensionContext, + ): boolean { + const turn = queuedTelegramTurns.find((entry) => + entry.sourceMessageIds.includes(messageId), + ); + if (!turn || turn.priorityReactionOrder === undefined) return false; + turn.priorityReactionOrder = undefined; + reorderQueuedTelegramTurns(ctx); + return true; + } + + function prioritizeQueuedTelegramTurnByMessageId( + messageId: number, + ctx: ExtensionContext, + ): boolean { + const turn = queuedTelegramTurns.find((entry) => + entry.sourceMessageIds.includes(messageId), + ); + if (!turn) return false; + turn.priorityReactionOrder = nextPriorityReactionOrder++; + reorderQueuedTelegramTurns(ctx); + return true; + } + + async function handleAuthorizedTelegramReactionUpdate( + reactionUpdate: TelegramMessageReactionUpdated, + ctx: ExtensionContext, + ): Promise { + const reactionUser = reactionUpdate.user; + if ( + reactionUpdate.chat.type !== "private" || + !reactionUser || + reactionUser.is_bot || + reactionUser.id !== config.allowedUserId + ) { + return; + } + const oldEmojis = collectTelegramReactionEmojis( + reactionUpdate.old_reaction, + ); + const newEmojis = collectTelegramReactionEmojis( + reactionUpdate.new_reaction, + ); + const dislikeAdded = !oldEmojis.has("πŸ‘Ž") && newEmojis.has("πŸ‘Ž"); + if (dislikeAdded) { + removePendingMediaGroupMessages([reactionUpdate.message_id]); + removeQueuedTelegramTurnsByMessageIds([reactionUpdate.message_id], ctx); + return; + } + const likeRemoved = oldEmojis.has("πŸ‘") && !newEmojis.has("πŸ‘"); + if (likeRemoved) { + clearQueuedTelegramTurnPriorityByMessageId( + reactionUpdate.message_id, + ctx, + ); + } + const likeAdded = !oldEmojis.has("πŸ‘") && newEmojis.has("πŸ‘"); + if (!likeAdded) return; + prioritizeQueuedTelegramTurnByMessageId(reactionUpdate.message_id, ctx); + } + async function createTelegramTurn( messages: TelegramMessage[], historyTurns: PendingTelegramTurn[] = [], @@ -2335,9 +2580,12 @@ export default function (pi: ExtensionAPI) { return { chatId: firstMessage.chat.id, replyToMessageId: firstMessage.message_id, + sourceMessageIds: collectTelegramMessageIds(messages), + queueOrder: nextQueuedTelegramTurnOrder++, queuedAttachments: [], content, historyText: formatTelegramHistoryText(rawText, files), + statusSummary: formatTelegramTurnStatusSummary(rawText, files), }; } @@ -2522,6 +2770,19 @@ export default function (pi: ExtensionAPI) { update: TelegramUpdate, ctx: ExtensionContext, ): Promise { + const deletedMessageIds = extractDeletedTelegramMessageIds(update); + if (deletedMessageIds.length > 0) { + removePendingMediaGroupMessages(deletedMessageIds); + removeQueuedTelegramTurnsByMessageIds(deletedMessageIds, ctx); + return; + } + if (update.message_reaction) { + await handleAuthorizedTelegramReactionUpdate( + update.message_reaction, + ctx, + ); + return; + } if (update.callback_query) { const query = update.callback_query; const message = query.message; @@ -2621,7 +2882,12 @@ export default function (pi: ExtensionAPI) { : undefined, limit: 10, timeout: 30, - allowed_updates: ["message", "edited_message", "callback_query"], + allowed_updates: [ + "message", + "edited_message", + "callback_query", + "message_reaction", + ], }, { signal }, ); @@ -2763,6 +3029,8 @@ export default function (pi: ExtensionAPI) { pi.on("session_shutdown", async (_event, _ctx) => { queuedTelegramTurns = []; + nextQueuedTelegramTurnOrder = 0; + nextPriorityReactionOrder = 0; currentTelegramModel = undefined; for (const state of mediaGroups.values()) { if (state.flushTimer) clearTimeout(state.flushTimer);