commit 27bd379159efba37eff7035138fd12d1c1a945eb Author: Mario Zechner Date: Sat Apr 4 03:42:02 2026 +0200 feat: add pi telegram extension package diff --git a/README.md b/README.md new file mode 100644 index 0000000..ffa403b --- /dev/null +++ b/README.md @@ -0,0 +1,135 @@ +# pi-telegram + +Telegram DM bridge for pi. + +## Install + +From git: + +```bash +pi install git:github.com/badlogic/pi-telegram +``` + +Or for a single run: + +```bash +pi -e git:github.com/badlogic/pi-telegram +``` + +## Configure + +### Telegram + +1. Open [@BotFather](https://t.me/BotFather) +2. Run `/newbot` +3. Pick a name and username +4. Copy the bot token + +### pi + +Start pi, then run: + +```bash +/telegram-setup +``` + +Paste the bot token when prompted. + +The extension stores config in: + +```text +~/.pi/agent/telegram.json +``` + +## Connect a pi session + +The Telegram bridge is session-local. Connect it only in the pi session that should own the bot: + +```bash +/telegram-connect +``` + +To stop polling in the current session: + +```bash +/telegram-disconnect +``` + +Check status: + +```bash +/telegram-status +``` + +## Pair your Telegram account + +After token setup and `/telegram-connect`: + +1. Open the DM with your bot in Telegram +2. Send `/start` + +The first DM user becomes the allowed Telegram user for the bridge. The extension only accepts messages from that user. + +## Usage + +Chat with your bot in Telegram DMs. + +### Send text + +Send any message in the bot DM. It is forwarded into pi with a `[telegram]` prefix. + +### Send images and files + +Send images, albums, or files in the DM. + +The extension: +- downloads them to `~/.pi/agent/tmp/telegram` +- includes local file paths in the prompt +- forwards inbound images as image inputs to pi + +### Ask for files back + +If you ask pi for a file or generated artifact, pi should call the `telegram_attach` tool. The extension then sends those files with the next Telegram reply. 
+ +Examples: +- `summarize this image` +- `read this README and summarize it` +- `write me a markdown file with the plan and send it back` +- `generate a shell script and attach it` + +### Stop a run + +In Telegram, send: + +```text +stop +``` + +or: + +```text +/stop +``` + +That aborts the active pi turn. + +### Queue follow-ups + +If you send more Telegram messages while pi is busy, they are queued and processed in order. + +## Streaming + +The extension streams assistant text previews back to Telegram while pi is generating. + +It tries Telegram draft streaming first with `sendMessageDraft`. If that is not supported for your bot, it falls back to `sendMessage` plus `editMessageText`. + +## Notes + +- Only one pi session should be connected to the bot at a time +- Replies are sent as normal Telegram messages, not quote-replies +- Long replies are split below Telegram's 4096 character limit +- Outbound files are sent via `telegram_attach` + +## License + +MIT diff --git a/index.ts b/index.ts new file mode 100644 index 0000000..ec66535 --- /dev/null +++ b/index.ts @@ -0,0 +1,1062 @@ +import { mkdir, readFile, stat, writeFile } from "node:fs/promises"; +import { basename, extname, join } from "node:path"; +import { homedir } from "node:os"; + +import type { ImageContent, TextContent } from "@mariozechner/pi-ai"; +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent"; +import { Type } from "@sinclair/typebox"; + +interface TelegramConfig { + botToken?: string; + botUsername?: string; + botId?: number; + allowedUserId?: number; + lastUpdateId?: number; +} + +interface TelegramApiResponse { + ok: boolean; + result?: T; + description?: string; + error_code?: number; +} + +interface TelegramUser { + id: number; + is_bot: boolean; + first_name: string; + username?: string; +} + +interface TelegramChat { + id: number; + type: string; +} + +interface TelegramPhotoSize { + 
file_id: string; + file_size?: number; +} + +interface TelegramDocument { + file_id: string; + file_name?: string; + mime_type?: string; + file_size?: number; +} + +interface TelegramVideo { + file_id: string; + file_name?: string; + mime_type?: string; + file_size?: number; +} + +interface TelegramAudio { + file_id: string; + file_name?: string; + mime_type?: string; + file_size?: number; +} + +interface TelegramVoice { + file_id: string; + mime_type?: string; + file_size?: number; +} + +interface TelegramAnimation { + file_id: string; + file_name?: string; + mime_type?: string; + file_size?: number; +} + +interface TelegramSticker { + file_id: string; + emoji?: string; +} + +interface TelegramFileInfo { + file_id: string; + fileName: string; + mimeType?: string; + isImage: boolean; +} + +interface TelegramMessage { + message_id: number; + chat: TelegramChat; + from?: TelegramUser; + text?: string; + caption?: string; + media_group_id?: string; + photo?: TelegramPhotoSize[]; + document?: TelegramDocument; + video?: TelegramVideo; + audio?: TelegramAudio; + voice?: TelegramVoice; + animation?: TelegramAnimation; + sticker?: TelegramSticker; +} + +interface TelegramUpdate { + update_id: number; + message?: TelegramMessage; + edited_message?: TelegramMessage; +} + +interface TelegramGetFileResult { + file_path: string; +} + +interface TelegramSentMessage { + message_id: number; +} + +interface DownloadedTelegramFile { + path: string; + fileName: string; + isImage: boolean; + mimeType?: string; +} + +interface PendingTelegramTurn { + chatId: number; + replyToMessageId: number; + queuedAttachments: QueuedAttachment[]; + content: Array; + historyText: string; +} + +type ActiveTelegramTurn = PendingTelegramTurn; + +interface QueuedAttachment { + path: string; + fileName: string; +} + +interface TelegramPreviewState { + mode: "draft" | "message"; + draftId?: number; + messageId?: number; + pendingText: string; + lastSentText: string; + flushTimer?: ReturnType; +} + 
/** Messages of one Telegram album (media group) collected until the debounce timer fires. */
interface TelegramMediaGroupState {
	messages: TelegramMessage[];
	flushTimer?: ReturnType<typeof setTimeout>;
}

/** Location of the persisted bridge configuration (bot token, pairing, polling offset). */
const CONFIG_PATH = join(homedir(), ".pi", "agent", "telegram.json");
/** Directory that receives downloaded Telegram attachments. */
const TEMP_DIR = join(homedir(), ".pi", "agent", "tmp", "telegram");
/** Marker prepended to every prompt that originates from Telegram. */
const TELEGRAM_PREFIX = "[telegram]";
/** Upper bound for the text of a single outgoing Telegram message. */
const MAX_MESSAGE_LENGTH = 4096;
/** Maximum number of files the agent may queue for one Telegram reply. */
const MAX_ATTACHMENTS_PER_TURN = 10;
/** Minimum delay between two streamed preview updates. */
const PREVIEW_THROTTLE_MS = 750;
/** Draft ids wrap around after this value (largest signed 32-bit integer). */
const TELEGRAM_DRAFT_ID_MAX = 2_147_483_647;
/** How long to wait for further album parts before dispatching a media group. */
const TELEGRAM_MEDIA_GROUP_DEBOUNCE_MS = 1200;

const SYSTEM_PROMPT_SUFFIX = `

Telegram bridge extension is active.
- Messages forwarded from Telegram are prefixed with "[telegram]".
- [telegram] messages may include local temp file paths for Telegram attachments. Read those files as needed.
- If a [telegram] user asked for a file or generated artifact, use the telegram_attach tool with the local file path so the extension can send it with your next final reply.
- Do not assume mentioning a local file path in plain text will send it to Telegram. Use telegram_attach.`;

/** File extensions for the MIME types the bridge knows how to name. */
const EXTENSION_BY_MIME: ReadonlyMap<string, string> = new Map([
	["image/jpeg", ".jpg"],
	["image/png", ".png"],
	["image/webp", ".webp"],
	["image/gif", ".gif"],
	["audio/ogg", ".ogg"],
	["audio/mpeg", ".mp3"],
	["audio/wav", ".wav"],
	["video/mp4", ".mp4"],
	["application/pdf", ".pdf"],
]);

/** Returns true when `prompt` (ignoring leading whitespace) starts with the Telegram marker. */
function isTelegramPrompt(prompt: string): boolean {
	return prompt.trimStart().startsWith(TELEGRAM_PREFIX);
}

/** Replaces every run of characters outside `[a-zA-Z0-9._-]` with a single underscore. */
function sanitizeFileName(name: string): string {
	return name.replace(/[^a-zA-Z0-9._-]+/g, "_");
}

/**
 * Maps a MIME type to a file extension (including the leading dot).
 *
 * Parameters such as `; codecs=opus` and surrounding whitespace are ignored, and the
 * comparison is case-insensitive.
 *
 * @param mimeType MIME type reported for the file, if any.
 * @param fallback Value returned when the type is missing or unknown.
 */
function guessExtensionFromMime(mimeType: string | undefined, fallback: string): string {
	if (!mimeType) return fallback;
	const essence = (mimeType.split(";")[0] ?? "").trim().toLowerCase();
	return EXTENSION_BY_MIME.get(essence) ?? fallback;
}

/** Returns the image MIME type implied by the extension of `path`, or `undefined` for non-images. */
function guessMediaType(path: string): string | undefined {
	const ext = extname(path).toLowerCase();
	if (ext === ".jpg" || ext === ".jpeg") return "image/jpeg";
	if (ext === ".png") return "image/png";
	if (ext === ".webp") return "image/webp";
	if (ext === ".gif") return "image/gif";
	return undefined;
}

/** Returns true when `mimeType` is an `image/*` type (case-insensitive, surrounding whitespace ignored). */
function isImageMimeType(mimeType: string | undefined): boolean {
	return mimeType?.trim().toLowerCase().startsWith("image/") ?? false;
}

/**
 * Splits `text` into pieces that each fit into one Telegram message.
 *
 * Text that already fits is returned unchanged as a single element. Otherwise the text is
 * split on blank lines, paragraphs are packed greedily, over-long paragraphs are split on
 * line breaks, and over-long lines are cut at the length limit. A cut never lands between
 * the two halves of a UTF-16 surrogate pair, so emoji and other astral characters stay intact.
 *
 * @param text Full reply text.
 * @returns Chunks of at most `MAX_MESSAGE_LENGTH` UTF-16 code units; whitespace-only chunks are dropped.
 */
function chunkParagraphs(text: string): string[] {
	if (text.length <= MAX_MESSAGE_LENGTH) return [text];

	const isHighSurrogate = (code: number): boolean => code >= 0xd800 && code <= 0xdbff;
	const isLowSurrogate = (code: number): boolean => code >= 0xdc00 && code <= 0xdfff;

	// Cuts a single over-long line into limit-sized pieces without breaking surrogate pairs.
	const hardSplit = (line: string): string[] => {
		const pieces: string[] = [];
		let start = 0;
		while (start < line.length) {
			let end = Math.min(start + MAX_MESSAGE_LENGTH, line.length);
			const cutsPair =
				end < line.length &&
				end - start > 1 &&
				isHighSurrogate(line.charCodeAt(end - 1)) &&
				isLowSurrogate(line.charCodeAt(end));
			// Move the cut one unit back so the pair travels together in the next piece.
			if (cutsPair) end -= 1;
			pieces.push(line.slice(start, end));
			start = end;
		}
		return pieces;
	};

	// Splits one paragraph on line breaks, falling back to hardSplit for lines that are too long.
	const splitLongBlock = (block: string): string[] => {
		if (block.length <= MAX_MESSAGE_LENGTH) return [block];
		const lineChunks: string[] = [];
		let lineCurrent = "";
		for (const line of block.split("\n")) {
			const candidate = lineCurrent.length === 0 ? line : `${lineCurrent}\n${line}`;
			if (candidate.length <= MAX_MESSAGE_LENGTH) {
				lineCurrent = candidate;
				continue;
			}
			if (lineCurrent.length > 0) {
				lineChunks.push(lineCurrent);
				lineCurrent = "";
			}
			if (line.length <= MAX_MESSAGE_LENGTH) {
				lineCurrent = line;
				continue;
			}
			lineChunks.push(...hardSplit(line));
		}
		if (lineCurrent.length > 0) lineChunks.push(lineCurrent);
		return lineChunks;
	};

	const chunks: string[] = [];
	let current = "";
	const flushCurrent = (): void => {
		if (current.trim().length > 0) chunks.push(current);
		current = "";
	};

	const paragraphs = text.replace(/\r\n/g, "\n").split(/\n\n+/);
	for (const paragraph of paragraphs) {
		if (paragraph.length === 0) continue;
		for (const part of splitLongBlock(paragraph)) {
			const candidate = current.length === 0 ? part : `${current}\n\n${part}`;
			if (candidate.length <= MAX_MESSAGE_LENGTH) {
				current = candidate;
			} else {
				flushCurrent();
				current = part;
			}
		}
	}
	flushCurrent();
	return chunks;
}

/**
 * Loads the bridge configuration from `CONFIG_PATH`.
 *
 * Best effort: a missing, unreadable or malformed file yields an empty configuration.
 * JSON that is valid but not an object (`null`, an array, a string, ...) is also treated
 * as empty, so callers can always read properties from the result.
 */
async function readConfig(): Promise<TelegramConfig> {
	try {
		const content = await readFile(CONFIG_PATH, "utf8");
		const parsed: unknown = JSON.parse(content);
		if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) return {};
		return parsed as TelegramConfig;
	} catch {
		return {};
	}
}

/**
 * Persists the bridge configuration to `CONFIG_PATH`, creating the directory when needed.
 *
 * The file contains the bot token, so a newly created file is restricted to the owner
 * (`0o600`). The mode only applies when the file is created; an existing file keeps its
 * permissions.
 */
async function writeConfig(config: TelegramConfig): Promise<void> {
	await mkdir(join(homedir(), ".pi", "agent"), { recursive: true });
	await writeFile(CONFIG_PATH, JSON.stringify(config, null, "\t") + "\n", { encoding: "utf8", mode: 0o600 });
}
/**
 * Calls a Telegram Bot API method with a JSON body and returns the unwrapped `result`.
 *
 * @param method Bot API method name, e.g. `sendMessage`.
 * @param body Method parameters; serialised with `JSON.stringify`, so `undefined` values are omitted.
 * @param options Optional abort signal forwarded to `fetch`.
 * @throws Error when no bot token is configured, when the response body is not JSON
 *   (for example an HTML error page), or when Telegram reports `ok: false` or returns no
 *   `result`. Cancellation errors raised while the signal is aborted are rethrown unchanged.
 */
async function callTelegram<TResponse>(
	method: string,
	body: Record<string, unknown>,
	options?: { signal?: AbortSignal },
): Promise<TResponse> {
	if (!config.botToken) throw new Error("Telegram bot token is not configured");
	const response = await fetch(`https://api.telegram.org/bot${config.botToken}/${method}`, {
		method: "POST",
		headers: { "content-type": "application/json" },
		body: JSON.stringify(body),
		signal: options?.signal,
	});

	let data: TelegramApiResponse<TResponse>;
	try {
		data = (await response.json()) as TelegramApiResponse<TResponse>;
	} catch (error) {
		// Keep abort errors intact so callers that check for cancellation still recognise them.
		if (options?.signal?.aborted) throw error;
		throw new Error(`Telegram API ${method} failed: HTTP ${response.status} with a non-JSON body`);
	}
	if (!data.ok || data.result === undefined) {
		throw new Error(data.description || `Telegram API ${method} failed`);
	}
	return data.result;
}
FormData(); + for (const [key, value] of Object.entries(fields)) { + form.set(key, value); + } + const buffer = await readFile(filePath); + form.set(fileField, new Blob([buffer]), fileName); + const response = await fetch(`https://api.telegram.org/bot${config.botToken}/${method}`, { + method: "POST", + body: form, + signal: options?.signal, + }); + const data = (await response.json()) as TelegramApiResponse; + if (!data.ok || data.result === undefined) { + throw new Error(data.description || `Telegram API ${method} failed`); + } + return data.result; + } + + async function downloadTelegramFile(fileId: string, suggestedName: string): Promise { + if (!config.botToken) throw new Error("Telegram bot token is not configured"); + const file = await callTelegram("getFile", { file_id: fileId }); + await mkdir(TEMP_DIR, { recursive: true }); + const targetPath = join(TEMP_DIR, `${Date.now()}-${sanitizeFileName(suggestedName)}`); + const response = await fetch(`https://api.telegram.org/file/bot${config.botToken}/${file.file_path}`); + if (!response.ok) throw new Error(`Failed to download Telegram file: ${response.status}`); + const arrayBuffer = await response.arrayBuffer(); + await writeFile(targetPath, Buffer.from(arrayBuffer)); + return targetPath; + } + + function startTypingLoop(ctx: ExtensionContext, chatId?: number): void { + const targetChatId = chatId ?? activeTelegramTurn?.chatId; + if (typingInterval || targetChatId === undefined) return; + + const sendTyping = async (): Promise => { + try { + await callTelegram("sendChatAction", { chat_id: targetChatId, action: "typing" }); + } catch (error) { + const message = error instanceof Error ? 
error.message : String(error); + updateStatus(ctx, `typing failed: ${message}`); + } + }; + + void sendTyping(); + typingInterval = setInterval(() => { + void sendTyping(); + }, 4000); + } + + function stopTypingLoop(): void { + if (!typingInterval) return; + clearInterval(typingInterval); + typingInterval = undefined; + } + + function isAssistantMessage(message: AgentMessage): boolean { + return (message as unknown as { role?: string }).role === "assistant"; + } + + function getMessageText(message: AgentMessage): string { + const value = message as unknown as Record; + const content = Array.isArray(value.content) ? value.content : []; + return content + .filter((block): block is { type: string; text?: string } => typeof block === "object" && block !== null && "type" in block) + .filter((block) => block.type === "text" && typeof block.text === "string") + .map((block) => block.text as string) + .join("") + .trim(); + } + + async function clearPreview(chatId: number): Promise { + const state = previewState; + if (!state) return; + if (state.flushTimer) { + clearTimeout(state.flushTimer); + state.flushTimer = undefined; + } + previewState = undefined; + if (state.mode === "draft" && state.draftId !== undefined) { + try { + await callTelegram("sendMessageDraft", { chat_id: chatId, draft_id: state.draftId, text: "" }); + } catch { + // ignore + } + } + } + + async function flushPreview(chatId: number): Promise { + const state = previewState; + if (!state) return; + state.flushTimer = undefined; + const text = state.pendingText.trim(); + if (!text || text === state.lastSentText) return; + const truncated = text.length > MAX_MESSAGE_LENGTH ? text.slice(0, MAX_MESSAGE_LENGTH) : text; + + if (draftSupport !== "unsupported") { + const draftId = state.draftId ?? 
/**
 * Arms the throttle timer that pushes the pending preview text to Telegram.
 *
 * Does nothing when there is no preview state or a flush is already scheduled, so at most
 * one flush runs per `PREVIEW_THROTTLE_MS` window.
 *
 * @param chatId Chat that receives the preview.
 */
function schedulePreviewFlush(chatId: number): void {
	if (!previewState || previewState.flushTimer) return;
	previewState.flushTimer = setTimeout(() => {
		// flushPreview talks to the network and may reject. Nothing awaits a promise started
		// from a timer callback, so the rejection has to be consumed here; otherwise it
		// becomes an unhandled rejection. Previews are best effort: later updates retry.
		flushPreview(chatId).catch(() => undefined);
	}, PREVIEW_THROTTLE_MS);
}
/**
 * Lists every downloadable file carried by the given Telegram messages.
 *
 * Per message the order is: photo, document, video, audio, voice, animation, sticker.
 * Files without a name from Telegram get a generated one based on the message id.
 *
 * @param messages Messages of one turn (a single message or all parts of an album).
 * @returns One descriptor per file; `isImage` marks files that are forwarded as image input.
 */
function collectTelegramFileInfos(messages: TelegramMessage[]): TelegramFileInfo[] {
	const files: TelegramFileInfo[] = [];
	for (const message of messages) {
		const id = message.message_id;

		if (Array.isArray(message.photo) && message.photo.length > 0) {
			// `photo` holds several sizes; keep the one with the largest reported file size.
			// On ties the later entry wins, matching a stable ascending sort followed by pop().
			const largest = message.photo.reduce((best, size) =>
				(size.file_size ?? 0) >= (best.file_size ?? 0) ? size : best,
			);
			files.push({
				file_id: largest.file_id,
				fileName: `photo-${id}.jpg`,
				mimeType: "image/jpeg",
				isImage: true,
			});
		}

		// Telegram also fills `document` whenever `animation` is set (backward compatibility).
		// Skip it in that case so the same file is not downloaded and listed twice.
		if (message.document && !message.animation) {
			const doc = message.document;
			files.push({
				file_id: doc.file_id,
				fileName: doc.file_name || `document-${id}${guessExtensionFromMime(doc.mime_type, "")}`,
				mimeType: doc.mime_type,
				isImage: isImageMimeType(doc.mime_type),
			});
		}

		if (message.video) {
			const video = message.video;
			files.push({
				file_id: video.file_id,
				fileName: video.file_name || `video-${id}${guessExtensionFromMime(video.mime_type, ".mp4")}`,
				mimeType: video.mime_type,
				isImage: false,
			});
		}

		if (message.audio) {
			const audio = message.audio;
			files.push({
				file_id: audio.file_id,
				fileName: audio.file_name || `audio-${id}${guessExtensionFromMime(audio.mime_type, ".mp3")}`,
				mimeType: audio.mime_type,
				isImage: false,
			});
		}

		if (message.voice) {
			const voice = message.voice;
			files.push({
				file_id: voice.file_id,
				fileName: `voice-${id}${guessExtensionFromMime(voice.mime_type, ".ogg")}`,
				mimeType: voice.mime_type,
				isImage: false,
			});
		}

		if (message.animation) {
			const animation = message.animation;
			files.push({
				file_id: animation.file_id,
				fileName: animation.file_name || `animation-${id}${guessExtensionFromMime(animation.mime_type, ".mp4")}`,
				mimeType: animation.mime_type,
				isImage: false,
			});
		}

		if (message.sticker) {
			files.push({
				file_id: message.sticker.file_id,
				fileName: `sticker-${id}.webp`,
				mimeType: "image/webp",
				isImage: true,
			});
		}
	}
	return files;
}
collectTelegramFileInfos(messages)) { + const path = await downloadTelegramFile(file.file_id, file.fileName); + downloaded.push({ path, fileName: file.fileName, isImage: file.isImage, mimeType: file.mimeType }); + } + return downloaded; + } + + async function promptForConfig(ctx: ExtensionContext): Promise { + if (!ctx.hasUI || setupInProgress) return; + setupInProgress = true; + try { + const token = await ctx.ui.input("Telegram bot token", "123456:ABCDEF..."); + if (!token) return; + + const nextConfig: TelegramConfig = { ...config, botToken: token.trim() }; + const response = await fetch(`https://api.telegram.org/bot${nextConfig.botToken}/getMe`); + const data = (await response.json()) as TelegramApiResponse; + if (!data.ok || !data.result) { + ctx.ui.notify(data.description || "Invalid Telegram bot token", "error"); + return; + } + + nextConfig.botId = data.result.id; + nextConfig.botUsername = data.result.username; + config = nextConfig; + await writeConfig(config); + ctx.ui.notify(`Telegram bot connected: @${config.botUsername ?? "unknown"}`, "info"); + ctx.ui.notify("Send /start to your bot in Telegram to pair this extension with your account.", "info"); + await startPolling(ctx); + updateStatus(ctx); + } finally { + setupInProgress = false; + } + } + + async function stopPolling(): Promise { + stopTypingLoop(); + pollingController?.abort(); + pollingController = undefined; + await pollingPromise?.catch(() => undefined); + pollingPromise = undefined; + } + + function formatTelegramHistoryText(rawText: string, files: DownloadedTelegramFile[]): string { + let summary = rawText.length > 0 ? 
rawText : "(no text)"; + if (files.length > 0) { + summary += `\nAttachments:`; + for (const file of files) { + summary += `\n- ${file.path}`; + } + } + return summary; + } + + async function createTelegramTurn( + messages: TelegramMessage[], + historyTurns: PendingTelegramTurn[] = [], + ): Promise { + const firstMessage = messages[0]; + if (!firstMessage) throw new Error("Missing Telegram message for turn creation"); + const rawText = messages.map((message) => (message.text || message.caption || "").trim()).filter(Boolean).join("\n\n"); + const files = await buildTelegramFiles(messages); + const content: Array = []; + let prompt = `${TELEGRAM_PREFIX}`; + + if (historyTurns.length > 0) { + prompt += `\n\nEarlier Telegram messages arrived after an aborted turn. Treat them as prior user messages, in order:`; + for (const [index, turn] of historyTurns.entries()) { + prompt += `\n\n${index + 1}. ${turn.historyText}`; + } + prompt += `\n\nCurrent Telegram message:`; + } + + if (rawText.length > 0) { + prompt += historyTurns.length > 0 ? 
`\n${rawText}` : ` ${rawText}`; + } + if (files.length > 0) { + prompt += `\n\nTelegram attachments were saved locally:`; + for (const file of files) { + prompt += `\n- ${file.path}`; + } + } + content.push({ type: "text", text: prompt }); + + for (const file of files) { + if (!file.isImage) continue; + const mediaType = file.mimeType || guessMediaType(file.path); + if (!mediaType) continue; + const buffer = await readFile(file.path); + content.push({ + type: "image", + data: buffer.toString("base64"), + mimeType: mediaType, + }); + } + + return { + chatId: firstMessage.chat.id, + replyToMessageId: firstMessage.message_id, + queuedAttachments: [], + content, + historyText: formatTelegramHistoryText(rawText, files), + }; + } + + async function dispatchAuthorizedTelegramMessages(messages: TelegramMessage[], ctx: ExtensionContext): Promise { + const firstMessage = messages[0]; + if (!firstMessage) return; + const rawText = messages.map((message) => (message.text || message.caption || "").trim()).find((text) => text.length > 0) || ""; + const lower = rawText.toLowerCase(); + + if (lower === "stop" || lower === "/stop") { + if (currentAbort) { + if (queuedTelegramTurns.length > 0) { + preserveQueuedTurnsAsHistory = true; + } + currentAbort(); + updateStatus(ctx); + await sendTextReply(firstMessage.chat.id, firstMessage.message_id, "Aborted current turn."); + } else { + await sendTextReply(firstMessage.chat.id, firstMessage.message_id, "No active turn."); + } + return; + } + + if (lower === "/status") { + await sendTextReply(firstMessage.chat.id, firstMessage.message_id, ctx.isIdle() ? "pi is idle." : "pi is busy."); + return; + } + + if (lower === "/help" || lower === "/start") { + await sendTextReply( + firstMessage.chat.id, + firstMessage.message_id, + `Send me a message and I will forward it to pi. 
/**
 * Routes a message from the paired user.
 *
 * Messages without a `media_group_id` are dispatched immediately. Album parts are collected
 * per chat and media group; every new part restarts a debounce timer, and the whole album
 * is dispatched as one turn once no further part arrives within
 * `TELEGRAM_MEDIA_GROUP_DEBOUNCE_MS`.
 *
 * @param message Incoming message, already checked against the allowed user.
 * @param ctx Extension context used for dispatching and status updates.
 */
async function handleAuthorizedTelegramMessage(message: TelegramMessage, ctx: ExtensionContext): Promise<void> {
	if (!message.media_group_id) {
		await dispatchAuthorizedTelegramMessages([message], ctx);
		return;
	}

	const key = `${message.chat.id}:${message.media_group_id}`;
	const group: TelegramMediaGroupState = mediaGroups.get(key) ?? { messages: [] };
	group.messages.push(message);
	if (group.flushTimer) clearTimeout(group.flushTimer);
	group.flushTimer = setTimeout(() => {
		const state = mediaGroups.get(key);
		mediaGroups.delete(key);
		if (!state) return;
		// The dispatch runs detached from any caller, so its failure must be handled here;
		// surface it in the status line instead of leaving an unhandled rejection.
		dispatchAuthorizedTelegramMessages(state.messages, ctx).catch((error: unknown) => {
			const reason = error instanceof Error ? error.message : String(error);
			updateStatus(ctx, reason);
		});
	}, TELEGRAM_MEDIA_GROUP_DEBOUNCE_MS);
	mediaGroups.set(key, group);
}
handleAuthorizedTelegramMessage(message, ctx); + } + + async function pollLoop(ctx: ExtensionContext, signal: AbortSignal): Promise { + if (!config.botToken) return; + + try { + await callTelegram("deleteWebhook", { drop_pending_updates: false }, { signal }); + } catch { + // ignore + } + + if (config.lastUpdateId === undefined) { + try { + const updates = await callTelegram("getUpdates", { offset: -1, limit: 1, timeout: 0 }, { signal }); + const last = updates.at(-1); + if (last) { + config.lastUpdateId = last.update_id; + await writeConfig(config); + } + } catch { + // ignore + } + } + + while (!signal.aborted) { + try { + const updates = await callTelegram( + "getUpdates", + { + offset: config.lastUpdateId !== undefined ? config.lastUpdateId + 1 : undefined, + limit: 10, + timeout: 30, + allowed_updates: ["message", "edited_message"], + }, + { signal }, + ); + for (const update of updates) { + config.lastUpdateId = update.update_id; + await writeConfig(config); + await handleUpdate(update, ctx); + } + } catch (error) { + if (signal.aborted) return; + if (error instanceof DOMException && error.name === "AbortError") return; + const message = error instanceof Error ? 
// Tool the agent calls to send local files back to Telegram with its next reply.
pi.registerTool({
	name: "telegram_attach",
	label: "Telegram Attach",
	description: "Queue one or more local files to be sent with the next Telegram reply.",
	promptSnippet: "Queue local files to be sent with the next Telegram reply.",
	promptGuidelines: [
		"When handling a [telegram] message and the user asked for a file or generated artifact, call telegram_attach with the local path instead of only mentioning the path in text.",
	],
	parameters: Type.Object({
		paths: Type.Array(Type.String({ description: "Local file path to attach" }), { minItems: 1, maxItems: MAX_ATTACHMENTS_PER_TURN }),
	}),
	/**
	 * Validates every path and then queues all of them on the active Telegram turn.
	 *
	 * The call is all-or-nothing: when any path is missing, is not a regular file, or the
	 * per-turn limit would be exceeded, nothing is queued, so a retry cannot produce
	 * duplicates.
	 *
	 * @throws Error when no Telegram turn is active, a path is not a file, or the limit is reached.
	 */
	async execute(_toolCallId, params) {
		const turn = activeTelegramTurn;
		if (!turn) {
			throw new Error("telegram_attach can only be used while replying to an active Telegram turn");
		}
		if (turn.queuedAttachments.length + params.paths.length > MAX_ATTACHMENTS_PER_TURN) {
			throw new Error(`Attachment limit reached (${MAX_ATTACHMENTS_PER_TURN})`);
		}

		const validated: QueuedAttachment[] = [];
		for (const inputPath of params.paths) {
			const stats = await stat(inputPath);
			if (!stats.isFile()) {
				throw new Error(`Not a file: ${inputPath}`);
			}
			validated.push({ path: inputPath, fileName: basename(inputPath) });
		}

		// The turn may have ended while the paths were being checked; queuing onto a finished
		// turn would drop the files silently while reporting success.
		if (activeTelegramTurn !== turn) {
			throw new Error("telegram_attach can only be used while replying to an active Telegram turn");
		}
		turn.queuedAttachments.push(...validated);

		const added = validated.map((attachment) => attachment.path);
		return {
			content: [{ type: "text", text: `Queued ${added.length} Telegram attachment(s).` }],
			details: { paths: added },
		};
	},
});
token", + handler: async (_args, ctx) => { + await promptForConfig(ctx); + }, + }); + + pi.registerCommand("telegram-status", { + description: "Show Telegram bridge status", + handler: async (_args, ctx) => { + const status = [ + `bot: ${config.botUsername ? `@${config.botUsername}` : "not configured"}`, + `allowed user: ${config.allowedUserId ?? "not paired"}`, + `polling: ${pollingPromise ? "running" : "stopped"}`, + `active telegram turn: ${activeTelegramTurn ? "yes" : "no"}`, + `queued telegram turns: ${queuedTelegramTurns.length}`, + ]; + ctx.ui.notify(status.join(" | "), "info"); + }, + }); + + pi.registerCommand("telegram-connect", { + description: "Start the Telegram bridge in this pi session", + handler: async (_args, ctx) => { + config = await readConfig(); + if (!config.botToken) { + await promptForConfig(ctx); + return; + } + await startPolling(ctx); + updateStatus(ctx); + }, + }); + + pi.registerCommand("telegram-disconnect", { + description: "Stop the Telegram bridge in this pi session", + handler: async (_args, ctx) => { + await stopPolling(); + updateStatus(ctx); + }, + }); + + pi.on("session_start", async (_event, ctx) => { + config = await readConfig(); + await mkdir(TEMP_DIR, { recursive: true }); + updateStatus(ctx); + }); + + pi.on("session_shutdown", async (_event, _ctx) => { + queuedTelegramTurns = []; + for (const state of mediaGroups.values()) { + if (state.flushTimer) clearTimeout(state.flushTimer); + } + mediaGroups.clear(); + if (activeTelegramTurn) { + await clearPreview(activeTelegramTurn.chatId); + } + activeTelegramTurn = undefined; + currentAbort = undefined; + preserveQueuedTurnsAsHistory = false; + await stopPolling(); + }); + + pi.on("before_agent_start", async (event) => { + const suffix = isTelegramPrompt(event.prompt) + ? 
`${SYSTEM_PROMPT_SUFFIX}\n- The current user message came from Telegram.` + : SYSTEM_PROMPT_SUFFIX; + return { + systemPrompt: event.systemPrompt + suffix, + }; + }); + + pi.on("agent_start", async (_event, ctx) => { + currentAbort = () => ctx.abort(); + if (!activeTelegramTurn && queuedTelegramTurns.length > 0) { + const nextTurn = queuedTelegramTurns.shift(); + if (nextTurn) { + activeTelegramTurn = { ...nextTurn }; + previewState = { mode: draftSupport === "unsupported" ? "message" : "draft", pendingText: "", lastSentText: "" }; + startTypingLoop(ctx); + } + } + updateStatus(ctx); + }); + + pi.on("message_start", async (event, _ctx) => { + if (!activeTelegramTurn || !isAssistantMessage(event.message)) return; + if (previewState && (previewState.pendingText.trim().length > 0 || previewState.lastSentText.trim().length > 0)) { + await finalizePreview(activeTelegramTurn.chatId); + } + previewState = { mode: draftSupport === "unsupported" ? "message" : "draft", pendingText: "", lastSentText: "" }; + }); + + pi.on("message_update", async (event, _ctx) => { + if (!activeTelegramTurn || !isAssistantMessage(event.message)) return; + if (!previewState) { + previewState = { mode: draftSupport === "unsupported" ? 
"message" : "draft", pendingText: "", lastSentText: "" }; + } + previewState.pendingText = getMessageText(event.message); + schedulePreviewFlush(activeTelegramTurn.chatId); + }); + + pi.on("agent_end", async (event, ctx) => { + const turn = activeTelegramTurn; + currentAbort = undefined; + stopTypingLoop(); + activeTelegramTurn = undefined; + updateStatus(ctx); + if (!turn) return; + + const assistant = extractAssistantText(event.messages); + if (assistant.stopReason === "aborted") { + await clearPreview(turn.chatId); + return; + } + if (assistant.stopReason === "error") { + await clearPreview(turn.chatId); + await sendTextReply(turn.chatId, turn.replyToMessageId, assistant.errorMessage || "Telegram bridge: pi failed while processing the request."); + return; + } + + const finalText = assistant.text; + if (previewState) { + previewState.pendingText = finalText ?? previewState.pendingText; + } + + if (finalText && finalText.length <= MAX_MESSAGE_LENGTH) { + const finalized = await finalizePreview(turn.chatId); + if (!finalized && turn.queuedAttachments.length > 0 && !finalText) { + await sendTextReply(turn.chatId, turn.replyToMessageId, "Attached requested file(s)."); + } + } else { + await clearPreview(turn.chatId); + if (finalText) { + await sendTextReply(turn.chatId, turn.replyToMessageId, finalText); + } else if (turn.queuedAttachments.length > 0) { + await sendTextReply(turn.chatId, turn.replyToMessageId, "Attached requested file(s)."); + } + } + + await sendQueuedAttachments(turn); + + if (queuedTelegramTurns.length > 0 && !preserveQueuedTurnsAsHistory) { + const nextTurn = queuedTelegramTurns[0]; + startTypingLoop(ctx, nextTurn.chatId); + updateStatus(ctx); + pi.sendUserMessage(nextTurn.content); + } + }); +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..f53794c --- /dev/null +++ b/package.json @@ -0,0 +1,26 @@ +{ + "name": "pi-telegram", + "version": "0.1.0", + "private": false, + "description": "Telegram DM bridge extension 
for pi", + "type": "module", + "keywords": ["pi-package", "pi", "telegram", "bot", "extension"], + "license": "MIT", + "repository": { + "type": "git", + "url": "https://github.com/badlogic/pi-telegram.git" + }, + "homepage": "https://github.com/badlogic/pi-telegram", + "bugs": { + "url": "https://github.com/badlogic/pi-telegram/issues" + }, + "pi": { + "extensions": ["./index.ts"] + }, + "peerDependencies": { + "@mariozechner/pi-ai": "*", + "@mariozechner/pi-agent-core": "*", + "@mariozechner/pi-coding-agent": "*", + "@sinclair/typebox": "*" + } +}