From 9155d513b58ad55594f8f12dc9a71779afffab87 Mon Sep 17 00:00:00 2001 From: pinks Date: Thu, 14 Sep 2023 03:23:55 +0200 Subject: [PATCH] wait before retrying failed job --- db/jobStore.ts | 1 + tasks/processJobs.ts | 20 +++++++++++++++----- tasks/returnHangedJobs.ts | 8 +++++++- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/db/jobStore.ts b/db/jobStore.ts index 3f76a9a..25c70c2 100644 --- a/db/jobStore.ts +++ b/db/jobStore.ts @@ -21,6 +21,7 @@ export interface JobSchema { | { type: "waiting"; message?: GrammyTypes.Message.TextMessage; + lastErrorDate?: Date; } | { type: "processing"; diff --git a/tasks/processJobs.ts b/tasks/processJobs.ts index b8a70fc..e158a18 100644 --- a/tasks/processJobs.ts +++ b/tasks/processJobs.ts @@ -33,8 +33,12 @@ export async function processJobs(): Promise { await new Promise((resolve) => setTimeout(resolve, 1000)); try { - // get first waiting job - const job = await jobStore.getBy("status.type", "waiting").then((jobs) => jobs[0]); + const jobs = await jobStore.getBy("status.type", "waiting"); + // get first waiting job which hasn't errored in last minute + const job = jobs.find((job) => + job.value.status.type === "waiting" && + (job.value.status.lastErrorDate?.getTime() ?? 0) < Date.now() - 60_000 + ); if (!job) continue; // find a worker to handle the job @@ -62,13 +66,20 @@ export async function processJobs(): Promise { logger().error( `Job failed for ${formatUserChat(job.value)} via ${worker.id}: ${err}`, ); + if (job.value.status.type === "processing" && job.value.status.message) { + await bot.api.deleteMessage( + job.value.status.message.chat.id, + job.value.status.message.message_id, + ).catch(() => undefined); + } if (err instanceof Grammy.GrammyError || err instanceof SdApiError) { await bot.api.sendMessage( job.value.chat.id, `Failed to generate your prompt using ${worker.name}: ${err.message}`, { reply_to_message_id: job.value.requestMessageId }, ).catch(() => undefined); - await job.update({ status: { type: "waiting" } }).catch(() => undefined); + await job.update({ status: { type: "waiting", lastErrorDate: new Date() } }) + .catch(() => undefined); } if ( err instanceof SdApiError && @@ -169,10 +180,9 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: const handleProgress = async (progress: SdProgressResponse) => { // Important: don't let any errors escape this function if (job.value.status.type === "processing" && job.value.status.message) { - if (job.value.status.progress === progress.progress) return; await Promise.all([ bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 }), - bot.api.editMessageText( + progress.progress > job.value.status.progress && bot.api.editMessageText( job.value.status.message.chat.id, job.value.status.message.message_id, `Generating your prompt now... ${ diff --git a/tasks/returnHangedJobs.ts b/tasks/returnHangedJobs.ts index 8b8a868..673f60b 100644 --- a/tasks/returnHangedJobs.ts +++ b/tasks/returnHangedJobs.ts @@ -17,7 +17,13 @@ export async function returnHangedJobs(): Promise { // if job wasn't updated for 2 minutes, return it to the queue const timeSinceLastUpdateMs = Date.now() - job.value.status.updatedDate.getTime(); if (timeSinceLastUpdateMs > 2 * 60 * 1000) { - await job.update({ status: { type: "waiting" } }); + await job.update((value) => ({ + ...value, + status: { + type: "waiting", + message: value.status.type !== "done" ? value.status.message : undefined, + }, + })); logger().warning( `Job for ${formatUserChat(job.value)} was returned to the queue because it hanged for ${ FmtDuration.format(Math.trunc(timeSinceLastUpdateMs / 1000) * 1000, {