From c3e5a4d908a5829615938b0b87e97e17788d1a30 Mon Sep 17 00:00:00 2001 From: pinks Date: Tue, 12 Sep 2023 22:37:41 +0200 Subject: [PATCH] fix: reuse msgs instead of sending and deleting --- bot/img2imgCommand.ts | 3 +- bot/txt2imgCommand.ts | 3 +- db/jobStore.ts | 32 +++++++-- tasks/processJobs.ts | 126 +++++++++++++++++++++++------------ tasks/updateJobStatusMsgs.ts | 6 +- 5 files changed, 116 insertions(+), 54 deletions(-) diff --git a/bot/img2imgCommand.ts b/bot/img2imgCommand.ts index 1b995f3..e73ac1b 100644 --- a/bot/img2imgCommand.ts +++ b/bot/img2imgCommand.ts @@ -115,8 +115,7 @@ async function img2img( from: ctx.message.from, chat: ctx.message.chat, requestMessageId: ctx.message.message_id, - replyMessageId: replyMessage.message_id, - status: { type: "waiting" }, + status: { type: "waiting", message: replyMessage }, }); logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`); diff --git a/bot/txt2imgCommand.ts b/bot/txt2imgCommand.ts index 58470b1..9b4e48b 100644 --- a/bot/txt2imgCommand.ts +++ b/bot/txt2imgCommand.ts @@ -96,8 +96,7 @@ async function txt2img(ctx: Context, match: string, includeRepliedTo: boolean): from: ctx.message.from, chat: ctx.message.chat, requestMessageId: ctx.message.message_id, - replyMessageId: replyMessage.message_id, - status: { type: "waiting" }, + status: { type: "waiting", message: replyMessage }, }); logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`); diff --git a/db/jobStore.ts b/db/jobStore.ts index 45b9f98..b1e71ad 100644 --- a/db/jobStore.ts +++ b/db/jobStore.ts @@ -4,16 +4,36 @@ import { db } from "./db.ts"; export interface JobSchema { task: - | { type: "txt2img"; params: Partial } - | { type: "img2img"; params: Partial; fileId: string }; + | { + type: "txt2img"; + params: Partial; + } + | { + type: "img2img"; + params: Partial; + fileId: string; + }; from: GrammyTypes.User; chat: GrammyTypes.Chat; requestMessageId: number; - replyMessageId?: number; status: - | { type: "waiting" } - | { type: "processing"; progress: number; worker: string; updatedDate: Date } - | { type: "done"; info?: SdTxt2ImgInfo; startDate?: Date; endDate?: Date }; + | { + type: "waiting"; + message?: GrammyTypes.Message.TextMessage; + } + | { + type: "processing"; + progress: number; + worker: string; + updatedDate: Date; + message?: GrammyTypes.Message.TextMessage; + } + | { + type: "done"; + info?: SdTxt2ImgInfo; + startDate?: Date; + endDate?: Date; + }; } export const jobStore = new IKV.Store(db, "job", { diff --git a/tasks/processJobs.ts b/tasks/processJobs.ts index 25ab05a..8aa56be 100644 --- a/tasks/processJobs.ts +++ b/tasks/processJobs.ts @@ -40,10 +40,16 @@ export async function processJobs(): Promise { if (!worker) continue; // process the job - await job.update({ - status: { type: "processing", progress: 0, worker: worker.name, updatedDate: new Date() }, - }); - + await job.update((value) => ({ + ...value, + status: { + type: "processing", + progress: 0, + worker: worker.name, + updatedDate: new Date(), + message: job.value.status.type !== "done" ? job.value.status.message : undefined, + }, + })); busyWorkers.add(worker.name); processJob(job, worker, config) .catch(async (err) => { @@ -89,31 +95,61 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: ); const startDate = new Date(); - // if there is already a status message delete it - if (job.value.replyMessageId) { - await bot.api.deleteMessage(job.value.chat.id, job.value.replyMessageId) - .catch(() => undefined); + // if there is already a status message and its older than 10 seconds + if ( + job.value.status.type === "processing" && job.value.status.message && + (Date.now() - job.value.status.message.date * 1000) > 10 * 1000 + ) { + // delete it + await bot.api.deleteMessage( + job.value.status.message.chat.id, + job.value.status.message.message_id, + ).catch(() => undefined); + await job.update((value) => ({ + ...value, + status: { ...value.status, message: undefined }, + })); } - // send a new status message - const newStatusMessage = await bot.api.sendMessage( - job.value.chat.id, - `Generating your prompt now... 0% using ${worker.name}`, - { reply_to_message_id: job.value.requestMessageId }, - ).catch((err) => { - // don't error if the request message was deleted - if (err instanceof Grammy.GrammyError && err.message.match(/repl(y|ied)/)) return null; - else throw err; - }); - // if the request message was deleted, cancel the job - if (!newStatusMessage) { + // we have to check if job is still processing at every step because TypeScript + if (job.value.status.type === "processing") { + // if now there is no status message + if (!job.value.status.message) { + // send a new status message + const statusMessage = await bot.api.sendMessage( + job.value.chat.id, + `Generating your prompt now... 0% using ${worker.name}`, + { reply_to_message_id: job.value.requestMessageId }, + ).catch((err) => { + // if the request message (the message we are replying to) was deleted + if (err instanceof Grammy.GrammyError && err.message.match(/repl(y|ied)/)) { + // jest set the status message to undefined + return undefined; + } + throw err; + }); + await job.update((value) => ({ + ...value, + status: { ...value.status, message: statusMessage }, + })); + } else { + // edit the existing status message + await bot.api.editMessageText( + job.value.status.message.chat.id, + job.value.status.message.message_id, + `Generating your prompt now... 0% using ${worker.name}`, + { maxAttempts: 1 }, + ).catch(() => undefined); + } + } + + // if we don't have a status message (it failed sending because request was deleted) + if (job.value.status.type === "processing" && !job.value.status.message) { + // cancel the job await job.delete(); - logger().info( - `Job cancelled for ${formatUserChat(job.value)}`, - ); + logger().info(`Job cancelled for ${formatUserChat(job.value)}`); return; } - await job.update({ replyMessageId: newStatusMessage.message_id }); // reduce size if worker can't handle the resolution const size = limitSize( @@ -123,25 +159,26 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: // process the job const handleProgress = async (progress: SdProgressResponse) => { - // important: don't let any errors escape this callback - if (job.value.replyMessageId) { + if (job.value.status.type === "processing" && job.value.status.message) { await bot.api.editMessageText( - job.value.chat.id, - job.value.replyMessageId, + job.value.status.message.chat.id, + job.value.status.message.message_id, `Generating your prompt now... ${ (progress.progress * 100).toFixed(0) }% using ${worker.name}`, { maxAttempts: 1 }, ).catch(() => undefined); } - await job.update({ + await job.update((value) => ({ + ...value, status: { type: "processing", progress: progress.progress, worker: worker.name, updatedDate: new Date(), + message: value.status.type !== "done" ? value.status.message : undefined, }, - }, { maxAttempts: 1 }).catch(() => undefined); + }), { maxAttempts: 1 }).catch(() => undefined); }; let response: SdResponse; const taskType = job.value.task.type; // don't narrow this to never pls typescript @@ -169,11 +206,11 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: throw new Error(`Unknown task type: ${taskType}`); } - // upload the result - if (job.value.replyMessageId) { + // change status message to uploading images + if (job.value.status.type === "processing" && job.value.status.message) { await bot.api.editMessageText( - job.value.chat.id, - job.value.replyMessageId, + job.value.status.message.chat.id, + job.value.status.message.message_id, `Uploading your images...`, ).catch(() => undefined); } @@ -201,6 +238,7 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: : [], ]); + // sending images loop because telegram is unreliable and it would be a shame to lose the images let sendMediaAttempt = 0; let resultMessages: GrammyTypes.Message.MediaMessage[] | undefined; while (true) { @@ -245,17 +283,23 @@ async function processJob(job: IKV.Model, worker: WorkerData, config: } // delete the status message - if (job.value.replyMessageId) { - await bot.api.deleteMessage(job.value.chat.id, job.value.replyMessageId) - .catch(() => undefined) - .then(() => job.update({ replyMessageId: undefined })) - .catch(() => undefined); + if (job.value.status.type === "processing" && job.value.status.message) { + await bot.api.deleteMessage( + job.value.status.message.chat.id, + job.value.status.message.message_id, + ).catch(() => undefined); + await job.update((value) => ({ + ...value, + status: { ...value.status, message: undefined }, + })); } // update job to status done - await job.update({ + await job.update((value) => ({ + ...value, status: { type: "done", info: response.info, startDate, endDate: new Date() }, - }); + })); + logger().debug( `Job finished for ${formatUserChat(job.value)} using ${worker.name}${ sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : "" diff --git a/tasks/updateJobStatusMsgs.ts b/tasks/updateJobStatusMsgs.ts index 17d3f9d..74c8eb6 100644 --- a/tasks/updateJobStatusMsgs.ts +++ b/tasks/updateJobStatusMsgs.ts @@ -14,10 +14,10 @@ export async function updateJobStatusMsgs(): Promise { await new Promise((resolve) => setTimeout(resolve, 5000)); const jobs = await jobStore.getBy("status.type", "waiting"); for (const [index, job] of jobs.entries()) { - if (!job.value.replyMessageId) continue; + if (job.value.status.type !== "waiting" || !job.value.status.message) continue; await bot.api.editMessageText( - job.value.chat.id, - job.value.replyMessageId, + job.value.status.message.chat.id, + job.value.status.message.message_id, `You are ${formatOrdinal(index + 1)} in queue.`, { maxAttempts: 1 }, ).catch(() => undefined);