perf: faster queue status updates
This commit is contained in:
parent
5b6a1a3471
commit
03b0c2bc89
|
@ -4,7 +4,7 @@ import { JobData, Queue, Worker } from "kvmq";
|
||||||
import createOpenApiClient from "openapi_fetch";
|
import createOpenApiClient from "openapi_fetch";
|
||||||
import { delay } from "std/async/delay.ts";
|
import { delay } from "std/async/delay.ts";
|
||||||
import { decode, encode } from "std/encoding/base64.ts";
|
import { decode, encode } from "std/encoding/base64.ts";
|
||||||
import { debug, error, info, warning } from "std/log/mod.ts";
|
import { debug, error, info } from "std/log/mod.ts";
|
||||||
import { ulid } from "ulid";
|
import { ulid } from "ulid";
|
||||||
import { bot } from "../bot/mod.ts";
|
import { bot } from "../bot/mod.ts";
|
||||||
import { PngInfo } from "../bot/parsePngInfo.ts";
|
import { PngInfo } from "../bot/parsePngInfo.ts";
|
||||||
|
@ -46,7 +46,7 @@ export const activeGenerationWorkers = new Map<string, Worker<GenerationJob>>();
|
||||||
/**
|
/**
|
||||||
* Initializes queue workers for each SD instance when they become online.
|
* Initializes queue workers for each SD instance when they become online.
|
||||||
*/
|
*/
|
||||||
export async function processGenerationQueue() {
|
export async function processGenerationQueue(): Promise<never> {
|
||||||
while (true) {
|
while (true) {
|
||||||
for await (const workerInstance of workerInstanceStore.listAll()) {
|
for await (const workerInstance of workerInstanceStore.listAll()) {
|
||||||
const activeWorker = activeGenerationWorkers.get(workerInstance.id);
|
const activeWorker = activeGenerationWorkers.get(workerInstance.id);
|
||||||
|
@ -142,15 +142,6 @@ async function processGenerationJob(
|
||||||
debug(`Generation started for ${formatUserChat(state)}`);
|
debug(`Generation started for ${formatUserChat(state)}`);
|
||||||
await updateJob({ state: state });
|
await updateJob({ state: state });
|
||||||
|
|
||||||
// check if bot can post messages in this chat
|
|
||||||
const chat = await bot.api.getChat(state.chat.id);
|
|
||||||
if (
|
|
||||||
(chat.type === "group" || chat.type === "supergroup") &&
|
|
||||||
(!chat.permissions?.can_send_messages || !chat.permissions?.can_send_photos)
|
|
||||||
) {
|
|
||||||
throw new Error("Bot doesn't have permissions to send photos in this chat");
|
|
||||||
}
|
|
||||||
|
|
||||||
// edit the existing status message
|
// edit the existing status message
|
||||||
await bot.api.editMessageText(
|
await bot.api.editMessageText(
|
||||||
state.replyMessage.chat.id,
|
state.replyMessage.chat.id,
|
||||||
|
@ -312,34 +303,25 @@ async function processGenerationJob(
|
||||||
/**
|
/**
|
||||||
* Handles queue updates and updates the status message.
|
* Handles queue updates and updates the status message.
|
||||||
*/
|
*/
|
||||||
export async function updateGenerationQueue() {
|
export async function updateGenerationQueue(): Promise<never> {
|
||||||
while (true) {
|
while (true) {
|
||||||
const jobs = await generationQueue.getAllJobs();
|
const jobs = await generationQueue.getAllJobs();
|
||||||
|
|
||||||
const editedMessages = await Promise.all(jobs.map(async (job) => {
|
await Promise.all(jobs.map(async (job) => {
|
||||||
if (job.status === "processing") {
|
if (job.status === "processing") {
|
||||||
// if the job is processing, the worker will update its status message
|
// if the job is processing, the worker will update its status message
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// spread the updates in time randomly
|
|
||||||
await delay(Math.random() * 3_000);
|
|
||||||
|
|
||||||
return await bot.api.editMessageText(
|
return await bot.api.editMessageText(
|
||||||
job.state.replyMessage.chat.id,
|
job.state.replyMessage.chat.id,
|
||||||
job.state.replyMessage.message_id,
|
job.state.replyMessage.message_id,
|
||||||
`You are ${formatOrdinal(job.place)} in queue.`,
|
`You are ${formatOrdinal(job.place)} in queue.`,
|
||||||
{ maxAttempts: 1 },
|
{ maxAttempts: 1 },
|
||||||
).then(() => true).catch(() => false);
|
).catch(() => {});
|
||||||
}));
|
}));
|
||||||
|
|
||||||
const erroredMessages = editedMessages.filter((ok) => !ok);
|
// delay between updates based on the number of jobs
|
||||||
if (erroredMessages.length > 0) {
|
await delay(Math.min(15_000, Math.max(3_000, jobs.length * 100)));
|
||||||
warning(
|
|
||||||
`Updating queue status failed for ${erroredMessages.length} / ${editedMessages.length} jobs`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
await delay(10_000);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue