mirror of
https://github.com/wassname/pi-telegram.git
synced 2026-06-27 17:01:39 +08:00
Survive transient Telegram fetch failures
Wrap fetch calls in callTelegram, callTelegramMultipart, and downloadTelegramFile with bounded retry (max 3 attempts, 500ms/2000ms backoff with jitter) on transient network codes and a per-attempt 15s AbortController timeout so a stuck connection can't wedge the bridge indefinitely. HTTP 4xx/5xx still surface as before. Add a process-level unhandledRejection handler so a stray rejection from a fire-and-forget void f() (e.g. timer-driven preview flush) can't crash the host under Node 22's default unhandledRejection=throw - which is how a remote session got dropped. Also tighten the two timer-callback voids that can fire after their turn's try/catch is gone.
This commit is contained in:
@@ -407,7 +407,22 @@ export const __telegramTestUtils = {
|
|||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Install once: a stray Telegram fetch rejection that escapes through a fire-and-forget
|
||||||
|
// `void f()` would otherwise crash the host process under Node 22 (default
|
||||||
|
// unhandledRejection = throw). Dropping the message is fine; killing the session is not.
|
||||||
|
// Scoped to unhandledRejection only - we deliberately do NOT swallow uncaughtException,
|
||||||
|
// since that could mask real bugs unrelated to this bridge.
|
||||||
|
let unhandledRejectionHandlerInstalled = false;
|
||||||
|
function installUnhandledRejectionHandler(): void {
|
||||||
|
if (unhandledRejectionHandlerInstalled) return;
|
||||||
|
unhandledRejectionHandlerInstalled = true;
|
||||||
|
process.on("unhandledRejection", (reason) => {
|
||||||
|
console.error("[pi-telegram] unhandledRejection (suppressed):", reason);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
export default function (pi: ExtensionAPI) {
|
export default function (pi: ExtensionAPI) {
|
||||||
|
installUnhandledRejectionHandler();
|
||||||
let config: TelegramConfig = {};
|
let config: TelegramConfig = {};
|
||||||
let pollingController: AbortController | undefined;
|
let pollingController: AbortController | undefined;
|
||||||
let pollingPromise: Promise<void> | undefined;
|
let pollingPromise: Promise<void> | undefined;
|
||||||
@@ -969,7 +984,9 @@ export default function (pi: ExtensionAPI) {
|
|||||||
function schedulePreviewFlush(chatId: number): void {
|
function schedulePreviewFlush(chatId: number): void {
|
||||||
if (!previewState || previewState.flushTimer) return;
|
if (!previewState || previewState.flushTimer) return;
|
||||||
previewState.flushTimer = setTimeout(() => {
|
previewState.flushTimer = setTimeout(() => {
|
||||||
void flushPreview(chatId);
|
flushPreview(chatId).catch((err) =>
|
||||||
|
console.warn("[pi-telegram] flushPreview failed:", err),
|
||||||
|
);
|
||||||
}, PREVIEW_THROTTLE_MS);
|
}, PREVIEW_THROTTLE_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1947,7 +1964,12 @@ export default function (pi: ExtensionAPI) {
|
|||||||
const state = mediaGroups.get(key);
|
const state = mediaGroups.get(key);
|
||||||
mediaGroups.delete(key);
|
mediaGroups.delete(key);
|
||||||
if (!state) return;
|
if (!state) return;
|
||||||
void dispatchAuthorizedTelegramMessages(state.messages, ctx);
|
dispatchAuthorizedTelegramMessages(state.messages, ctx).catch((err) =>
|
||||||
|
console.warn(
|
||||||
|
"[pi-telegram] dispatchAuthorizedTelegramMessages failed:",
|
||||||
|
err,
|
||||||
|
),
|
||||||
|
);
|
||||||
}, TELEGRAM_MEDIA_GROUP_DEBOUNCE_MS);
|
}, TELEGRAM_MEDIA_GROUP_DEBOUNCE_MS);
|
||||||
mediaGroups.set(key, existing);
|
mediaGroups.set(key, existing);
|
||||||
return;
|
return;
|
||||||
|
|||||||
+74
-5
@@ -54,6 +54,74 @@ function sanitizeFileName(name: string): string {
|
|||||||
return name.replace(/[^a-zA-Z0-9._-]+/g, "_");
|
return name.replace(/[^a-zA-Z0-9._-]+/g, "_");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Network-layer codes that warrant a retry. HTTP 4xx/5xx are NOT retried here -
|
||||||
|
// those go through `data.ok` / `response.ok` and are surfaced to callers so
|
||||||
|
// rate-limits and logic errors stay loud.
|
||||||
|
const TRANSIENT_FETCH_CODES = new Set([
|
||||||
|
"UND_ERR_CONNECT_TIMEOUT",
|
||||||
|
"UND_ERR_SOCKET",
|
||||||
|
"UND_ERR_HEADERS_TIMEOUT",
|
||||||
|
"UND_ERR_BODY_TIMEOUT",
|
||||||
|
"ECONNRESET",
|
||||||
|
"ETIMEDOUT",
|
||||||
|
"ENOTFOUND",
|
||||||
|
"EAI_AGAIN",
|
||||||
|
"ABORT_ERR", // our own per-attempt timeout, see below
|
||||||
|
]);
|
||||||
|
|
||||||
|
const FETCH_ATTEMPT_TIMEOUT_MS = 15_000;
|
||||||
|
const FETCH_RETRY_DELAYS_MS = [500, 2000];
|
||||||
|
|
||||||
|
function transientCode(err: unknown): string | undefined {
|
||||||
|
const e = err as { code?: string; cause?: { code?: string }; name?: string };
|
||||||
|
const code = e?.code ?? e?.cause?.code;
|
||||||
|
if (code && TRANSIENT_FETCH_CODES.has(code)) return code;
|
||||||
|
// AbortError from our timeout has name="AbortError", no code
|
||||||
|
if (e?.name === "AbortError") return "ABORT_ERR";
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* fetch with bounded in-memory retry on transient network errors and a per-attempt
|
||||||
|
* AbortController timeout so a stuck connection cannot wedge the bridge forever.
|
||||||
|
*
|
||||||
|
* The caller's own AbortSignal (if any) is honored - if it aborts, we re-throw
|
||||||
|
* immediately and do not retry.
|
||||||
|
*/
|
||||||
|
async function fetchWithRetry(
|
||||||
|
url: string,
|
||||||
|
init: RequestInit,
|
||||||
|
callerSignal?: AbortSignal,
|
||||||
|
): Promise<Response> {
|
||||||
|
for (let attempt = 0; attempt <= FETCH_RETRY_DELAYS_MS.length; attempt++) {
|
||||||
|
const timeoutCtl = new AbortController();
|
||||||
|
const timer = setTimeout(
|
||||||
|
() => timeoutCtl.abort(),
|
||||||
|
FETCH_ATTEMPT_TIMEOUT_MS,
|
||||||
|
);
|
||||||
|
const onCallerAbort = () => timeoutCtl.abort();
|
||||||
|
callerSignal?.addEventListener("abort", onCallerAbort, { once: true });
|
||||||
|
try {
|
||||||
|
return await fetch(url, { ...init, signal: timeoutCtl.signal });
|
||||||
|
} catch (err) {
|
||||||
|
if (callerSignal?.aborted) throw err;
|
||||||
|
const code = transientCode(err);
|
||||||
|
const isLast = attempt === FETCH_RETRY_DELAYS_MS.length;
|
||||||
|
if (!code || isLast) throw err;
|
||||||
|
const base = FETCH_RETRY_DELAYS_MS[attempt];
|
||||||
|
const jitter = Math.round(base * (0.8 + Math.random() * 0.4));
|
||||||
|
console.warn(
|
||||||
|
`[pi-telegram] transient fetch error ${code} on ${url.replace(/bot[^/]+/, "bot***")}, retry ${attempt + 1} in ${jitter}ms`,
|
||||||
|
);
|
||||||
|
await new Promise((r) => setTimeout(r, jitter));
|
||||||
|
} finally {
|
||||||
|
clearTimeout(timer);
|
||||||
|
callerSignal?.removeEventListener("abort", onCallerAbort);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new Error("fetchWithRetry: unreachable");
|
||||||
|
}
|
||||||
|
|
||||||
export async function readTelegramConfig(
|
export async function readTelegramConfig(
|
||||||
configPath: string,
|
configPath: string,
|
||||||
): Promise<TelegramConfig> {
|
): Promise<TelegramConfig> {
|
||||||
@@ -87,14 +155,14 @@ export async function callTelegram<TResponse>(
|
|||||||
if (!botToken) {
|
if (!botToken) {
|
||||||
throw new Error("Telegram bot token is not configured");
|
throw new Error("Telegram bot token is not configured");
|
||||||
}
|
}
|
||||||
const response = await fetch(
|
const response = await fetchWithRetry(
|
||||||
`https://api.telegram.org/bot${botToken}/${method}`,
|
`https://api.telegram.org/bot${botToken}/${method}`,
|
||||||
{
|
{
|
||||||
method: "POST",
|
method: "POST",
|
||||||
headers: { "content-type": "application/json" },
|
headers: { "content-type": "application/json" },
|
||||||
body: JSON.stringify(body),
|
body: JSON.stringify(body),
|
||||||
signal: options?.signal,
|
|
||||||
},
|
},
|
||||||
|
options?.signal,
|
||||||
);
|
);
|
||||||
const data = (await response.json()) as TelegramApiResponse<TResponse>;
|
const data = (await response.json()) as TelegramApiResponse<TResponse>;
|
||||||
if (!data.ok || data.result === undefined) {
|
if (!data.ok || data.result === undefined) {
|
||||||
@@ -121,13 +189,13 @@ export async function callTelegramMultipart<TResponse>(
|
|||||||
}
|
}
|
||||||
const buffer = await readFile(filePath);
|
const buffer = await readFile(filePath);
|
||||||
form.set(fileField, new Blob([buffer]), fileName);
|
form.set(fileField, new Blob([buffer]), fileName);
|
||||||
const response = await fetch(
|
const response = await fetchWithRetry(
|
||||||
`https://api.telegram.org/bot${botToken}/${method}`,
|
`https://api.telegram.org/bot${botToken}/${method}`,
|
||||||
{
|
{
|
||||||
method: "POST",
|
method: "POST",
|
||||||
body: form,
|
body: form,
|
||||||
signal: options?.signal,
|
|
||||||
},
|
},
|
||||||
|
options?.signal,
|
||||||
);
|
);
|
||||||
const data = (await response.json()) as TelegramApiResponse<TResponse>;
|
const data = (await response.json()) as TelegramApiResponse<TResponse>;
|
||||||
if (!data.ok || data.result === undefined) {
|
if (!data.ok || data.result === undefined) {
|
||||||
@@ -153,8 +221,9 @@ export async function downloadTelegramFile(
|
|||||||
tempDir,
|
tempDir,
|
||||||
`${Date.now()}-${sanitizeFileName(suggestedName)}`,
|
`${Date.now()}-${sanitizeFileName(suggestedName)}`,
|
||||||
);
|
);
|
||||||
const response = await fetch(
|
const response = await fetchWithRetry(
|
||||||
`https://api.telegram.org/file/bot${botToken}/${file.file_path}`,
|
`https://api.telegram.org/file/bot${botToken}/${file.file_path}`,
|
||||||
|
{ method: "GET" },
|
||||||
);
|
);
|
||||||
if (!response.ok) {
|
if (!response.ok) {
|
||||||
throw new Error(`Failed to download Telegram file: ${response.status}`);
|
throw new Error(`Failed to download Telegram file: ${response.status}`);
|
||||||
|
|||||||
@@ -65,6 +65,24 @@ test("answerTelegramCallbackQuery ignores Telegram API failures", async () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("callTelegram does not retry on non-transient errors", async () => {
|
||||||
|
const originalFetch = globalThis.fetch;
|
||||||
|
let calls = 0;
|
||||||
|
globalThis.fetch = (async () => {
|
||||||
|
calls++;
|
||||||
|
throw new Error("logic error, not a network blip");
|
||||||
|
}) as typeof fetch;
|
||||||
|
try {
|
||||||
|
await assert.rejects(
|
||||||
|
() => callTelegram("123:abc", "getMe", {}),
|
||||||
|
/logic error/,
|
||||||
|
);
|
||||||
|
assert.equal(calls, 1);
|
||||||
|
} finally {
|
||||||
|
globalThis.fetch = originalFetch;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
test("Telegram API client resolves bot tokens lazily for wrapped calls", async () => {
|
test("Telegram API client resolves bot tokens lazily for wrapped calls", async () => {
|
||||||
const originalFetch = globalThis.fetch;
|
const originalFetch = globalThis.fetch;
|
||||||
const calls: string[] = [];
|
const calls: string[] = [];
|
||||||
|
|||||||
Reference in New Issue
Block a user