fix: better handling of timeouts
This commit is contained in:
parent
b818fc9207
commit
0ab1f89579
21
bot/mod.ts
21
bot/mod.ts
|
@ -26,6 +26,27 @@ bot.use(session);
|
|||
|
||||
bot.api.config.use(GrammyFiles.hydrateFiles(bot.token));
|
||||
|
||||
// Automatically cancel requests after 30 seconds
|
||||
bot.api.config.use(async (prev, method, payload, signal) => {
|
||||
const controller = new AbortController();
|
||||
let timedOut = false;
|
||||
const timeout = setTimeout(() => {
|
||||
timedOut = true;
|
||||
controller.abort();
|
||||
}, 30 * 1000);
|
||||
signal?.addEventListener("abort", () => {
|
||||
controller.abort();
|
||||
});
|
||||
try {
|
||||
return await prev(method, payload, controller.signal);
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
if (timedOut) {
|
||||
logger().warning(`${method} timed out`);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Automatically retry bot requests if we get a "too many requests" or telegram internal error
|
||||
bot.api.config.use(async (prev, method, payload, signal) => {
|
||||
const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3;
|
||||
|
|
|
@ -49,7 +49,7 @@ export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
|
|||
|
||||
async function handleFutureUpdates() {
|
||||
for (let idx = 0; idx < 30; idx++) {
|
||||
await ctx.api.sendChatAction(ctx.chat.id, "typing");
|
||||
await ctx.api.sendChatAction(ctx.chat.id, "typing", { maxAttempts: 1 } as never);
|
||||
await new Promise((resolve) => setTimeout(resolve, 4000));
|
||||
const nextFormattedMessage = await getMessageText();
|
||||
if (nextFormattedMessage.text !== formattedMessage.text) {
|
||||
|
|
48
sd.ts
48
sd.ts
|
@ -1,13 +1,17 @@
|
|||
import { Async, AsyncX, PngChunksExtract, PngChunkText } from "./deps.ts";
|
||||
|
||||
const neverSignal = new AbortController().signal;
|
||||
|
||||
export interface SdApi {
|
||||
url: string;
|
||||
auth?: string;
|
||||
}
|
||||
|
||||
async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Promise<T> {
|
||||
async function fetchSdApi<T>(
|
||||
api: SdApi,
|
||||
endpoint: string,
|
||||
{ body, timeoutMs }: { body?: unknown; timeoutMs?: number } = {},
|
||||
): Promise<T> {
|
||||
const controller = new AbortController();
|
||||
const timeoutId = timeoutMs ? setTimeout(() => controller.abort(), timeoutMs) : undefined;
|
||||
let options: RequestInit | undefined;
|
||||
if (body != null) {
|
||||
options = {
|
||||
|
@ -17,13 +21,18 @@ async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Prom
|
|||
...api.auth ? { Authorization: api.auth } : {},
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
signal: controller.signal,
|
||||
};
|
||||
} else if (api.auth) {
|
||||
options = {
|
||||
headers: { Authorization: api.auth },
|
||||
signal: controller.signal,
|
||||
};
|
||||
}
|
||||
const response = await fetch(new URL(endpoint, api.url), options).catch(() => {
|
||||
if (controller.signal.aborted) {
|
||||
throw new SdApiError(endpoint, options, -1, "Timed out");
|
||||
}
|
||||
throw new SdApiError(endpoint, options, 0, "Network error");
|
||||
});
|
||||
const result = await response.json().catch(() => {
|
||||
|
@ -31,6 +40,7 @@ async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Prom
|
|||
detail: "Invalid JSON",
|
||||
});
|
||||
});
|
||||
clearTimeout(timeoutId);
|
||||
if (!response.ok) {
|
||||
throw new SdApiError(endpoint, options, response.status, response.statusText, result);
|
||||
}
|
||||
|
@ -78,9 +88,12 @@ export async function sdTxt2Img(
|
|||
api: SdApi,
|
||||
params: Partial<SdTxt2ImgRequest>,
|
||||
onProgress?: (progress: SdProgressResponse) => void,
|
||||
signal: AbortSignal = neverSignal,
|
||||
): Promise<SdResponse<SdTxt2ImgRequest>> {
|
||||
const request = fetchSdApi<SdResponse<SdTxt2ImgRequest>>(api, "sdapi/v1/txt2img", params)
|
||||
const request = fetchSdApi<SdResponse<SdTxt2ImgRequest>>(
|
||||
api,
|
||||
"sdapi/v1/txt2img",
|
||||
{ body: params },
|
||||
)
|
||||
// JSON field "info" is a JSON-serialized string so we need to parse this part second time
|
||||
.then((data) => ({
|
||||
...data,
|
||||
|
@ -89,13 +102,15 @@ export async function sdTxt2Img(
|
|||
|
||||
try {
|
||||
while (true) {
|
||||
await Async.abortable(Promise.race([request, Async.delay(4000)]), signal);
|
||||
await Promise.race([request, Async.delay(3000)]);
|
||||
if (await AsyncX.promiseState(request) !== "pending") return await request;
|
||||
onProgress?.(await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress"));
|
||||
onProgress?.(
|
||||
await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress", { timeoutMs: 10_000 }),
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
if (await AsyncX.promiseState(request) === "pending") {
|
||||
await fetchSdApi(api, "sdapi/v1/interrupt", {});
|
||||
await fetchSdApi(api, "sdapi/v1/interrupt", { timeoutMs: 10_000 });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -118,9 +133,12 @@ export async function sdImg2Img(
|
|||
api: SdApi,
|
||||
params: Partial<SdImg2ImgRequest>,
|
||||
onProgress?: (progress: SdProgressResponse) => void,
|
||||
signal: AbortSignal = neverSignal,
|
||||
): Promise<SdResponse<SdImg2ImgRequest>> {
|
||||
const request = fetchSdApi<SdResponse<SdImg2ImgRequest>>(api, "sdapi/v1/img2img", params)
|
||||
const request = fetchSdApi<SdResponse<SdImg2ImgRequest>>(
|
||||
api,
|
||||
"sdapi/v1/img2img",
|
||||
{ body: params },
|
||||
)
|
||||
// JSON field "info" is a JSON-serialized string so we need to parse this part second time
|
||||
.then((data) => ({
|
||||
...data,
|
||||
|
@ -129,13 +147,15 @@ export async function sdImg2Img(
|
|||
|
||||
try {
|
||||
while (true) {
|
||||
await Async.abortable(Promise.race([request, Async.delay(4000)]), signal);
|
||||
await Promise.race([request, Async.delay(3000)]);
|
||||
if (await AsyncX.promiseState(request) !== "pending") return await request;
|
||||
onProgress?.(await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress"));
|
||||
onProgress?.(
|
||||
await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress", { timeoutMs: 10_000 }),
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
if (await AsyncX.promiseState(request) === "pending") {
|
||||
await fetchSdApi(api, "sdapi/v1/interrupt", {});
|
||||
await fetchSdApi(api, "sdapi/v1/interrupt", { timeoutMs: 10_000 });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -220,7 +240,7 @@ export interface SdProgressState {
|
|||
}
|
||||
|
||||
export function sdGetConfig(api: SdApi): Promise<SdConfigResponse> {
|
||||
return fetchSdApi(api, "config");
|
||||
return fetchSdApi(api, "config", { timeoutMs: 10_000 });
|
||||
}
|
||||
|
||||
export interface SdConfigResponse {
|
||||
|
|
|
@ -161,7 +161,9 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
|||
|
||||
// process the job
|
||||
const handleProgress = async (progress: SdProgressResponse) => {
|
||||
// Important: don't let any errors escape this function
|
||||
if (job.value.status.type === "processing" && job.value.status.message) {
|
||||
if (job.value.status.progress === progress.progress) return;
|
||||
await Promise.all([
|
||||
bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 }),
|
||||
bot.api.editMessageText(
|
||||
|
@ -182,7 +184,11 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
|||
message: value.status.type !== "done" ? value.status.message : undefined,
|
||||
},
|
||||
}), { maxAttempts: 1 }),
|
||||
]).catch(() => undefined);
|
||||
]).catch((err) =>
|
||||
logger().warning(
|
||||
`Updating job status for ${formatUserChat(job.value)} using ${worker.id} failed: ${err}`,
|
||||
)
|
||||
);
|
||||
}
|
||||
};
|
||||
let response: SdResponse<unknown>;
|
||||
|
@ -217,6 +223,7 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
|||
job.value.status.message.chat.id,
|
||||
job.value.status.message.message_id,
|
||||
`Uploading your images...`,
|
||||
{ maxAttempts: 1 },
|
||||
).catch(() => undefined);
|
||||
}
|
||||
|
||||
|
@ -275,7 +282,11 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
|||
});
|
||||
break;
|
||||
} catch (err) {
|
||||
logger().warning(`Sending images (attempt ${sendMediaAttempt}) failed: ${err}`);
|
||||
logger().warning(
|
||||
`Sending images (attempt ${sendMediaAttempt}) for ${
|
||||
formatUserChat(job.value)
|
||||
} using ${worker.id} failed: ${err}`,
|
||||
);
|
||||
if (sendMediaAttempt >= 6) throw err;
|
||||
// wait 2 * 5 seconds before retrying
|
||||
for (let i = 0; i < 2; i++) {
|
||||
|
|
Loading…
Reference in New Issue