From 0ab1f89579726726bc67b2034cb64e049b81f453 Mon Sep 17 00:00:00 2001 From: pinks Date: Wed, 13 Sep 2023 02:38:09 +0200 Subject: [PATCH] fix: better handling of timeouts --- bot/mod.ts | 21 +++++++++++++++++++ bot/queueCommand.ts | 2 +- sd.ts | 48 +++++++++++++++++++++++++++++++------------- tasks/processJobs.ts | 15 ++++++++++++-- 4 files changed, 69 insertions(+), 17 deletions(-) diff --git a/bot/mod.ts b/bot/mod.ts index 012bab2..e95545e 100644 --- a/bot/mod.ts +++ b/bot/mod.ts @@ -26,6 +26,27 @@ bot.use(session); bot.api.config.use(GrammyFiles.hydrateFiles(bot.token)); +// Automatically cancel requests after 30 seconds +bot.api.config.use(async (prev, method, payload, signal) => { + const controller = new AbortController(); + let timedOut = false; + const timeout = setTimeout(() => { + timedOut = true; + controller.abort(); + }, 30 * 1000); + signal?.addEventListener("abort", () => { + controller.abort(); + }); + try { + return await prev(method, payload, controller.signal); + } finally { + clearTimeout(timeout); + if (timedOut) { + logger().warning(`${method} timed out`); + } + } +}); + // Automatically retry bot requests if we get a "too many requests" or telegram internal error bot.api.config.use(async (prev, method, payload, signal) => { const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3; diff --git a/bot/queueCommand.ts b/bot/queueCommand.ts index de6c446..000b22a 100644 --- a/bot/queueCommand.ts +++ b/bot/queueCommand.ts @@ -49,7 +49,7 @@ export async function queueCommand(ctx: Grammy.CommandContext) { async function handleFutureUpdates() { for (let idx = 0; idx < 30; idx++) { - await ctx.api.sendChatAction(ctx.chat.id, "typing"); + await ctx.api.sendChatAction(ctx.chat.id, "typing", { maxAttempts: 1 } as never); await new Promise((resolve) => setTimeout(resolve, 4000)); const nextFormattedMessage = await getMessageText(); if (nextFormattedMessage.text !== formattedMessage.text) { diff --git a/sd.ts b/sd.ts index 7ab79dc..8f00bef 100644 --- a/sd.ts +++ b/sd.ts @@ -1,13 +1,17 @@ import { Async, AsyncX, PngChunksExtract, PngChunkText } from "./deps.ts"; -const neverSignal = new AbortController().signal; - export interface SdApi { url: string; auth?: string; } -async function fetchSdApi(api: SdApi, endpoint: string, body?: unknown): Promise { +async function fetchSdApi( + api: SdApi, + endpoint: string, + { body, timeoutMs }: { body?: unknown; timeoutMs?: number } = {}, +): Promise { + const controller = new AbortController(); + const timeoutId = timeoutMs ? setTimeout(() => controller.abort(), timeoutMs) : undefined; let options: RequestInit | undefined; if (body != null) { options = { @@ -17,13 +21,18 @@ async function fetchSdApi(api: SdApi, endpoint: string, body?: unknown): Prom ...api.auth ? { Authorization: api.auth } : {}, }, body: JSON.stringify(body), + signal: controller.signal, }; } else if (api.auth) { options = { headers: { Authorization: api.auth }, + signal: controller.signal, }; } const response = await fetch(new URL(endpoint, api.url), options).catch(() => { + if (controller.signal.aborted) { + throw new SdApiError(endpoint, options, -1, "Timed out"); + } throw new SdApiError(endpoint, options, 0, "Network error"); }); const result = await response.json().catch(() => { @@ -31,6 +40,7 @@ async function fetchSdApi(api: SdApi, endpoint: string, body?: unknown): Prom detail: "Invalid JSON", }); }); + clearTimeout(timeoutId); if (!response.ok) { throw new SdApiError(endpoint, options, response.status, response.statusText, result); } @@ -78,9 +88,12 @@ export async function sdTxt2Img( api: SdApi, params: Partial, onProgress?: (progress: SdProgressResponse) => void, - signal: AbortSignal = neverSignal, ): Promise> { - const request = fetchSdApi>(api, "sdapi/v1/txt2img", params) + const request = fetchSdApi>( + api, + "sdapi/v1/txt2img", + { body: params }, + ) // JSON field "info" is a JSON-serialized string so we need to parse this part second time .then((data) => ({ ...data, @@ -89,13 +102,15 @@ export async function sdTxt2Img( try { while (true) { - await Async.abortable(Promise.race([request, Async.delay(4000)]), signal); + await Promise.race([request, Async.delay(3000)]); if (await AsyncX.promiseState(request) !== "pending") return await request; - onProgress?.(await fetchSdApi(api, "sdapi/v1/progress")); + onProgress?.( + await fetchSdApi(api, "sdapi/v1/progress", { timeoutMs: 10_000 }), + ); } } finally { if (await AsyncX.promiseState(request) === "pending") { - await fetchSdApi(api, "sdapi/v1/interrupt", {}); + await fetchSdApi(api, "sdapi/v1/interrupt", { timeoutMs: 10_000 }); } } } @@ -118,9 +133,12 @@ export async function sdImg2Img( api: SdApi, params: Partial, onProgress?: (progress: SdProgressResponse) => void, - signal: AbortSignal = neverSignal, ): Promise> { - const request = fetchSdApi>(api, "sdapi/v1/img2img", params) + const request = fetchSdApi>( + api, + "sdapi/v1/img2img", + { body: params }, + ) // JSON field "info" is a JSON-serialized string so we need to parse this part second time .then((data) => ({ ...data, @@ -129,13 +147,15 @@ export async function sdImg2Img( try { while (true) { - await Async.abortable(Promise.race([request, Async.delay(4000)]), signal); + await Promise.race([request, Async.delay(3000)]); if (await AsyncX.promiseState(request) !== "pending") return await request; - onProgress?.(await fetchSdApi(api, "sdapi/v1/progress")); + onProgress?.( + await fetchSdApi(api, "sdapi/v1/progress", { timeoutMs: 10_000 }), + ); } } finally { if (await AsyncX.promiseState(request) === "pending") { - await fetchSdApi(api, "sdapi/v1/interrupt", {}); + await fetchSdApi(api, "sdapi/v1/interrupt", { timeoutMs: 10_000 }); } } } @@ -220,7 +240,7 @@ export interface SdProgressState { } export function sdGetConfig(api: SdApi): Promise { - return fetchSdApi(api, "config"); + return fetchSdApi(api, "config", { timeoutMs: 10_000 }); } export interface SdConfigResponse { diff --git a/tasks/processJobs.ts b/tasks/processJobs.ts index 939cbc4..43f8b05 100644 --- a/tasks/processJobs.ts +++ b/tasks/processJobs.ts @@ -161,7 +161,9 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: // process the job const handleProgress = async (progress: SdProgressResponse) => { + // Important: don't let any errors escape this function if (job.value.status.type === "processing" && job.value.status.message) { + if (job.value.status.progress === progress.progress) return; await Promise.all([ bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 }), bot.api.editMessageText( @@ -182,7 +184,11 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: message: value.status.type !== "done" ? value.status.message : undefined, }, }), { maxAttempts: 1 }), - ]).catch(() => undefined); + ]).catch((err) => + logger().warning( + `Updating job status for ${formatUserChat(job.value)} using ${worker.id} failed: ${err}`, + ) + ); } }; let response: SdResponse; @@ -217,6 +223,7 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: job.value.status.message.chat.id, job.value.status.message.message_id, `Uploading your images...`, + { maxAttempts: 1 }, ).catch(() => undefined); } @@ -275,7 +282,11 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: }); break; } catch (err) { - logger().warning(`Sending images (attempt ${sendMediaAttempt}) failed: ${err}`); + logger().warning( + `Sending images (attempt ${sendMediaAttempt}) for ${ + formatUserChat(job.value) + } using ${worker.id} failed: ${err}`, + ); if (sendMediaAttempt >= 6) throw err; // wait 2 * 5 seconds before retrying for (let i = 0; i < 2; i++) {