From 03b0c2bc899a6c088267315feb8421c8db67eb3d Mon Sep 17 00:00:00 2001 From: lisq Date: Sat, 2 Mar 2024 17:42:36 +0100 Subject: [PATCH] perf: faster queue status updates --- app/generationQueue.ts | 32 +++++++------------------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/app/generationQueue.ts b/app/generationQueue.ts index d882ad7..c96d063 100644 --- a/app/generationQueue.ts +++ b/app/generationQueue.ts @@ -4,7 +4,7 @@ import { JobData, Queue, Worker } from "kvmq"; import createOpenApiClient from "openapi_fetch"; import { delay } from "std/async/delay.ts"; import { decode, encode } from "std/encoding/base64.ts"; -import { debug, error, info, warning } from "std/log/mod.ts"; +import { debug, error, info } from "std/log/mod.ts"; import { ulid } from "ulid"; import { bot } from "../bot/mod.ts"; import { PngInfo } from "../bot/parsePngInfo.ts"; @@ -46,7 +46,7 @@ export const activeGenerationWorkers = new Map>(); /** * Initializes queue workers for each SD instance when they become online. */ -export async function processGenerationQueue() { +export async function processGenerationQueue(): Promise { while (true) { for await (const workerInstance of workerInstanceStore.listAll()) { const activeWorker = activeGenerationWorkers.get(workerInstance.id); @@ -142,15 +142,6 @@ async function processGenerationJob( debug(`Generation started for ${formatUserChat(state)}`); await updateJob({ state: state }); - // check if bot can post messages in this chat - const chat = await bot.api.getChat(state.chat.id); - if ( - (chat.type === "group" || chat.type === "supergroup") && - (!chat.permissions?.can_send_messages || !chat.permissions?.can_send_photos) - ) { - throw new Error("Bot doesn't have permissions to send photos in this chat"); - } - // edit the existing status message await bot.api.editMessageText( state.replyMessage.chat.id, @@ -312,34 +303,25 @@ async function processGenerationJob( /** * Handles queue updates and updates the status message. */ -export async function updateGenerationQueue() { +export async function updateGenerationQueue(): Promise { while (true) { const jobs = await generationQueue.getAllJobs(); - const editedMessages = await Promise.all(jobs.map(async (job) => { + await Promise.all(jobs.map(async (job) => { if (job.status === "processing") { // if the job is processing, the worker will update its status message return; } - // spread the updates in time randomly - await delay(Math.random() * 3_000); - return await bot.api.editMessageText( job.state.replyMessage.chat.id, job.state.replyMessage.message_id, `You are ${formatOrdinal(job.place)} in queue.`, { maxAttempts: 1 }, - ).then(() => true).catch(() => false); + ).catch(() => {}); })); - const erroredMessages = editedMessages.filter((ok) => !ok); - if (erroredMessages.length > 0) { - warning( - `Updating queue status failed for ${erroredMessages.length} / ${editedMessages.length} jobs`, - ); - } - - await delay(10_000); + // delay between updates based on the number of jobs + await delay(Math.min(15_000, Math.max(3_000, jobs.length * 100))); } }