queue upgrade

This commit is contained in:
LLB
2026-04-09 12:24:48 +04:00
parent 506ef83a69
commit 105cbb49f1
2 changed files with 300 additions and 7 deletions
+27 -2
View File
@@ -11,13 +11,13 @@ Telegram DM bridge for pi.
From git:
```bash
pi install git:github.com/badlogic/pi-telegram
pi install git:github.com/llblab/pi-telegram
```
Or for a single run:
```bash
pi -e git:github.com/badlogic/pi-telegram
pi -e git:github.com/llblab/pi-telegram
```
## Configure
@@ -123,6 +123,31 @@ That aborts the active pi turn.
If you send more Telegram messages while pi is busy, they are queued and processed in order.
The pi status bar shows queued Telegram turns as compact previews, for example:
```text
+3: [summarize this image…, write a shell script…, 📎 2 attachments]
```
Each preview is limited to at most 5 words or 40 characters.
### Reprioritize or discard queued messages
While a message is still waiting in the queue:
- React with 👍 to move it into the priority block
- React with 👎 to remove it from the queue
Priority is stable:
- The first liked queued message stays ahead of later liked messages
- Removing 👍 sends the message back to its normal queue position
- Adding 👍 again gives it a fresh priority position
For media groups, a reaction on any message in the group applies to the whole queued turn.
Message reactions depend on Telegram delivering `message_reaction` updates for your bot and chat type.
## Streaming
The extension streams assistant text previews back to Telegram while pi is generating.
+273 -5
View File
@@ -114,11 +114,41 @@ interface TelegramCallbackQuery {
data?: string;
}
interface TelegramReactionTypeEmoji {
type: "emoji";
emoji: string;
}
interface TelegramReactionTypeCustomEmoji {
type: "custom_emoji";
custom_emoji_id: string;
}
interface TelegramReactionTypePaid {
type: "paid";
}
type TelegramReactionType =
| TelegramReactionTypeEmoji
| TelegramReactionTypeCustomEmoji
| TelegramReactionTypePaid;
interface TelegramMessageReactionUpdated {
chat: TelegramChat;
message_id: number;
user?: TelegramUser;
actor_chat?: TelegramChat;
old_reaction: TelegramReactionType[];
new_reaction: TelegramReactionType[];
date: number;
}
interface TelegramUpdate {
update_id: number;
message?: TelegramMessage;
edited_message?: TelegramMessage;
callback_query?: TelegramCallbackQuery;
message_reaction?: TelegramMessageReactionUpdated;
}
interface TelegramGetFileResult {
@@ -151,9 +181,13 @@ interface QueuedAttachment {
interface PendingTelegramTurn {
chatId: number;
replyToMessageId: number;
sourceMessageIds: number[];
queueOrder: number;
priorityReactionOrder?: number;
queuedAttachments: QueuedAttachment[];
content: Array<TextContent | ImageContent>;
historyText: string;
statusSummary: string;
}
type ActiveTelegramTurn = PendingTelegramTurn;
@@ -521,6 +555,52 @@ function truncateTelegramButtonLabel(label: string, maxLength = 56): string {
: `${label.slice(0, maxLength - 1)}`;
}
function truncateTelegramQueueSummary(
text: string,
maxWords = 4,
maxLength = 32,
): string {
const normalized = text.replace(/\s+/g, " ").trim();
if (!normalized) return "";
const words = normalized.split(" ");
let summary = words.slice(0, maxWords).join(" ");
if (summary.length === 0) summary = normalized;
if (summary.length > maxLength) {
summary = summary.slice(0, maxLength).trimEnd();
}
return summary.length < normalized.length || words.length > maxWords
? `${summary}`
: summary;
}
function formatTelegramTurnStatusSummary(
rawText: string,
files: DownloadedTelegramFile[],
): string {
const textSummary = truncateTelegramQueueSummary(rawText);
if (textSummary) return textSummary;
if (files.length === 1) {
const fileName = basename(
files[0]?.fileName || files[0]?.path || "attachment",
);
return `📎 ${truncateTelegramQueueSummary(fileName, 4, 32) || "attachment"}`;
}
if (files.length > 1) return `📎 ${files.length} attachments`;
return "(empty message)";
}
function formatQueuedTelegramTurnsStatus(turns: PendingTelegramTurn[]): string {
if (turns.length === 0) return "";
const previewCount = 4;
const summaries = turns
.slice(0, previewCount)
.map((turn) => turn.statusSummary)
.filter(Boolean);
if (summaries.length === 0) return ` +${turns.length}`;
const suffix = turns.length > summaries.length ? ", …" : "";
return ` +${turns.length}: [${summaries.join(", ")}${suffix}]`;
}
function formatScopedModelButtonText(
entry: ScopedTelegramModel,
currentModel: Model<any> | undefined,
@@ -1087,6 +1167,8 @@ export default function (pi: ExtensionAPI) {
let pollingController: AbortController | undefined;
let pollingPromise: Promise<void> | undefined;
let queuedTelegramTurns: PendingTelegramTurn[] = [];
let nextQueuedTelegramTurnOrder = 0;
let nextPriorityReactionOrder = 0;
let activeTelegramTurn: ActiveTelegramTurn | undefined;
let typingInterval: ReturnType<typeof setInterval> | undefined;
let currentAbort: (() => void) | undefined;
@@ -1140,10 +1222,10 @@ export default function (pi: ExtensionAPI) {
return;
}
if (activeTelegramTurn || queuedTelegramTurns.length > 0) {
const queued =
queuedTelegramTurns.length > 0
? theme.fg("muted", ` +${queuedTelegramTurns.length} queued`)
: "";
const queued = theme.fg(
"muted",
formatQueuedTelegramTurnsStatus(queuedTelegramTurns),
);
ctx.ui.setStatus(
"telegram",
`${label} ${theme.fg("accent", "processing")}${queued}`,
@@ -2162,6 +2244,10 @@ export default function (pi: ExtensionAPI) {
return messages.map(extractTelegramMessageText).find(Boolean) ?? "";
}
function collectTelegramMessageIds(messages: TelegramMessage[]): number[] {
return [...new Set(messages.map((message) => message.message_id))];
}
function formatTelegramHistoryText(
rawText: string,
files: DownloadedTelegramFile[],
@@ -2289,6 +2375,165 @@ export default function (pi: ExtensionAPI) {
return downloaded;
}
function isTelegramMessageIdList(value: unknown): value is number[] {
return (
Array.isArray(value) && value.every((item) => Number.isInteger(item))
);
}
function normalizeTelegramReactionEmoji(emoji: string): string {
return emoji.replace(/\uFE0F/g, "");
}
function collectTelegramReactionEmojis(
reactions: TelegramReactionType[],
): Set<string> {
return new Set(
reactions
.filter(
(reaction): reaction is TelegramReactionTypeEmoji =>
reaction.type === "emoji",
)
.map((reaction) => normalizeTelegramReactionEmoji(reaction.emoji)),
);
}
function compareQueuedTelegramTurns(
left: PendingTelegramTurn,
right: PendingTelegramTurn,
): number {
const leftPriority = left.priorityReactionOrder ?? Number.POSITIVE_INFINITY;
const rightPriority =
right.priorityReactionOrder ?? Number.POSITIVE_INFINITY;
if (leftPriority !== rightPriority) return leftPriority - rightPriority;
return left.queueOrder - right.queueOrder;
}
function reorderQueuedTelegramTurns(ctx: ExtensionContext): void {
queuedTelegramTurns.sort(compareQueuedTelegramTurns);
updateStatus(ctx);
}
function extractDeletedTelegramMessageIds(update: TelegramUpdate): number[] {
const deletedBusinessMessageIds = (
update as TelegramUpdate & {
deleted_business_messages?: { message_ids?: unknown };
}
).deleted_business_messages?.message_ids;
if (isTelegramMessageIdList(deletedBusinessMessageIds)) {
return deletedBusinessMessageIds;
}
const rawDeleteUpdate = update as TelegramUpdate & {
_: string;
messages?: unknown;
};
if (
rawDeleteUpdate._ === "updateDeleteMessages" &&
isTelegramMessageIdList(rawDeleteUpdate.messages)
) {
return rawDeleteUpdate.messages;
}
return [];
}
function removePendingMediaGroupMessages(messageIds: number[]): void {
if (messageIds.length === 0 || mediaGroups.size === 0) return;
const deletedMessageIds = new Set(messageIds);
for (const [key, state] of mediaGroups.entries()) {
if (
!state.messages.some((message) =>
deletedMessageIds.has(message.message_id),
)
) {
continue;
}
if (state.flushTimer) clearTimeout(state.flushTimer);
mediaGroups.delete(key);
}
}
function removeQueuedTelegramTurnsByMessageIds(
messageIds: number[],
ctx: ExtensionContext,
): number {
if (messageIds.length === 0 || queuedTelegramTurns.length === 0) return 0;
const deletedMessageIds = new Set(messageIds);
const nextQueue = queuedTelegramTurns.filter(
(turn) =>
!turn.sourceMessageIds.some((messageId) =>
deletedMessageIds.has(messageId),
),
);
const removedCount = queuedTelegramTurns.length - nextQueue.length;
if (removedCount === 0) return 0;
queuedTelegramTurns = nextQueue;
updateStatus(ctx);
return removedCount;
}
function clearQueuedTelegramTurnPriorityByMessageId(
messageId: number,
ctx: ExtensionContext,
): boolean {
const turn = queuedTelegramTurns.find((entry) =>
entry.sourceMessageIds.includes(messageId),
);
if (!turn || turn.priorityReactionOrder === undefined) return false;
turn.priorityReactionOrder = undefined;
reorderQueuedTelegramTurns(ctx);
return true;
}
function prioritizeQueuedTelegramTurnByMessageId(
messageId: number,
ctx: ExtensionContext,
): boolean {
const turn = queuedTelegramTurns.find((entry) =>
entry.sourceMessageIds.includes(messageId),
);
if (!turn) return false;
turn.priorityReactionOrder = nextPriorityReactionOrder++;
reorderQueuedTelegramTurns(ctx);
return true;
}
async function handleAuthorizedTelegramReactionUpdate(
reactionUpdate: TelegramMessageReactionUpdated,
ctx: ExtensionContext,
): Promise<void> {
const reactionUser = reactionUpdate.user;
if (
reactionUpdate.chat.type !== "private" ||
!reactionUser ||
reactionUser.is_bot ||
reactionUser.id !== config.allowedUserId
) {
return;
}
const oldEmojis = collectTelegramReactionEmojis(
reactionUpdate.old_reaction,
);
const newEmojis = collectTelegramReactionEmojis(
reactionUpdate.new_reaction,
);
const dislikeAdded = !oldEmojis.has("👎") && newEmojis.has("👎");
if (dislikeAdded) {
removePendingMediaGroupMessages([reactionUpdate.message_id]);
removeQueuedTelegramTurnsByMessageIds([reactionUpdate.message_id], ctx);
return;
}
const likeRemoved = oldEmojis.has("👍") && !newEmojis.has("👍");
if (likeRemoved) {
clearQueuedTelegramTurnPriorityByMessageId(
reactionUpdate.message_id,
ctx,
);
}
const likeAdded = !oldEmojis.has("👍") && newEmojis.has("👍");
if (!likeAdded) return;
prioritizeQueuedTelegramTurnByMessageId(reactionUpdate.message_id, ctx);
}
async function createTelegramTurn(
messages: TelegramMessage[],
historyTurns: PendingTelegramTurn[] = [],
@@ -2335,9 +2580,12 @@ export default function (pi: ExtensionAPI) {
return {
chatId: firstMessage.chat.id,
replyToMessageId: firstMessage.message_id,
sourceMessageIds: collectTelegramMessageIds(messages),
queueOrder: nextQueuedTelegramTurnOrder++,
queuedAttachments: [],
content,
historyText: formatTelegramHistoryText(rawText, files),
statusSummary: formatTelegramTurnStatusSummary(rawText, files),
};
}
@@ -2522,6 +2770,19 @@ export default function (pi: ExtensionAPI) {
update: TelegramUpdate,
ctx: ExtensionContext,
): Promise<void> {
const deletedMessageIds = extractDeletedTelegramMessageIds(update);
if (deletedMessageIds.length > 0) {
removePendingMediaGroupMessages(deletedMessageIds);
removeQueuedTelegramTurnsByMessageIds(deletedMessageIds, ctx);
return;
}
if (update.message_reaction) {
await handleAuthorizedTelegramReactionUpdate(
update.message_reaction,
ctx,
);
return;
}
if (update.callback_query) {
const query = update.callback_query;
const message = query.message;
@@ -2621,7 +2882,12 @@ export default function (pi: ExtensionAPI) {
: undefined,
limit: 10,
timeout: 30,
allowed_updates: ["message", "edited_message", "callback_query"],
allowed_updates: [
"message",
"edited_message",
"callback_query",
"message_reaction",
],
},
{ signal },
);
@@ -2763,6 +3029,8 @@ export default function (pi: ExtensionAPI) {
pi.on("session_shutdown", async (_event, _ctx) => {
queuedTelegramTurns = [];
nextQueuedTelegramTurnOrder = 0;
nextPriorityReactionOrder = 0;
currentTelegramModel = undefined;
for (const state of mediaGroups.values()) {
if (state.flushTimer) clearTimeout(state.flushTimer);