Files
pi-telegram/index.ts
T
wassname 66898e02b4 Make /help dynamic and re-sync the bot menu
/help now re-runs setMyCommands so late-registered pi commands (skills,
prompts, extensions that registered after /start) become discoverable
without restarting the bridge, and prints the live pi.getCommands() list
so the user can see what's actually wired up right now.

Addresses the symptom where the Telegram command menu sometimes shows
only the bridge-local commands - a registration-order race against the
initial setMyCommands at /start.
2026-04-29 22:48:08 +08:00

2372 lines
72 KiB
TypeScript

/**
* Telegram bridge extension entrypoint and orchestration layer
* Keeps the runtime wiring in one place while delegating reusable domain logic to /lib modules
*/
import { mkdir, readFile, stat } from "node:fs/promises";
import { homedir } from "node:os";
import { join } from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { Model } from "@mariozechner/pi-ai";
import type {
ExtensionAPI,
ExtensionContext,
} from "@mariozechner/pi-coding-agent";
import {
createTelegramApiClient,
readTelegramConfig,
writeTelegramConfig,
type TelegramConfig,
} from "./lib/api.ts";
import { sendQueuedTelegramAttachments } from "./lib/attachments.ts";
import {
collectTelegramFileInfos,
extractFirstTelegramMessageText,
extractTelegramMessagesText,
guessMediaType,
} from "./lib/media.ts";
import {
buildTelegramModelMenuState,
getCanonicalModelId,
handleTelegramMenuCallbackEntry,
handleTelegramModelMenuCallbackAction,
handleTelegramStatusMenuCallbackAction,
handleTelegramThinkingMenuCallbackAction,
sendTelegramModelMenuMessage,
sendTelegramStatusMessage,
updateTelegramModelMenuMessage,
updateTelegramStatusMessage,
updateTelegramThinkingMenuMessage,
type ScopedTelegramModel,
type TelegramModelMenuState,
type TelegramReplyMarkup,
type ThinkingLevel,
} from "./lib/menu.ts";
import {
buildTelegramModelSwitchContinuationText,
canRestartTelegramTurnForModelSwitch,
restartTelegramModelSwitchContinuation,
shouldTriggerPendingTelegramModelSwitchAbort,
} from "./lib/model-switch.ts";
import { runTelegramPollLoop } from "./lib/polling.ts";
import {
buildTelegramAgentEndPlan,
buildTelegramAgentStartPlan,
buildTelegramSessionShutdownState,
buildTelegramSessionStartState,
canDispatchTelegramTurnState,
clearTelegramQueuePromptPriority,
compareTelegramQueueItems,
consumeDispatchedTelegramPrompt,
executeTelegramControlItemRuntime,
executeTelegramQueueDispatchPlan,
formatQueuedTelegramItemsStatus,
getNextTelegramToolExecutionCount,
partitionTelegramQueueItemsForHistory,
planNextTelegramQueueAction,
prioritizeTelegramQueuePrompt,
removeTelegramQueueItemsByMessageIds,
shouldDispatchAfterTelegramAgentEnd,
shouldStartTelegramPolling,
type PendingTelegramControlItem,
type PendingTelegramTurn,
type TelegramQueueItem,
} from "./lib/queue.ts";
import {
buildTelegramBotCommands,
registerTelegramAttachmentTool,
registerTelegramCommands,
registerTelegramLifecycleHooks,
} from "./lib/registration.ts";
import {
MAX_MESSAGE_LENGTH,
renderBlockMessage,
renderMarkdownPreviewText,
renderTelegramMessage,
type DisplayMode,
type TelegramAssistantDisplayBlock,
type TelegramRenderMode,
} from "./lib/rendering.ts";
import {
buildTelegramReplyTransport,
clearTelegramPreview,
finalizeTelegramMarkdownPreview,
finalizeTelegramPreview,
flushTelegramPreview,
sendTelegramMarkdownReply,
sendTelegramPlainReply,
} from "./lib/replies.ts";
import {
getTelegramBotTokenInputDefault,
getTelegramBotTokenPromptSpec,
readAllowedUserIdFromEnv,
} from "./lib/setup.ts";
import { buildStatusHtml, extractTurnCost, formatTurnCostLine } from "./lib/status.ts";
import {
buildTelegramPromptTurn,
truncateTelegramQueueSummary,
} from "./lib/turns.ts";
import {
collectTelegramReactionEmojis,
executeTelegramUpdate,
} from "./lib/updates.ts";
// --- Telegram API Types ---
interface TelegramApiResponse<T> {
ok: boolean;
result?: T;
description?: string;
error_code?: number;
}
interface TelegramUser {
id: number;
is_bot: boolean;
first_name: string;
username?: string;
}
interface TelegramChat {
id: number;
type: string;
}
interface TelegramPhotoSize {
file_id: string;
file_size?: number;
}
interface TelegramDocument {
file_id: string;
file_name?: string;
mime_type?: string;
file_size?: number;
}
interface TelegramVideo {
file_id: string;
file_name?: string;
mime_type?: string;
file_size?: number;
}
interface TelegramAudio {
file_id: string;
file_name?: string;
mime_type?: string;
file_size?: number;
}
interface TelegramVoice {
file_id: string;
mime_type?: string;
file_size?: number;
}
interface TelegramAnimation {
file_id: string;
file_name?: string;
mime_type?: string;
file_size?: number;
}
interface TelegramSticker {
file_id: string;
emoji?: string;
}
interface TelegramFileInfo {
file_id: string;
fileName: string;
mimeType?: string;
isImage: boolean;
}
interface TelegramMessage {
message_id: number;
chat: TelegramChat;
from?: TelegramUser;
text?: string;
caption?: string;
media_group_id?: string;
photo?: TelegramPhotoSize[];
document?: TelegramDocument;
video?: TelegramVideo;
audio?: TelegramAudio;
voice?: TelegramVoice;
animation?: TelegramAnimation;
sticker?: TelegramSticker;
}
interface TelegramCallbackQuery {
id: string;
from: TelegramUser;
message?: TelegramMessage;
data?: string;
}
interface TelegramReactionTypeEmoji {
type: "emoji";
emoji: string;
}
interface TelegramReactionTypeCustomEmoji {
type: "custom_emoji";
custom_emoji_id: string;
}
interface TelegramReactionTypePaid {
type: "paid";
}
type TelegramReactionType =
| TelegramReactionTypeEmoji
| TelegramReactionTypeCustomEmoji
| TelegramReactionTypePaid;
interface TelegramMessageReactionUpdated {
chat: TelegramChat;
message_id: number;
user?: TelegramUser;
actor_chat?: TelegramChat;
old_reaction: TelegramReactionType[];
new_reaction: TelegramReactionType[];
date: number;
}
interface TelegramUpdate {
_: string;
update_id: number;
message?: TelegramMessage;
edited_message?: TelegramMessage;
callback_query?: TelegramCallbackQuery;
message_reaction?: TelegramMessageReactionUpdated;
deleted_business_messages?: { message_ids?: unknown };
messages?: unknown;
}
interface TelegramGetFileResult {
file_path: string;
}
interface TelegramSentMessage {
message_id: number;
}
// --- Extension State Types ---
interface DownloadedTelegramFile {
path: string;
fileName: string;
isImage: boolean;
mimeType?: string;
}
type ActiveTelegramTurn = PendingTelegramTurn;
interface TelegramPreviewState {
mode: "draft" | "message";
draftId?: number;
messageId?: number;
pendingText: string;
lastSentText: string;
flushTimer?: ReturnType<typeof setTimeout>;
}
interface TelegramMediaGroupState {
messages: TelegramMessage[];
flushTimer?: ReturnType<typeof setTimeout>;
}
const AGENT_DIR = join(homedir(), ".pi", "agent");
const CONFIG_PATH = join(AGENT_DIR, "telegram.json");
const TEMP_DIR = join(AGENT_DIR, "tmp", "telegram");
const TELEGRAM_PREFIX = "[telegram]";
const MAX_ATTACHMENTS_PER_TURN = 10;
const PREVIEW_THROTTLE_MS = 750;
const TELEGRAM_DRAFT_ID_MAX = 2_147_483_647;
const TELEGRAM_MEDIA_GROUP_DEBOUNCE_MS = 1200;
const STALE_ABORT_RECOVERY_GRACE_MS = 1500;
const SYSTEM_PROMPT_SUFFIX = `
Telegram bridge extension is active.
- Messages forwarded from Telegram are prefixed with "[telegram]".
- [telegram] messages may include local temp file paths for Telegram attachments. Read those files as needed.
- If a [telegram] user asked for a file or generated artifact, use the telegram_attach tool with the local file path so the extension can send it with your next final reply.
- Do not assume mentioning a local file path in plain text will send it to Telegram. Use telegram_attach.`;
// --- Generic Utilities ---
function isTelegramPrompt(prompt: string): boolean {
return prompt.trimStart().startsWith(TELEGRAM_PREFIX);
}
function sanitizeFileName(name: string): string {
return name.replace(/[^a-zA-Z0-9._-]+/g, "_");
}
function parseTelegramCommand(
text: string,
): { name: string; args: string } | undefined {
const trimmed = text.trim();
if (!trimmed.startsWith("/")) return undefined;
const [head, ...tail] = trimmed.split(/\s+/);
const name = head.slice(1).split("@")[0]?.toLowerCase();
if (!name) return undefined;
return { name, args: tail.join(" ").trim() };
}
function getCliScopedModelPatterns(): string[] | undefined {
const args = process.argv.slice(2);
for (let i = 0; i < args.length; i++) {
const arg = args[i];
if (arg === "--models") {
const value = args[i + 1] ?? "";
const patterns = value
.split(",")
.map((pattern) => pattern.trim())
.filter(Boolean);
return patterns.length > 0 ? patterns : undefined;
}
if (arg.startsWith("--models=")) {
const patterns = arg
.slice("--models=".length)
.split(",")
.map((pattern) => pattern.trim())
.filter(Boolean);
return patterns.length > 0 ? patterns : undefined;
}
}
return undefined;
}
function truncateTelegramButtonLabel(label: string, maxLength = 56): string {
return label.length <= maxLength
? label
: `${label.slice(0, maxLength - 1)}…`;
}
function buildShellCommandReply(options: {
shellCmd: string;
stdout: string;
stderr: string;
exitCode: number;
}): string {
const sections = [
`**Shell**\n\`\`\`sh\n${options.shellCmd}\n\`\`\``,
`Exit code: \`${options.exitCode}\``,
];
const stdout = options.stdout.trimEnd();
const stderr = options.stderr.trimEnd();
if (stdout) {
sections.push(`**stdout**\n\`\`\`text\n${stdout}\n\`\`\``);
}
if (stderr) {
sections.push(`**stderr**\n\`\`\`text\n${stderr}\n\`\`\``);
}
if (!stdout && !stderr) {
sections.push("`(no output)`");
}
return sections.join("\n\n");
}
// --- Extension Runtime ---
export const __telegramTestUtils = {
MAX_MESSAGE_LENGTH,
STALE_ABORT_RECOVERY_GRACE_MS,
renderTelegramMessage,
compareTelegramQueueItems,
removeTelegramQueueItemsByMessageIds,
clearTelegramQueuePromptPriority,
prioritizeTelegramQueuePrompt,
partitionTelegramQueueItemsForHistory,
consumeDispatchedTelegramPrompt,
planNextTelegramQueueAction,
shouldDispatchAfterTelegramAgentEnd,
buildTelegramAgentEndPlan,
canDispatchTelegramTurnState,
getTelegramBotTokenInputDefault,
getTelegramBotTokenPromptSpec,
readAllowedUserIdFromEnv,
canRestartTelegramTurnForModelSwitch,
restartTelegramModelSwitchContinuation,
shouldTriggerPendingTelegramModelSwitchAbort,
buildShellCommandReply,
buildTelegramModelSwitchContinuationText: (
model: Pick<Model<any>, "provider" | "id">,
thinkingLevel?: ThinkingLevel,
) =>
buildTelegramModelSwitchContinuationText(
TELEGRAM_PREFIX,
model,
thinkingLevel,
),
};
// Install once: a stray Telegram fetch rejection that escapes through a fire-and-forget
// `void f()` would otherwise crash the host process under Node 22 (default
// unhandledRejection = throw). Dropping the message is fine; killing the session is not.
// Scoped to unhandledRejection only - we deliberately do NOT swallow uncaughtException,
// since that could mask real bugs unrelated to this bridge.
let unhandledRejectionHandlerInstalled = false;
function installUnhandledRejectionHandler(): void {
if (unhandledRejectionHandlerInstalled) return;
unhandledRejectionHandlerInstalled = true;
process.on("unhandledRejection", (reason) => {
console.error("[pi-telegram] unhandledRejection (suppressed):", reason);
});
}
export default function (pi: ExtensionAPI) {
installUnhandledRejectionHandler();
let config: TelegramConfig = {};
let pollingController: AbortController | undefined;
let pollingPromise: Promise<void> | undefined;
let queuedTelegramItems: TelegramQueueItem[] = [];
let nextQueuedTelegramItemOrder = 0;
let nextQueuedTelegramControlOrder = 0;
let nextPriorityReactionOrder = 0;
let activeTelegramTurn: ActiveTelegramTurn | undefined;
let activeTelegramToolExecutions = 0;
let pendingTelegramModelSwitch: ScopedTelegramModel | undefined;
let telegramTurnDispatchPending = false;
let typingInterval: ReturnType<typeof setInterval> | undefined;
let currentAbort: (() => void) | undefined;
let abortRequestedAt: number | undefined;
let preserveQueuedTurnsAsHistory = false;
let compactionInProgress = false;
let setupInProgress = false;
let previewState: TelegramPreviewState | undefined;
let displayMode: DisplayMode = "compact";
let pendingNonTextBlocks: TelegramAssistantDisplayBlock[] = [];
let draftSupport: "unknown" | "supported" | "unsupported" = "unknown";
let nextDraftId = 0;
let currentTelegramModel: Model<any> | undefined;
const mediaGroups = new Map<string, TelegramMediaGroupState>();
const modelMenus = new Map<number, TelegramModelMenuState>();
// --- Runtime State ---
function allocateDraftId(): number {
nextDraftId = nextDraftId >= TELEGRAM_DRAFT_ID_MAX ? 1 : nextDraftId + 1;
return nextDraftId;
}
function canDispatchQueuedTelegramTurn(ctx: ExtensionContext): boolean {
return canDispatchTelegramTurnState({
compactionInProgress,
hasActiveTelegramTurn: !!activeTelegramTurn,
hasPendingTelegramDispatch: telegramTurnDispatchPending,
isIdle: ctx.isIdle(),
hasPendingMessages: ctx.hasPendingMessages(),
});
}
function markTelegramAbortRequested(): void {
abortRequestedAt = Date.now();
}
function clearTelegramAbortRequested(): void {
abortRequestedAt = undefined;
}
function shouldRecoverStaleTelegramAbort(ctx: ExtensionContext): boolean {
if (!activeTelegramTurn || !currentAbort || abortRequestedAt === undefined) {
return false;
}
if (!ctx.isIdle() || ctx.hasPendingMessages()) {
return false;
}
return Date.now() - abortRequestedAt >= STALE_ABORT_RECOVERY_GRACE_MS;
}
async function recoverStaleTelegramAbort(ctx: ExtensionContext): Promise<void> {
const turn = activeTelegramTurn;
if (!turn) return;
stopTypingLoop();
currentAbort = undefined;
activeTelegramTurn = undefined;
activeTelegramToolExecutions = 0;
pendingTelegramModelSwitch = undefined;
telegramTurnDispatchPending = false;
pendingNonTextBlocks = [];
clearTelegramAbortRequested();
await clearPreview(turn.chatId);
updateStatus(ctx, "recovered stale aborted Telegram turn");
dispatchNextQueuedTelegramTurn(ctx);
}
function executeQueuedTelegramControlItem(
item: PendingTelegramControlItem,
ctx: ExtensionContext,
): void {
void executeTelegramControlItemRuntime(item, {
ctx,
sendTextReply,
onSettled: () => {
updateStatus(ctx);
dispatchNextQueuedTelegramTurn(ctx);
},
});
}
function dispatchNextQueuedTelegramTurn(ctx: ExtensionContext): void {
const dispatchPlan = planNextTelegramQueueAction(
queuedTelegramItems,
canDispatchQueuedTelegramTurn(ctx),
);
if (dispatchPlan.kind !== "none") {
queuedTelegramItems = dispatchPlan.remainingItems;
}
executeTelegramQueueDispatchPlan(dispatchPlan, {
executeControlItem: (item) => {
updateStatus(ctx);
executeQueuedTelegramControlItem(item, ctx);
},
onPromptDispatchStart: (chatId) => {
telegramTurnDispatchPending = true;
startTypingLoop(ctx, chatId);
updateStatus(ctx);
},
sendUserMessage: (content) => {
pi.sendUserMessage(content);
},
onPromptDispatchFailure: (message) => {
telegramTurnDispatchPending = false;
stopTypingLoop();
updateStatus(ctx, `dispatch failed: ${message}`);
},
onIdle: () => {
updateStatus(ctx);
},
});
}
// --- Status ---
function updateStatus(ctx: ExtensionContext, error?: string): void {
const theme = ctx.ui.theme;
const label = theme.fg("accent", "telegram");
if (error) {
ctx.ui.setStatus(
"telegram",
`${label} ${theme.fg("error", "error")} ${theme.fg("muted", error)}`,
);
return;
}
if (!config.botToken) {
ctx.ui.setStatus(
"telegram",
`${label} ${theme.fg("muted", "not configured")}`,
);
return;
}
if (!pollingPromise) {
ctx.ui.setStatus(
"telegram",
`${label} ${theme.fg("muted", "disconnected")}`,
);
return;
}
if (!config.allowedUserId) {
ctx.ui.setStatus(
"telegram",
`${label} ${theme.fg("warning", "awaiting config")}`,
);
return;
}
if (compactionInProgress) {
const queued = theme.fg(
"muted",
formatQueuedTelegramItemsStatus(queuedTelegramItems),
);
ctx.ui.setStatus(
"telegram",
`${label} ${theme.fg("accent", "compacting")}${queued}`,
);
return;
}
if (
activeTelegramTurn ||
telegramTurnDispatchPending ||
queuedTelegramItems.length > 0
) {
const queued = theme.fg(
"muted",
formatQueuedTelegramItemsStatus(queuedTelegramItems),
);
ctx.ui.setStatus(
"telegram",
`${label} ${theme.fg("accent", "processing")}${queued}`,
);
return;
}
ctx.ui.setStatus(
"telegram",
`${label} ${theme.fg("success", "connected")}`,
);
}
// --- Telegram API ---
const telegramApi = createTelegramApiClient(() => config.botToken);
const callTelegramApi = <TResponse>(
method: string,
body: Record<string, unknown>,
options?: { signal?: AbortSignal },
): Promise<TResponse> => {
return telegramApi.call<TResponse>(method, body, options);
};
const callTelegramMultipartApi = <TResponse>(
method: string,
fields: Record<string, string>,
fileField: string,
filePath: string,
fileName: string,
options?: { signal?: AbortSignal },
): Promise<TResponse> => {
return telegramApi.callMultipart<TResponse>(
method,
fields,
fileField,
filePath,
fileName,
options,
);
};
const downloadTelegramBridgeFile = (
fileId: string,
suggestedName: string,
): Promise<string> => {
return telegramApi.downloadFile(fileId, suggestedName, TEMP_DIR);
};
const answerCallbackQuery = (
callbackQueryId: string,
text?: string,
): Promise<void> => {
return telegramApi.answerCallbackQuery(callbackQueryId, text);
};
// --- Message Delivery & Preview ---
function startTypingLoop(ctx: ExtensionContext, chatId?: number): void {
const targetChatId = chatId ?? activeTelegramTurn?.chatId;
if (typingInterval || targetChatId === undefined) return;
const sendTyping = async (): Promise<void> => {
try {
await callTelegramApi("sendChatAction", {
chat_id: targetChatId,
action: "typing",
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
updateStatus(ctx, `typing failed: ${message}`);
}
};
void sendTyping();
typingInterval = setInterval(() => {
void sendTyping();
}, 4000);
}
function stopTypingLoop(): void {
if (!typingInterval) return;
clearInterval(typingInterval);
typingInterval = undefined;
}
function isAssistantMessage(message: AgentMessage): boolean {
return (message as unknown as { role?: string }).role === "assistant";
}
function stringifyToolArgs(args: unknown): string | undefined {
if (args === undefined) return undefined;
if (typeof args === "string") return args.trim() || undefined;
const encoded = JSON.stringify(args, null, 2);
return encoded?.trim() || undefined;
}
function stringifyTraceFallback(value: unknown): string | undefined {
if (value === undefined) return undefined;
if (typeof value === "string") return value.trim() || undefined;
try {
return JSON.stringify(value, null, 2)?.trim() || undefined;
} catch {
return String(value).trim() || undefined;
}
}
function extractTextBlocksContent(content: unknown): string {
if (typeof content === "string") return content.trim();
if (!Array.isArray(content)) return "";
return content
.map((block) => {
if (typeof block !== "object" || block === null) return "";
const candidate = block as Record<string, unknown>;
return candidate.type === "text" && typeof candidate.text === "string"
? candidate.text
: "";
})
.join("")
.trim();
}
function normalizeAssistantDisplayBlock(
block: unknown,
): TelegramAssistantDisplayBlock | undefined {
if (typeof block !== "object" || block === null || !("type" in block)) {
return undefined;
}
const candidate = block as Record<string, unknown>;
if (candidate.type === "text" && typeof candidate.text === "string") {
return { type: "text", text: candidate.text };
}
if (candidate.type === "thinking") {
const text =
typeof candidate.text === "string"
? candidate.text
: typeof candidate.thinking === "string"
? candidate.thinking
: undefined;
if (!text) return undefined;
return { type: "thinking", text };
}
if (candidate.type === "tool_call" || candidate.type === "tool_use" || candidate.type === "toolCall") {
const name =
typeof candidate.name === "string"
? candidate.name
: typeof candidate.tool === "string"
? candidate.tool
: undefined;
if (!name) return undefined;
return {
type: "tool_call",
name,
argsText: stringifyToolArgs(
"input" in candidate
? candidate.input
: "arguments" in candidate
? candidate.arguments
: "args" in candidate
? candidate.args
: undefined,
),
};
}
const fallback = stringifyTraceFallback(candidate);
if (fallback) {
return {
type: "unknown",
label: String(candidate.type),
text: fallback,
};
}
return undefined;
}
function getToolResultFullOutputPath(details: unknown): string | undefined {
if (typeof details !== "object" || details === null) return undefined;
const path = (details as Record<string, unknown>).fullOutputPath;
return typeof path === "string" && path.trim() ? path : undefined;
}
async function readToolResultFullOutput(details: unknown): Promise<string | undefined> {
const path = getToolResultFullOutputPath(details);
if (!path) return undefined;
return readFile(path, "utf8").catch(() => undefined);
}
async function extractToolResultBlock(
message: Record<string, unknown>,
): Promise<TelegramAssistantDisplayBlock | undefined> {
if (message.role !== "toolResult") return undefined;
const toolName =
typeof message.toolName === "string" ? message.toolName : undefined;
const text = extractTextBlocksContent(message.content);
const fullOutput = await readToolResultFullOutput(message.details);
const detailsText = stringifyTraceFallback(message.details);
const fallbackText = stringifyTraceFallback({
content: message.content,
details: message.details,
});
const outputText = fullOutput?.trimEnd() || text || fallbackText;
if (!outputText && !detailsText) return undefined;
return {
type: "tool_result",
toolName,
text: outputText ?? "",
detailsText,
isError: message.isError === true,
};
}
function extractAssistantDisplayBlocks(
content: unknown,
): TelegramAssistantDisplayBlock[] {
const blocks = Array.isArray(content) ? content : [];
return blocks
.map(normalizeAssistantDisplayBlock)
.filter((block): block is TelegramAssistantDisplayBlock => !!block);
}
function extractTextContent(content: unknown): string {
return extractAssistantDisplayBlocks(content)
.filter(
(block): block is Extract<TelegramAssistantDisplayBlock, { type: "text" }> =>
block.type === "text",
)
.map((block) => block.text)
.join("")
.trim();
}
function getMessageText(message: AgentMessage): string {
return extractTextContent(
(message as unknown as Record<string, unknown>).content,
);
}
function getMessageBlocks(message: AgentMessage): TelegramAssistantDisplayBlock[] {
return extractAssistantDisplayBlocks(
(message as unknown as Record<string, unknown>).content,
);
}
async function extractAssistantTurn(messages: AgentMessage[]): Promise<{
blocks: TelegramAssistantDisplayBlock[];
text?: string;
stopReason?: string;
errorMessage?: string;
}> {
const blocks: TelegramAssistantDisplayBlock[] = [];
let text: string | undefined;
let stopReason: string | undefined;
let errorMessage: string | undefined;
for (const next of messages) {
const message = next as unknown as Record<string, unknown>;
const toolResultBlock = await extractToolResultBlock(message);
if (toolResultBlock) {
blocks.push(toolResultBlock);
continue;
}
if (message.role !== "assistant") continue;
const nextBlocks = extractAssistantDisplayBlocks(message.content);
blocks.push(...nextBlocks);
const nextText = extractTextContent(message.content);
if (nextText) {
text = nextText;
}
stopReason =
typeof message.stopReason === "string" ? message.stopReason : stopReason;
errorMessage =
typeof message.errorMessage === "string"
? message.errorMessage
: errorMessage;
}
return { blocks, text, stopReason, errorMessage };
}
async function refreshOpenStatusMenus(ctx: ExtensionContext): Promise<void> {
for (const state of modelMenus.values()) {
if (state.mode !== "status") continue;
await showStatusMessage(state, ctx);
}
}
function setDisplayMode(mode: DisplayMode, ctx: ExtensionContext): void {
displayMode = mode;
updateStatus(ctx);
void refreshOpenStatusMenus(ctx);
}
function createPreviewState(): TelegramPreviewState {
return {
mode: draftSupport === "unsupported" ? "message" : "draft",
pendingText: "",
lastSentText: "",
};
}
function isTelegramMessageNotModifiedError(error: unknown): boolean {
return (
error instanceof Error &&
error.message.includes("message is not modified")
);
}
async function editTelegramMessageText(
body: Record<string, unknown>,
): Promise<"edited" | "unchanged"> {
try {
await callTelegramApi("editMessageText", body);
return "edited";
} catch (error) {
if (isTelegramMessageNotModifiedError(error)) return "unchanged";
throw error;
}
}
const replyTransport = buildTelegramReplyTransport<TelegramReplyMarkup>({
sendMessage: async (body) => {
return callTelegramApi<TelegramSentMessage>("sendMessage", body);
},
editMessage: async (body) => {
await editTelegramMessageText(body);
},
});
function getPreviewRuntimeDeps() {
return {
getState: () => previewState,
setState: (state: TelegramPreviewState | undefined) => {
previewState = state;
},
clearScheduledFlush: (state: TelegramPreviewState) => {
if (!state.flushTimer) return;
clearTimeout(state.flushTimer);
state.flushTimer = undefined;
},
maxMessageLength: MAX_MESSAGE_LENGTH,
renderPreviewText: renderMarkdownPreviewText,
getDraftSupport: () => draftSupport,
setDraftSupport: (support: "unknown" | "supported" | "unsupported") => {
draftSupport = support;
},
allocateDraftId,
sendDraft: async (chatId: number, draftId: number, text: string) => {
await callTelegramApi("sendMessageDraft", {
chat_id: chatId,
draft_id: draftId,
text,
});
},
sendMessage: async (chatId: number, text: string) => {
return callTelegramApi<TelegramSentMessage>("sendMessage", {
chat_id: chatId,
text,
});
},
editMessageText: async (
chatId: number,
messageId: number,
text: string,
) => {
await editTelegramMessageText({
chat_id: chatId,
message_id: messageId,
text,
});
},
renderTelegramMessage,
sendRenderedChunks: replyTransport.sendRenderedChunks,
editRenderedMessage: replyTransport.editRenderedMessage,
};
}
async function clearPreview(chatId: number): Promise<void> {
await clearTelegramPreview(chatId, getPreviewRuntimeDeps());
}
async function flushPreview(chatId: number): Promise<void> {
await flushTelegramPreview(chatId, getPreviewRuntimeDeps());
}
function schedulePreviewFlush(chatId: number): void {
if (!previewState || previewState.flushTimer) return;
previewState.flushTimer = setTimeout(() => {
flushPreview(chatId).catch((err) =>
console.warn("[pi-telegram] flushPreview failed:", err),
);
}, PREVIEW_THROTTLE_MS);
}
async function finalizePreview(chatId: number): Promise<boolean> {
return finalizeTelegramPreview(chatId, getPreviewRuntimeDeps());
}
async function finalizeMarkdownPreview(
chatId: number,
markdown: string,
): Promise<boolean> {
return finalizeTelegramMarkdownPreview(
chatId,
markdown,
getPreviewRuntimeDeps(),
);
}
async function sendTextReply(
chatId: number,
_replyToMessageId: number,
text: string,
options?: { parseMode?: "HTML" },
): Promise<number | undefined> {
return sendTelegramPlainReply(
text,
{
renderTelegramMessage,
sendRenderedChunks: async (chunks) =>
replyTransport.sendRenderedChunks(chatId, chunks),
},
options,
);
}
async function sendMarkdownReply(
chatId: number,
replyToMessageId: number,
markdown: string,
): Promise<number | undefined> {
return sendTelegramMarkdownReply(markdown, {
renderTelegramMessage,
sendRenderedChunks: async (chunks) => {
if (chunks.length === 0) {
return sendTextReply(chatId, replyToMessageId, markdown);
}
return replyTransport.sendRenderedChunks(chatId, chunks);
},
});
}
async function sendQueuedAttachments(
turn: ActiveTelegramTurn,
): Promise<void> {
await sendQueuedTelegramAttachments(turn, {
sendMultipart: async (method, fields, fileField, filePath, fileName) => {
await callTelegramMultipartApi<TelegramSentMessage>(
method,
fields,
fileField,
filePath,
fileName,
);
},
sendTextReply,
});
}
async function extractAssistantSummary(messages: AgentMessage[]): Promise<{
blocks: TelegramAssistantDisplayBlock[];
text?: string;
stopReason?: string;
errorMessage?: string;
}> {
return extractAssistantTurn(messages);
}
// --- Bridge Setup ---
async function promptForConfig(ctx: ExtensionContext): Promise<void> {
if (!ctx.hasUI || setupInProgress) return;
setupInProgress = true;
try {
const tokenPrompt = getTelegramBotTokenPromptSpec(
process.env,
config.botToken,
);
// Use the editor when a real default exists because ctx.ui.input only
// exposes placeholder text, not an editable prefilled value.
const token =
tokenPrompt.method === "editor"
? await ctx.ui.editor("Telegram bot token", tokenPrompt.value)
: await ctx.ui.input("Telegram bot token", tokenPrompt.value);
if (!token) return;
const nextConfig: TelegramConfig = { ...config, botToken: token.trim() };
const response = await fetch(
`https://api.telegram.org/bot${nextConfig.botToken}/getMe`,
);
const data = (await response.json()) as TelegramApiResponse<TelegramUser>;
if (!data.ok || !data.result) {
ctx.ui.notify(
data.description || "Invalid Telegram bot token",
"error",
);
return;
}
nextConfig.botId = data.result.id;
nextConfig.botUsername = data.result.username;
config = nextConfig;
await writeTelegramConfig(AGENT_DIR, CONFIG_PATH, config);
ctx.ui.notify(
`Telegram bot connected: @${config.botUsername ?? "unknown"}`,
"info",
);
if (!config.allowedUserId) {
ctx.ui.notify(
"Enter your numeric Telegram user ID. To find it: DM @userinfobot on Telegram — it replies with your ID. This is NOT your @username or phone number.",
"info",
);
const rawUserId = await ctx.ui.input(
"Allowed Telegram user ID (numeric, e.g. 123456789)",
"",
);
if (!rawUserId) return;
const parsedUserId = Number(rawUserId.trim());
if (!Number.isInteger(parsedUserId) || parsedUserId <= 0) {
ctx.ui.notify(
`"${rawUserId}" is not a valid Telegram user ID. Must be a positive integer.`,
"error",
);
return;
}
config.allowedUserId = parsedUserId;
await writeTelegramConfig(AGENT_DIR, CONFIG_PATH, config);
ctx.ui.notify(
`Allowed user ID set to ${config.allowedUserId}. Send a message from that account to confirm.`,
"info",
);
}
await startPolling(ctx);
updateStatus(ctx);
} finally {
setupInProgress = false;
}
}
async function registerTelegramBotCommands(): Promise<void> {
const commands = buildTelegramBotCommands(pi.getCommands());
await callTelegramApi<boolean>("setMyCommands", { commands });
}
function getCurrentTelegramModel(
ctx: ExtensionContext,
): Model<any> | undefined {
return currentTelegramModel ?? ctx.model;
}
// --- Interactive Menu State & Builders ---
async function getModelMenuState(
chatId: number,
args: string | undefined,
ctx: ExtensionContext,
): Promise<TelegramModelMenuState> {
const { SettingsManager } = await import("@mariozechner/pi-coding-agent");
const settingsManager = SettingsManager.create(ctx.cwd);
await settingsManager.reload();
ctx.modelRegistry.refresh();
const activeModel = getCurrentTelegramModel(ctx);
const availableModels = ctx.modelRegistry.getAvailable();
const cliScopedModels = getCliScopedModelPatterns();
const configuredScopedModels =
cliScopedModels ?? settingsManager.getEnabledModels() ?? [];
return buildTelegramModelMenuState({
chatId,
activeModel,
availableModels,
configuredScopedModelPatterns: configuredScopedModels,
cliScopedModelPatterns: cliScopedModels ?? undefined,
filterQuery: args,
});
}
// --- Interactive Menu Actions ---
async function updateModelMenuMessage(
state: TelegramModelMenuState,
ctx: ExtensionContext,
): Promise<void> {
await updateTelegramModelMenuMessage(state, getCurrentTelegramModel(ctx), {
editInteractiveMessage,
sendInteractiveMessage,
});
}
async function updateThinkingMenuMessage(
state: TelegramModelMenuState,
ctx: ExtensionContext,
): Promise<void> {
await updateTelegramThinkingMenuMessage(
state,
getCurrentTelegramModel(ctx),
pi.getThinkingLevel(),
{ editInteractiveMessage, sendInteractiveMessage },
);
}
async function editInteractiveMessage(
chatId: number,
messageId: number,
text: string,
mode: TelegramRenderMode,
replyMarkup: TelegramReplyMarkup,
): Promise<void> {
await replyTransport.editRenderedMessage(
chatId,
messageId,
renderTelegramMessage(text, { mode }),
{ replyMarkup },
);
}
async function sendInteractiveMessage(
chatId: number,
text: string,
mode: TelegramRenderMode,
replyMarkup: TelegramReplyMarkup,
): Promise<number | undefined> {
return replyTransport.sendRenderedChunks(
chatId,
renderTelegramMessage(text, { mode }),
{ replyMarkup },
);
}
async function ensureIdleOrNotify(
ctx: ExtensionContext,
chatId: number,
replyToMessageId: number,
busyMessage: string,
): Promise<boolean> {
if (ctx.isIdle()) return true;
await sendTextReply(chatId, replyToMessageId, busyMessage);
return false;
}
async function showStatusMessage(
state: TelegramModelMenuState,
ctx: ExtensionContext,
): Promise<void> {
await updateTelegramStatusMessage(
state,
buildStatusHtml(ctx, getCurrentTelegramModel(ctx), displayMode !== "text"),
getCurrentTelegramModel(ctx),
pi.getThinkingLevel(),
displayMode !== "text",
{ editInteractiveMessage, sendInteractiveMessage },
);
}
async function sendStatusMessage(
chatId: number,
replyToMessageId: number,
ctx: ExtensionContext,
): Promise<void> {
const isIdle = await ensureIdleOrNotify(
ctx,
chatId,
replyToMessageId,
"Cannot open status while pi is busy. Send /stop first.",
);
if (!isIdle) return;
const state = await getModelMenuState(chatId, undefined, ctx);
const messageId = await sendTelegramStatusMessage(
state,
buildStatusHtml(ctx, getCurrentTelegramModel(ctx), displayMode !== "text"),
getCurrentTelegramModel(ctx),
pi.getThinkingLevel(),
displayMode !== "text",
{ editInteractiveMessage, sendInteractiveMessage },
);
if (messageId === undefined) return;
state.messageId = messageId;
state.mode = "status";
modelMenus.set(messageId, state);
}
function canOfferInFlightTelegramModelSwitch(ctx: ExtensionContext): boolean {
return canRestartTelegramTurnForModelSwitch({
isIdle: ctx.isIdle(),
hasActiveTelegramTurn: !!activeTelegramTurn,
hasAbortHandler: !!currentAbort,
});
}
function createTelegramControlItem(
chatId: number,
replyToMessageId: number,
controlType: PendingTelegramControlItem["controlType"],
statusSummary: string,
execute: PendingTelegramControlItem["execute"],
): PendingTelegramControlItem {
const queueOrder = nextQueuedTelegramItemOrder++;
return {
kind: "control",
controlType,
chatId,
replyToMessageId,
queueOrder,
queueLane: "control",
laneOrder: nextQueuedTelegramControlOrder++,
statusSummary,
execute,
};
}
function enqueueTelegramControlItem(
item: PendingTelegramControlItem,
ctx: ExtensionContext,
): void {
queuedTelegramItems.push(item);
reorderQueuedTelegramTurns(ctx);
dispatchNextQueuedTelegramTurn(ctx);
}
function createTelegramModelSwitchContinuationTurn(
turn: ActiveTelegramTurn,
selection: ScopedTelegramModel,
): PendingTelegramTurn {
const statusLabel = truncateTelegramQueueSummary(
`continue on ${selection.model.id}`,
4,
32,
);
return {
kind: "prompt",
chatId: turn.chatId,
replyToMessageId: turn.replyToMessageId,
sourceMessageIds: [],
queueOrder: nextQueuedTelegramItemOrder++,
queueLane: "control",
laneOrder: nextQueuedTelegramControlOrder++,
queuedAttachments: [],
content: [
{
type: "text",
text: buildTelegramModelSwitchContinuationText(
TELEGRAM_PREFIX,
selection.model,
selection.thinkingLevel,
),
},
],
historyText: `Continue interrupted Telegram request on ${getCanonicalModelId(selection.model)}`,
statusSummary: `↻ ${statusLabel || "continue"}`,
};
}
function queueTelegramModelSwitchContinuation(
turn: ActiveTelegramTurn,
selection: ScopedTelegramModel,
ctx: ExtensionContext,
): void {
queuedTelegramItems.push(
createTelegramModelSwitchContinuationTurn(turn, selection),
);
reorderQueuedTelegramTurns(ctx);
}
function triggerPendingTelegramModelSwitchAbort(
ctx: ExtensionContext,
): boolean {
if (
!shouldTriggerPendingTelegramModelSwitchAbort({
hasPendingModelSwitch: !!pendingTelegramModelSwitch,
hasActiveTelegramTurn: !!activeTelegramTurn,
hasAbortHandler: !!currentAbort,
activeToolExecutions: activeTelegramToolExecutions,
})
) {
return false;
}
const selection = pendingTelegramModelSwitch;
const turn = activeTelegramTurn;
const abort = currentAbort;
if (!selection || !turn || !abort) return false;
pendingTelegramModelSwitch = undefined;
queueTelegramModelSwitchContinuation(turn, selection, ctx);
markTelegramAbortRequested();
abort();
return true;
}
async function openModelMenu(
chatId: number,
replyToMessageId: number,
args: string | undefined,
ctx: ExtensionContext,
): Promise<void> {
if (!ctx.isIdle() && !canOfferInFlightTelegramModelSwitch(ctx)) {
await sendTextReply(
chatId,
replyToMessageId,
"Cannot switch model while pi is busy. Send /stop first.",
);
return;
}
const state = await getModelMenuState(chatId, args, ctx);
if (state.allModels.length === 0) {
await sendTextReply(
chatId,
replyToMessageId,
"No available models with configured auth.",
);
return;
}
const activeModel = getCurrentTelegramModel(ctx);
const messageId = await sendTelegramModelMenuMessage(state, activeModel, {
editInteractiveMessage,
sendInteractiveMessage,
});
if (messageId === undefined) return;
state.messageId = messageId;
state.mode = "model";
modelMenus.set(messageId, state);
}
async function handleStatusCallbackAction(
query: TelegramCallbackQuery,
state: TelegramModelMenuState,
ctx: ExtensionContext,
): Promise<boolean> {
return handleTelegramStatusMenuCallbackAction(
query.id,
query.data,
getCurrentTelegramModel(ctx),
{
updateModelMenuMessage: async () => updateModelMenuMessage(state, ctx),
updateThinkingMenuMessage: async () =>
updateThinkingMenuMessage(state, ctx),
updateStatusMessage: async () => showStatusMessage(state, ctx),
setTraceVisible: (v) => setDisplayMode(v ? "compact" : "text", ctx),
getTraceVisible: () => displayMode !== "text",
answerCallbackQuery,
},
);
}
async function handleThinkingCallbackAction(
query: TelegramCallbackQuery,
state: TelegramModelMenuState,
ctx: ExtensionContext,
): Promise<boolean> {
return handleTelegramThinkingMenuCallbackAction(
query.id,
query.data,
getCurrentTelegramModel(ctx),
{
setThinkingLevel: (level) => {
pi.setThinkingLevel(level);
updateStatus(ctx);
},
getCurrentThinkingLevel: () => pi.getThinkingLevel(),
updateStatusMessage: async () => showStatusMessage(state, ctx),
answerCallbackQuery,
},
);
}
async function handleModelCallbackAction(
query: TelegramCallbackQuery,
state: TelegramModelMenuState,
ctx: ExtensionContext,
): Promise<boolean> {
try {
return await handleTelegramModelMenuCallbackAction(
query.id,
{
data: query.data,
state,
activeModel: getCurrentTelegramModel(ctx),
currentThinkingLevel: pi.getThinkingLevel(),
isIdle: ctx.isIdle(),
canRestartBusyRun: !!activeTelegramTurn && !!currentAbort,
hasActiveToolExecutions: activeTelegramToolExecutions > 0,
},
{
updateModelMenuMessage: async () =>
updateModelMenuMessage(state, ctx),
updateStatusMessage: async () => showStatusMessage(state, ctx),
answerCallbackQuery,
setModel: (model) => pi.setModel(model),
setCurrentModel: (model) => {
currentTelegramModel = model;
updateStatus(ctx);
},
setThinkingLevel: (level) => {
pi.setThinkingLevel(level);
updateStatus(ctx);
},
stagePendingModelSwitch: (selection) => {
pendingTelegramModelSwitch = selection;
updateStatus(ctx);
},
restartInterruptedTelegramTurn: (selection) => {
return restartTelegramModelSwitchContinuation({
activeTurn: activeTelegramTurn,
abort: currentAbort,
selection,
queueContinuation: (turn, nextSelection) => {
queueTelegramModelSwitchContinuation(turn, nextSelection, ctx);
},
});
},
},
);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await answerCallbackQuery(query.id, message);
return true;
}
}
async function handleAuthorizedTelegramCallbackQuery(
query: TelegramCallbackQuery,
ctx: ExtensionContext,
): Promise<void> {
const messageId = query.message?.message_id;
await handleTelegramMenuCallbackEntry(
query.id,
query.data,
messageId ? modelMenus.get(messageId) : undefined,
{
handleStatusAction: async () => {
const state = messageId ? modelMenus.get(messageId) : undefined;
if (!state) return false;
return handleStatusCallbackAction(query, state, ctx);
},
handleThinkingAction: async () => {
const state = messageId ? modelMenus.get(messageId) : undefined;
if (!state) return false;
return handleThinkingCallbackAction(query, state, ctx);
},
handleModelAction: async () => {
const state = messageId ? modelMenus.get(messageId) : undefined;
if (!state) return false;
return handleModelCallbackAction(query, state, ctx);
},
answerCallbackQuery,
},
);
}
// --- Status Rendering ---
// --- Turn Queue & Message Dispatch ---
async function buildTelegramFiles(
messages: TelegramMessage[],
): Promise<DownloadedTelegramFile[]> {
const downloaded: DownloadedTelegramFile[] = [];
for (const file of collectTelegramFileInfos(messages)) {
const path = await downloadTelegramBridgeFile(
file.file_id,
file.fileName,
);
downloaded.push({
path,
fileName: file.fileName,
isImage: file.isImage,
mimeType: file.mimeType,
});
}
return downloaded;
}
function reorderQueuedTelegramTurns(ctx: ExtensionContext): void {
queuedTelegramItems.sort(compareTelegramQueueItems);
updateStatus(ctx);
}
function removePendingMediaGroupMessages(messageIds: number[]): void {
if (messageIds.length === 0 || mediaGroups.size === 0) return;
const deletedMessageIds = new Set(messageIds);
for (const [key, state] of mediaGroups.entries()) {
if (
!state.messages.some((message) =>
deletedMessageIds.has(message.message_id),
)
) {
continue;
}
if (state.flushTimer) clearTimeout(state.flushTimer);
mediaGroups.delete(key);
}
}
function removeQueuedTelegramTurnsByMessageIds(
messageIds: number[],
ctx: ExtensionContext,
): number {
const result = removeTelegramQueueItemsByMessageIds(
queuedTelegramItems,
messageIds,
);
if (result.removedCount === 0) return 0;
queuedTelegramItems = result.items;
updateStatus(ctx);
return result.removedCount;
}
function clearQueuedTelegramTurnPriorityByMessageId(
messageId: number,
ctx: ExtensionContext,
): boolean {
const result = clearTelegramQueuePromptPriority(
queuedTelegramItems,
messageId,
);
if (!result.changed) return false;
queuedTelegramItems = result.items;
reorderQueuedTelegramTurns(ctx);
return true;
}
function prioritizeQueuedTelegramTurnByMessageId(
messageId: number,
ctx: ExtensionContext,
): boolean {
const result = prioritizeTelegramQueuePrompt(
queuedTelegramItems,
messageId,
nextPriorityReactionOrder,
);
if (!result.changed) return false;
queuedTelegramItems = result.items;
nextPriorityReactionOrder += 1;
reorderQueuedTelegramTurns(ctx);
return true;
}
async function handleAuthorizedTelegramReactionUpdate(
reactionUpdate: TelegramMessageReactionUpdated,
ctx: ExtensionContext,
): Promise<void> {
const reactionUser = reactionUpdate.user;
if (
reactionUpdate.chat.type !== "private" ||
!reactionUser ||
reactionUser.is_bot ||
reactionUser.id !== config.allowedUserId
) {
return;
}
const oldEmojis = collectTelegramReactionEmojis(
reactionUpdate.old_reaction,
);
const newEmojis = collectTelegramReactionEmojis(
reactionUpdate.new_reaction,
);
const dislikeAdded = !oldEmojis.has("👎") && newEmojis.has("👎");
if (dislikeAdded) {
removePendingMediaGroupMessages([reactionUpdate.message_id]);
removeQueuedTelegramTurnsByMessageIds([reactionUpdate.message_id], ctx);
return;
}
const likeRemoved = oldEmojis.has("👍") && !newEmojis.has("👍");
if (likeRemoved) {
clearQueuedTelegramTurnPriorityByMessageId(
reactionUpdate.message_id,
ctx,
);
}
const likeAdded = !oldEmojis.has("👍") && newEmojis.has("👍");
if (!likeAdded) return;
prioritizeQueuedTelegramTurnByMessageId(reactionUpdate.message_id, ctx);
}
async function createTelegramTurn(
messages: TelegramMessage[],
historyTurns: PendingTelegramTurn[] = [],
): Promise<PendingTelegramTurn> {
return buildTelegramPromptTurn({
telegramPrefix: TELEGRAM_PREFIX,
messages,
historyTurns,
queueOrder: nextQueuedTelegramItemOrder++,
rawText: extractTelegramMessagesText(messages),
files: await buildTelegramFiles(messages),
readBinaryFile: async (path) => readFile(path),
inferImageMimeType: guessMediaType,
});
}
async function handleStopCommand(
message: TelegramMessage,
ctx: ExtensionContext,
): Promise<void> {
if (currentAbort) {
pendingTelegramModelSwitch = undefined;
if (queuedTelegramItems.length > 0) {
preserveQueuedTurnsAsHistory = true;
}
markTelegramAbortRequested();
currentAbort();
updateStatus(ctx);
await sendTextReply(
message.chat.id,
message.message_id,
"Aborted current turn.",
);
return;
}
await sendTextReply(message.chat.id, message.message_id, "No active turn.");
}
async function handleQuitCommand(
message: TelegramMessage,
ctx: ExtensionContext,
): Promise<void> {
await sendTextReply(message.chat.id, message.message_id, "Shutting down pi session.");
ctx.shutdown();
}
async function handleShellCommand(
shellCmd: string,
message: TelegramMessage,
_ctx: ExtensionContext,
): Promise<void> {
try {
const result = await pi.exec("sh", ["-c", shellCmd], { timeout: 30_000 });
await sendMarkdownReply(
message.chat.id,
message.message_id,
buildShellCommandReply({
shellCmd,
stdout: result.stdout,
stderr: result.stderr,
exitCode: result.code,
}),
);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await sendTextReply(message.chat.id, message.message_id, `Shell error: ${msg}`);
}
}
async function handleCompactCommand(
message: TelegramMessage,
ctx: ExtensionContext,
): Promise<void> {
if (
!ctx.isIdle() ||
ctx.hasPendingMessages() ||
activeTelegramTurn ||
telegramTurnDispatchPending ||
queuedTelegramItems.length > 0 ||
compactionInProgress
) {
await sendTextReply(
message.chat.id,
message.message_id,
"Cannot compact while pi or the Telegram queue is busy. Wait for queued turns to finish or send /stop first.",
);
return;
}
compactionInProgress = true;
updateStatus(ctx);
try {
ctx.compact({
onComplete: () => {
compactionInProgress = false;
updateStatus(ctx);
dispatchNextQueuedTelegramTurn(ctx);
void sendTextReply(
message.chat.id,
message.message_id,
"Compaction completed.",
);
},
onError: (error) => {
compactionInProgress = false;
updateStatus(ctx);
dispatchNextQueuedTelegramTurn(ctx);
const errorMessage =
error instanceof Error ? error.message : String(error);
void sendTextReply(
message.chat.id,
message.message_id,
`Compaction failed: ${errorMessage}`,
);
},
});
} catch (error) {
compactionInProgress = false;
updateStatus(ctx);
const errorMessage =
error instanceof Error ? error.message : String(error);
await sendTextReply(
message.chat.id,
message.message_id,
`Compaction failed: ${errorMessage}`,
);
return;
}
await sendTextReply(
message.chat.id,
message.message_id,
"Compaction started.",
);
}
async function handleStatusCommand(
message: TelegramMessage,
ctx: ExtensionContext,
): Promise<void> {
enqueueTelegramControlItem(
createTelegramControlItem(
message.chat.id,
message.message_id,
"status",
"⚡ status",
async (controlCtx) => {
await sendStatusMessage(
message.chat.id,
message.message_id,
controlCtx,
);
},
),
ctx,
);
}
async function handleModelCommand(
message: TelegramMessage,
args: string | undefined,
ctx: ExtensionContext,
): Promise<void> {
enqueueTelegramControlItem(
createTelegramControlItem(
message.chat.id,
message.message_id,
"model",
"⚡ model",
async (controlCtx) => {
await openModelMenu(
message.chat.id,
message.message_id,
args,
controlCtx,
);
},
),
ctx,
);
}
async function handleTraceCommand(
message: TelegramMessage,
ctx: ExtensionContext,
): Promise<void> {
const modes: DisplayMode[] = ["text", "compact", "full"];
const nextMode = modes[(modes.indexOf(displayMode) + 1) % modes.length]!;
setDisplayMode(nextMode, ctx);
await sendTextReply(message.chat.id, message.message_id, `Display mode: ${nextMode}.`);
}
async function handleHelpCommand(
message: TelegramMessage,
_commandName: string,
_ctx: ExtensionContext,
): Promise<void> {
// Always refresh the Telegram bot menu so late-registered pi commands
// (skills/prompts/extensions that registered after the initial /start)
// become discoverable. /help is the user-facing escape hatch when the
// menu drifts.
let menuWarning = "";
try {
await registerTelegramBotCommands();
} catch (error) {
const msg = error instanceof Error ? error.message : String(error);
menuWarning = `\n\nWarning: failed to refresh bot commands menu: ${msg}`;
}
const piCommands = pi.getCommands();
const lines: string[] = [
"Send a message to forward to pi. ! prefix runs a shell command.",
"",
"Bridge: /status /trace /model /compact /stop /quit /start /help",
"",
`Pi commands (${piCommands.length}):`,
];
if (piCommands.length === 0) {
lines.push(" (none registered)");
} else {
const sorted = [...piCommands].sort((a, b) =>
a.name.localeCompare(b.name),
);
for (const c of sorted) {
const desc = c.description ? ` — ${c.description}` : "";
lines.push(`/${c.name}${desc}`);
}
}
lines.push("", "/help re-syncs Telegram's command menu.");
await sendTextReply(
message.chat.id,
message.message_id,
lines.join("\n") + menuWarning,
);
}
async function handleTelegramCommand(
commandName: string | undefined,
args: string | undefined,
message: TelegramMessage,
ctx: ExtensionContext,
): Promise<boolean> {
if (!commandName) return false;
const handlers: Partial<Record<string, () => Promise<void>>> = {
stop: () => handleStopCommand(message, ctx),
compact: () => handleCompactCommand(message, ctx),
status: () => handleStatusCommand(message, ctx),
trace: () => handleTraceCommand(message, ctx),
model: () => handleModelCommand(message, args, ctx),
help: () => handleHelpCommand(message, commandName, ctx),
start: () => handleHelpCommand(message, commandName, ctx),
quit: () => handleQuitCommand(message, ctx),
exit: () => handleQuitCommand(message, ctx),
};
const handler = handlers[commandName];
if (!handler) return false;
await handler();
return true;
}
async function enqueueTelegramTurn(
messages: TelegramMessage[],
ctx: ExtensionContext,
): Promise<void> {
const historyResult = preserveQueuedTurnsAsHistory
? partitionTelegramQueueItemsForHistory(queuedTelegramItems)
: { historyTurns: [], remainingItems: queuedTelegramItems };
queuedTelegramItems = historyResult.remainingItems;
preserveQueuedTurnsAsHistory = false;
const turn = await createTelegramTurn(messages, historyResult.historyTurns);
queuedTelegramItems.push(turn);
updateStatus(ctx);
dispatchNextQueuedTelegramTurn(ctx);
}
async function dispatchAuthorizedTelegramMessages(
messages: TelegramMessage[],
ctx: ExtensionContext,
): Promise<void> {
const firstMessage = messages[0];
if (!firstMessage) return;
if (shouldRecoverStaleTelegramAbort(ctx)) {
await recoverStaleTelegramAbort(ctx);
}
const rawText = extractFirstTelegramMessageText(messages);
// Handle ! shell commands directly via ctx.exec
const trimmedRaw = rawText.trimStart();
if (trimmedRaw.startsWith("!")) {
const shellCmd = trimmedRaw.slice(1).trim();
if (shellCmd) {
await handleShellCommand(shellCmd, firstMessage, ctx);
return;
}
}
const command = parseTelegramCommand(rawText);
const handled = await handleTelegramCommand(
command?.name,
command?.args,
firstMessage,
ctx,
);
if (handled) return;
await enqueueTelegramTurn(messages, ctx);
}
async function handleAuthorizedTelegramMessage(
message: TelegramMessage,
ctx: ExtensionContext,
): Promise<void> {
if (message.media_group_id) {
const key = `${message.chat.id}:${message.media_group_id}`;
const existing = mediaGroups.get(key) ?? { messages: [] };
existing.messages.push(message);
if (existing.flushTimer) clearTimeout(existing.flushTimer);
existing.flushTimer = setTimeout(() => {
const state = mediaGroups.get(key);
mediaGroups.delete(key);
if (!state) return;
dispatchAuthorizedTelegramMessages(state.messages, ctx).catch((err) =>
console.warn(
"[pi-telegram] dispatchAuthorizedTelegramMessages failed:",
err,
),
);
}, TELEGRAM_MEDIA_GROUP_DEBOUNCE_MS);
mediaGroups.set(key, existing);
return;
}
await dispatchAuthorizedTelegramMessages([message], ctx);
}
async function handleUpdate(
update: TelegramUpdate,
ctx: ExtensionContext,
): Promise<void> {
await executeTelegramUpdate(update, config.allowedUserId, {
ctx,
removePendingMediaGroupMessages,
removeQueuedTelegramTurnsByMessageIds,
handleAuthorizedTelegramReactionUpdate: async (
reactionUpdate,
nextCtx,
) => {
await handleAuthorizedTelegramReactionUpdate(
reactionUpdate as TelegramMessageReactionUpdated,
nextCtx,
);
},
onDeniedUserId: (userId) => {
ctx.ui.notify(
`Telegram: rejected message from user ID ${userId} (not the configured allowed user). To allow this user, set TELEGRAM_ALLOWED_USER_ID=${userId}.`,
"warning",
);
},
answerCallbackQuery,
handleAuthorizedTelegramCallbackQuery: async (query, nextCtx) => {
await handleAuthorizedTelegramCallbackQuery(
query as TelegramCallbackQuery,
nextCtx,
);
},
sendTextReply,
handleAuthorizedTelegramMessage: async (message, nextCtx) => {
await handleAuthorizedTelegramMessage(
message as TelegramMessage,
nextCtx,
);
},
});
}
// --- Polling ---
async function stopPolling(): Promise<void> {
stopTypingLoop();
pollingController?.abort();
pollingController = undefined;
await pollingPromise?.catch(() => undefined);
pollingPromise = undefined;
}
async function pollLoop(
ctx: ExtensionContext,
signal: AbortSignal,
): Promise<void> {
await runTelegramPollLoop<TelegramUpdate>({
ctx,
signal,
config,
deleteWebhook: async (pollSignal) => {
await callTelegramApi(
"deleteWebhook",
{ drop_pending_updates: false },
{ signal: pollSignal },
);
},
getUpdates: async (body, pollSignal) => {
return callTelegramApi<TelegramUpdate[]>("getUpdates", body, {
signal: pollSignal,
});
},
persistConfig: async () => {
await writeTelegramConfig(AGENT_DIR, CONFIG_PATH, config);
},
handleUpdate: async (update, loopCtx) => {
await handleUpdate(update, loopCtx);
},
onErrorStatus: (message) => {
updateStatus(ctx, message);
},
onStatusReset: () => {
updateStatus(ctx);
},
sleep: (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
});
}
async function startPolling(ctx: ExtensionContext): Promise<void> {
if (
!shouldStartTelegramPolling({
hasBotToken: !!config.botToken,
hasPollingPromise: !!pollingPromise,
})
) {
return;
}
if (!config.allowedUserId) {
ctx.ui.notify(
"Telegram polling blocked: allowedUserId is not set. Set TELEGRAM_ALLOWED_USER_ID or run /telegram-setup to configure it.",
"warning",
);
return;
}
pollingController = new AbortController();
pollingPromise = pollLoop(ctx, pollingController.signal).finally(() => {
pollingPromise = undefined;
pollingController = undefined;
updateStatus(ctx);
});
updateStatus(ctx);
}
// --- Extension Registration ---
registerTelegramAttachmentTool(pi, {
maxAttachmentsPerTurn: MAX_ATTACHMENTS_PER_TURN,
getActiveTurn: () => activeTelegramTurn,
statPath: stat,
});
registerTelegramCommands(pi, {
promptForConfig,
getStatusLines: () => {
return [
`bot: ${config.botUsername ? `@${config.botUsername}` : "not configured"}`,
`allowed user: ${config.allowedUserId ?? "not configured"}`,
`polling: ${pollingPromise ? "running" : "stopped"}`,
`active telegram turn: ${activeTelegramTurn ? "yes" : "no"}`,
`queued telegram turns: ${queuedTelegramItems.length}`,
];
},
reloadConfig: async () => {
config = await readTelegramConfig(CONFIG_PATH);
},
hasBotToken: () => !!config.botToken,
startPolling,
stopPolling,
updateStatus,
});
// --- Lifecycle Hooks ---
registerTelegramLifecycleHooks(pi, {
onSessionStart: async (_event, ctx) => {
config = await readTelegramConfig(CONFIG_PATH);
const envAllowedUserId = readAllowedUserIdFromEnv(process.env);
if (envAllowedUserId !== undefined && envAllowedUserId !== config.allowedUserId) {
config.allowedUserId = envAllowedUserId;
await writeTelegramConfig(AGENT_DIR, CONFIG_PATH, config);
}
const sessionStartState = buildTelegramSessionStartState(ctx.model);
currentTelegramModel = sessionStartState.currentTelegramModel;
activeTelegramToolExecutions =
sessionStartState.activeTelegramToolExecutions;
pendingTelegramModelSwitch = sessionStartState.pendingTelegramModelSwitch;
nextQueuedTelegramItemOrder =
sessionStartState.nextQueuedTelegramItemOrder;
nextQueuedTelegramControlOrder =
sessionStartState.nextQueuedTelegramControlOrder;
telegramTurnDispatchPending =
sessionStartState.telegramTurnDispatchPending;
compactionInProgress = sessionStartState.compactionInProgress;
clearTelegramAbortRequested();
await mkdir(TEMP_DIR, { recursive: true });
updateStatus(ctx);
},
onSessionShutdown: async (_event, _ctx) => {
const shutdownState =
buildTelegramSessionShutdownState<TelegramQueueItem>();
queuedTelegramItems = shutdownState.queuedTelegramItems;
nextQueuedTelegramItemOrder = shutdownState.nextQueuedTelegramItemOrder;
nextQueuedTelegramControlOrder =
shutdownState.nextQueuedTelegramControlOrder;
nextPriorityReactionOrder = shutdownState.nextPriorityReactionOrder;
currentTelegramModel = shutdownState.currentTelegramModel;
activeTelegramToolExecutions = shutdownState.activeTelegramToolExecutions;
pendingTelegramModelSwitch = shutdownState.pendingTelegramModelSwitch;
telegramTurnDispatchPending = shutdownState.telegramTurnDispatchPending;
compactionInProgress = shutdownState.compactionInProgress;
for (const state of mediaGroups.values()) {
if (state.flushTimer) clearTimeout(state.flushTimer);
}
mediaGroups.clear();
modelMenus.clear();
if (activeTelegramTurn) {
await clearPreview(activeTelegramTurn.chatId);
}
activeTelegramTurn = undefined;
currentAbort = undefined;
clearTelegramAbortRequested();
preserveQueuedTurnsAsHistory = false;
await stopPolling();
},
onBeforeAgentStart: (event) => {
const nextEvent = event as { prompt: string; systemPrompt: string };
const suffix = isTelegramPrompt(nextEvent.prompt)
? `${SYSTEM_PROMPT_SUFFIX}\n- The current user message came from Telegram.`
: SYSTEM_PROMPT_SUFFIX;
return {
systemPrompt: nextEvent.systemPrompt + suffix,
};
},
onModelSelect: (event, ctx) => {
currentTelegramModel = (event as { model: Model<any> }).model;
updateStatus(ctx);
},
onAgentStart: async (_event, ctx) => {
currentAbort = () => ctx.abort();
clearTelegramAbortRequested();
const startPlan = buildTelegramAgentStartPlan({
queuedItems: queuedTelegramItems,
hasPendingDispatch: telegramTurnDispatchPending,
hasActiveTurn: !!activeTelegramTurn,
});
if (startPlan.shouldResetToolExecutions) {
activeTelegramToolExecutions = 0;
}
if (startPlan.shouldResetPendingModelSwitch) {
pendingTelegramModelSwitch = undefined;
}
queuedTelegramItems = startPlan.remainingItems;
if (startPlan.shouldClearDispatchPending) {
telegramTurnDispatchPending = false;
}
if (startPlan.activeTurn) {
activeTelegramTurn = { ...startPlan.activeTurn };
pendingNonTextBlocks = [];
previewState = createPreviewState();
startTypingLoop(ctx);
}
updateStatus(ctx);
},
onToolExecutionStart: () => {
activeTelegramToolExecutions = getNextTelegramToolExecutionCount({
hasActiveTurn: !!activeTelegramTurn,
currentCount: activeTelegramToolExecutions,
event: "start",
});
},
onToolExecutionEnd: (_event, ctx) => {
activeTelegramToolExecutions = getNextTelegramToolExecutionCount({
hasActiveTurn: !!activeTelegramTurn,
currentCount: activeTelegramToolExecutions,
event: "end",
});
if (!activeTelegramTurn) return;
triggerPendingTelegramModelSwitchAbort(ctx);
},
onMessageStart: async (_event, _ctx) => {
if (!activeTelegramTurn) return;
if (previewState && (previewState.pendingText.trim().length > 0 || previewState.lastSentText.trim().length > 0)) {
const previousText = previewState.pendingText.trim();
if (previousText.length > 0) {
await finalizeMarkdownPreview(activeTelegramTurn.chatId, previousText);
} else {
await finalizePreview(activeTelegramTurn.chatId);
}
}
// Flush non-text blocks from the completed previous message now that args are fully populated
for (const block of pendingNonTextBlocks) {
const msg = renderBlockMessage(block, displayMode);
if (msg) void sendMarkdownReply(activeTelegramTurn.chatId, activeTelegramTurn.replyToMessageId, msg);
}
pendingNonTextBlocks = [];
previewState = createPreviewState();
},
onMessageUpdate: async (event, _ctx) => {
const nextEvent = event as { message: AgentMessage };
if (!activeTelegramTurn || !isAssistantMessage(nextEvent.message)) return;
if (!previewState) previewState = createPreviewState();
const allBlocks = getMessageBlocks(nextEvent.message);
// Buffer non-text blocks; emit after the message completes (in onMessageStart / onAgentEnd)
// so tool_call blocks have their args fully populated before being sent
pendingNonTextBlocks = allBlocks.filter((b) => b.type !== "text");
// Stream text content in the preview message
const textContent = allBlocks
.filter((b) => b.type === "text")
.map((b) => (b as { type: "text"; text: string }).text)
.join("")
.trim();
if (textContent) {
previewState.pendingText = textContent;
schedulePreviewFlush(activeTelegramTurn.chatId);
}
},
onAgentEnd: async (event, ctx) => {
const turn = activeTelegramTurn;
currentAbort = undefined;
clearTelegramAbortRequested();
stopTypingLoop();
activeTelegramTurn = undefined;
activeTelegramToolExecutions = 0;
pendingTelegramModelSwitch = undefined;
telegramTurnDispatchPending = false;
updateStatus(ctx);
const assistant = turn
? await extractAssistantSummary((event as { messages: AgentMessage[] }).messages)
: { blocks: [], text: undefined, stopReason: undefined, errorMessage: undefined };
const endPlan = buildTelegramAgentEndPlan({
hasTurn: !!turn,
stopReason: assistant.stopReason,
hasFinalText: !!(assistant.text?.trim()),
hasQueuedAttachments: (turn?.queuedAttachments.length ?? 0) > 0,
preserveQueuedTurnsAsHistory,
});
if (!turn) {
if (endPlan.shouldDispatchNext) dispatchNextQueuedTelegramTurn(ctx);
return;
}
if (endPlan.shouldClearPreview) {
await clearPreview(turn.chatId);
}
if (endPlan.shouldSendErrorMessage) {
const errorText = assistant.errorMessage || "Telegram bridge: pi failed while processing the request.";
await finalizePreview(turn.chatId);
await sendTextReply(turn.chatId, turn.replyToMessageId, `**Error**: ${errorText}`);
if (endPlan.shouldDispatchNext) dispatchNextQueuedTelegramTurn(ctx);
return;
}
// Flush tool results from the completed transcript plus any non-text blocks
// from the final assistant message (single-message turns never trigger onMessageStart).
const finalTraceBlocks = [
...assistant.blocks.filter((block) => block.type === "tool_result"),
...pendingNonTextBlocks,
];
for (const block of finalTraceBlocks) {
const msg = renderBlockMessage(block, displayMode);
if (msg) void sendMarkdownReply(turn.chatId, turn.replyToMessageId, msg);
}
pendingNonTextBlocks = [];
// Finalize the streaming text preview (only for normal completions, not abort/empty)
if (endPlan.kind === "text") {
const finalText = assistant.text?.trim() || previewState?.pendingText.trim();
if (finalText) {
const finalized = await finalizeMarkdownPreview(turn.chatId, finalText);
if (!finalized) {
await clearPreview(turn.chatId);
await sendMarkdownReply(turn.chatId, turn.replyToMessageId, finalText);
}
} else {
await finalizePreview(turn.chatId);
}
// Cost footer
const turnCost = extractTurnCost((event as { messages: AgentMessage[] }).messages as any);
const usage = ctx.getContextUsage();
if (turnCost) {
void sendTextReply(turn.chatId, turn.replyToMessageId, `---\n${formatTurnCostLine(turnCost, usage?.percent ?? null)}`);
}
}
if (endPlan.shouldSendAttachmentNotice) {
await sendTextReply(turn.chatId, turn.replyToMessageId, "Attached requested file(s).");
}
await sendQueuedAttachments(turn);
if (endPlan.shouldDispatchNext) dispatchNextQueuedTelegramTurn(ctx);
},
});
}