From b8668071123adbe66948025dc892c5418f33091a Mon Sep 17 00:00:00 2001 From: wassname <1103714+wassname@users.noreply.github.com> Date: Wed, 29 Apr 2026 22:34:59 +0800 Subject: [PATCH] Survive transient Telegram fetch failures Wrap fetch calls in callTelegram, callTelegramMultipart, and downloadTelegramFile with bounded retry (max 3 attempts, 500ms/2000ms backoff with jitter) on transient network codes and a per-attempt 15s AbortController timeout so a stuck connection can't wedge the bridge indefinitely. HTTP 4xx/5xx still surface as before. Add a process-level unhandledRejection handler so a stray rejection from a fire-and-forget void f() (e.g. timer-driven preview flush) can't crash the host under Node 22's default unhandledRejection=throw - which is how a remote session got dropped. Also tighten the two timer-callback voids that can fire after their turn's try/catch is gone. --- index.ts | 26 ++++++++++++++-- lib/api.ts | 79 ++++++++++++++++++++++++++++++++++++++++++++--- tests/api.test.ts | 18 +++++++++++ 3 files changed, 116 insertions(+), 7 deletions(-) diff --git a/index.ts b/index.ts index f725164..8193247 100644 --- a/index.ts +++ b/index.ts @@ -407,7 +407,22 @@ export const __telegramTestUtils = { ), }; +// Install once: a stray Telegram fetch rejection that escapes through a fire-and-forget +// `void f()` would otherwise crash the host process under Node 22 (default +// unhandledRejection = throw). Dropping the message is fine; killing the session is not. +// Scoped to unhandledRejection only - we deliberately do NOT swallow uncaughtException, +// since that could mask real bugs unrelated to this bridge. +let unhandledRejectionHandlerInstalled = false; +function installUnhandledRejectionHandler(): void { + if (unhandledRejectionHandlerInstalled) return; + unhandledRejectionHandlerInstalled = true; + process.on("unhandledRejection", (reason) => { + console.error("[pi-telegram] unhandledRejection (suppressed):", reason); + }); +} + export default function (pi: ExtensionAPI) { + installUnhandledRejectionHandler(); let config: TelegramConfig = {}; let pollingController: AbortController | undefined; let pollingPromise: Promise | undefined; @@ -969,7 +984,9 @@ export default function (pi: ExtensionAPI) { function schedulePreviewFlush(chatId: number): void { if (!previewState || previewState.flushTimer) return; previewState.flushTimer = setTimeout(() => { - void flushPreview(chatId); + flushPreview(chatId).catch((err) => + console.warn("[pi-telegram] flushPreview failed:", err), + ); }, PREVIEW_THROTTLE_MS); } @@ -1947,7 +1964,12 @@ export default function (pi: ExtensionAPI) { const state = mediaGroups.get(key); mediaGroups.delete(key); if (!state) return; - void dispatchAuthorizedTelegramMessages(state.messages, ctx); + dispatchAuthorizedTelegramMessages(state.messages, ctx).catch((err) => + console.warn( + "[pi-telegram] dispatchAuthorizedTelegramMessages failed:", + err, + ), + ); }, TELEGRAM_MEDIA_GROUP_DEBOUNCE_MS); mediaGroups.set(key, existing); return; diff --git a/lib/api.ts b/lib/api.ts index 59c7d42..ea8dcd8 100644 --- a/lib/api.ts +++ b/lib/api.ts @@ -54,6 +54,74 @@ function sanitizeFileName(name: string): string { return name.replace(/[^a-zA-Z0-9._-]+/g, "_"); } +// Network-layer codes that warrant a retry. HTTP 4xx/5xx are NOT retried here - +// those go through `data.ok` / `response.ok` and are surfaced to callers so +// rate-limits and logic errors stay loud. +const TRANSIENT_FETCH_CODES = new Set([ + "UND_ERR_CONNECT_TIMEOUT", + "UND_ERR_SOCKET", + "UND_ERR_HEADERS_TIMEOUT", + "UND_ERR_BODY_TIMEOUT", + "ECONNRESET", + "ETIMEDOUT", + "ENOTFOUND", + "EAI_AGAIN", + "ABORT_ERR", // our own per-attempt timeout, see below +]); + +const FETCH_ATTEMPT_TIMEOUT_MS = 15_000; +const FETCH_RETRY_DELAYS_MS = [500, 2000]; + +function transientCode(err: unknown): string | undefined { + const e = err as { code?: string; cause?: { code?: string }; name?: string }; + const code = e?.code ?? e?.cause?.code; + if (code && TRANSIENT_FETCH_CODES.has(code)) return code; + // AbortError from our timeout has name="AbortError", no code + if (e?.name === "AbortError") return "ABORT_ERR"; + return undefined; +} + +/** + * fetch with bounded in-memory retry on transient network errors and a per-attempt + * AbortController timeout so a stuck connection cannot wedge the bridge forever. + * + * The caller's own AbortSignal (if any) is honored - if it aborts, we re-throw + * immediately and do not retry. + */ +async function fetchWithRetry( + url: string, + init: RequestInit, + callerSignal?: AbortSignal, +): Promise { + for (let attempt = 0; attempt <= FETCH_RETRY_DELAYS_MS.length; attempt++) { + const timeoutCtl = new AbortController(); + const timer = setTimeout( + () => timeoutCtl.abort(), + FETCH_ATTEMPT_TIMEOUT_MS, + ); + const onCallerAbort = () => timeoutCtl.abort(); + callerSignal?.addEventListener("abort", onCallerAbort, { once: true }); + try { + return await fetch(url, { ...init, signal: timeoutCtl.signal }); + } catch (err) { + if (callerSignal?.aborted) throw err; + const code = transientCode(err); + const isLast = attempt === FETCH_RETRY_DELAYS_MS.length; + if (!code || isLast) throw err; + const base = FETCH_RETRY_DELAYS_MS[attempt]; + const jitter = Math.round(base * (0.8 + Math.random() * 0.4)); + console.warn( + `[pi-telegram] transient fetch error ${code} on ${url.replace(/bot[^/]+/, "bot***")}, retry ${attempt + 1} in ${jitter}ms`, + ); + await new Promise((r) => setTimeout(r, jitter)); + } finally { + clearTimeout(timer); + callerSignal?.removeEventListener("abort", onCallerAbort); + } + } + throw new Error("fetchWithRetry: unreachable"); +} + export async function readTelegramConfig( configPath: string, ): Promise { @@ -87,14 +155,14 @@ export async function callTelegram( if (!botToken) { throw new Error("Telegram bot token is not configured"); } - const response = await fetch( + const response = await fetchWithRetry( `https://api.telegram.org/bot${botToken}/${method}`, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify(body), - signal: options?.signal, }, + options?.signal, ); const data = (await response.json()) as TelegramApiResponse; if (!data.ok || data.result === undefined) { @@ -121,13 +189,13 @@ export async function callTelegramMultipart( } const buffer = await readFile(filePath); form.set(fileField, new Blob([buffer]), fileName); - const response = await fetch( + const response = await fetchWithRetry( `https://api.telegram.org/bot${botToken}/${method}`, { method: "POST", body: form, - signal: options?.signal, }, + options?.signal, ); const data = (await response.json()) as TelegramApiResponse; if (!data.ok || data.result === undefined) { @@ -153,8 +221,9 @@ export async function downloadTelegramFile( tempDir, `${Date.now()}-${sanitizeFileName(suggestedName)}`, ); - const response = await fetch( + const response = await fetchWithRetry( `https://api.telegram.org/file/bot${botToken}/${file.file_path}`, + { method: "GET" }, ); if (!response.ok) { throw new Error(`Failed to download Telegram file: ${response.status}`); diff --git a/tests/api.test.ts b/tests/api.test.ts index bceb776..a66149c 100644 --- a/tests/api.test.ts +++ b/tests/api.test.ts @@ -65,6 +65,24 @@ test("answerTelegramCallbackQuery ignores Telegram API failures", async () => { } }); +test("callTelegram does not retry on non-transient errors", async () => { + const originalFetch = globalThis.fetch; + let calls = 0; + globalThis.fetch = (async () => { + calls++; + throw new Error("logic error, not a network blip"); + }) as typeof fetch; + try { + await assert.rejects( + () => callTelegram("123:abc", "getMe", {}), + /logic error/, + ); + assert.equal(calls, 1); + } finally { + globalThis.fetch = originalFetch; + } +}); + test("Telegram API client resolves bot tokens lazily for wrapped calls", async () => { const originalFetch = globalThis.fetch; const calls: string[] = [];