From 1c55ae70af672a1dbd2a54056dccb3288cfd5f42 Mon Sep 17 00:00:00 2001 From: pinks Date: Wed, 6 Sep 2023 19:51:52 +0200 Subject: [PATCH] feat: persistent queue --- bot.ts | 89 +++++++++++++++++-------- deps.ts | 2 + main.ts | 3 +- queue.ts | 199 +++++++++++++++++++++++++++++++++++-------------------- store.ts | 76 +++++++++++++++++++++ 5 files changed, 270 insertions(+), 99 deletions(-) create mode 100644 store.ts diff --git a/bot.ts b/bot.ts index a63fda3..7582a89 100644 --- a/bot.ts +++ b/bot.ts @@ -1,6 +1,6 @@ import { autoQuote, bold, Bot, Context, hydrateReply, ParseModeFlavor } from "./deps.ts"; -import { fmt, formatOrdinal } from "./intl.ts"; -import { queue } from "./queue.ts"; +import { fmt } from "./intl.ts"; +import { getAllJobs, pushJob } from "./queue.ts"; import { mySession, MySessionFlavor } from "./session.ts"; export type MyContext = ParseModeFlavor & MySessionFlavor; @@ -22,6 +22,21 @@ bot.api.config.use(async (prev, method, payload, signal) => { } }); +// if error happened, try to reply to the user with the error +bot.use(async (ctx, next) => { + try { + await next(); + } catch (err) { + try { + await ctx.reply(`Handling update failed: ${err}`, { + reply_to_message_id: ctx.message?.message_id, + }); + } catch { + throw err; + } + } +}); + bot.api.setMyShortDescription("I can generate furry images from text"); bot.api.setMyDescription( "I can generate furry images from text. Send /txt2img to generate an image.", @@ -42,12 +57,13 @@ bot.command("txt2img", async (ctx) => { if (config.pausedReason != null) { return ctx.reply(`I'm paused: ${config.pausedReason || "No reason given"}`); } - if (queue.length >= config.maxJobs) { + const jobs = await getAllJobs(); + if (jobs.length >= config.maxJobs) { return ctx.reply( `The queue is full. Try again later. (Max queue size: ${config.maxJobs})`, ); } - const jobCount = queue.filter((job) => job.userId === ctx.from.id).length; + const jobCount = jobs.filter((job) => job.user.id === ctx.from.id).length; if (jobCount >= config.maxUserJobs) { return ctx.reply( `You already have ${config.maxUserJobs} jobs in queue. Try again later.`, @@ -56,33 +72,50 @@ bot.command("txt2img", async (ctx) => { if (!ctx.match) { return ctx.reply("Please describe what you want to see after the command"); } - const place = queue.length + 1; - const queueMessage = await ctx.reply(`You are ${formatOrdinal(place)} in queue.`); - const userName = [ctx.from.first_name, ctx.from.last_name].filter(Boolean).join(" "); - const chatName = ctx.chat.type === "supergroup" || ctx.chat.type === "group" - ? ctx.chat.title - : "private chat"; - queue.push({ + pushJob({ params: { prompt: ctx.match }, - userId: ctx.from.id, - userName, - chatId: ctx.chat.id, - chatName, - requestMessageId: ctx.message.message_id, - statusMessageId: queueMessage.message_id, + user: ctx.from, + chat: ctx.chat, + requestMessage: ctx.message, + status: { type: "idle" }, }); - console.log(`Enqueued job for ${userName} in chat ${chatName}`); + console.log( + `Enqueued job ${jobs.length + 1} for ${ctx.from.first_name} in ${ctx.chat.type} chat:`, + ctx.match.replace(/\s+/g, " "), + "\n", + ); }); -bot.command("queue", (ctx) => { - if (queue.length === 0) return ctx.reply("Queue is empty"); - return ctx.replyFmt( - fmt`Current queue:\n\n${ - queue.map((job, index) => - fmt`${bold(index + 1)}. ${bold(job.userName)} in ${bold(job.chatName)}\n` +bot.command("queue", async (ctx) => { + let jobs = await getAllJobs(); + const getMessageText = () => { + if (jobs.length === 0) return fmt`Queue is empty.`; + const sortedJobs = []; + let place = 0; + for (const job of jobs) { + if (job.status.type === "idle") place += 1; + sortedJobs.push({ ...job, place }); + } + return fmt`Current queue:\n\n${ + sortedJobs.map((job) => + fmt`${job.place}. ${bold(job.user.first_name)} in ${job.chat.type} chat ${ + job.status.type === "processing" ? `(${(job.status.progress * 100).toFixed(0)}%)` : "" + }\n` ) - }`, - ); + }`; + }; + const message = await ctx.replyFmt(getMessageText()); + handleFutureUpdates(); + async function handleFutureUpdates() { + for (let idx = 0; idx < 12; idx++) { + await new Promise((resolve) => setTimeout(resolve, 5000)); + jobs = await getAllJobs(); + const formattedMessage = getMessageText(); + await ctx.api.editMessageText(ctx.chat.id, message.message_id, formattedMessage.text, { + entities: formattedMessage.entities, + }).catch(() => undefined); + } + } }); bot.command("pause", (ctx) => { @@ -221,6 +254,10 @@ bot.command("sdparams", (ctx) => { ); }); +bot.command("crash", () => { + throw new Error("Crash command used"); +}); + bot.catch((err) => { let msg = "Error processing update"; const { from, chat } = err.ctx; diff --git a/deps.ts b/deps.ts index 2522297..749ff5c 100644 --- a/deps.ts +++ b/deps.ts @@ -2,3 +2,5 @@ export * from "https://deno.land/x/grammy@v1.18.1/mod.ts"; export * from "https://deno.land/x/grammy_autoquote@v1.1.2/mod.ts"; export * from "https://deno.land/x/grammy_parse_mode@1.7.1/mod.ts"; export * from "https://deno.land/x/grammy_storages@v2.3.1/denokv/src/mod.ts"; +export * as types from "https://deno.land/x/grammy_types@v3.2.0/mod.ts"; +export * from "https://deno.land/x/ulid@v0.3.0/mod.ts"; diff --git a/main.ts b/main.ts index 623e567..c3827e7 100644 --- a/main.ts +++ b/main.ts @@ -1,8 +1,9 @@ import "https://deno.land/std@0.201.0/dotenv/load.ts"; import { bot } from "./bot.ts"; -import { processQueue } from "./queue.ts"; +import { processQueue, returnHangedJobs } from "./queue.ts"; await Promise.all([ bot.start(), processQueue(), + returnHangedJobs(), ]); diff --git a/queue.ts b/queue.ts index bbb0621..bd514d6 100644 --- a/queue.ts +++ b/queue.ts @@ -1,112 +1,167 @@ -import { InputFile, InputMediaBuilder } from "./deps.ts"; +import { InputFile, InputMediaBuilder, types } from "./deps.ts"; import { bot } from "./bot.ts"; import { getGlobalSession } from "./session.ts"; import { formatOrdinal } from "./intl.ts"; -import { SdProgressResponse, SdRequest, txt2img } from "./sd.ts"; +import { SdRequest, txt2img } from "./sd.ts"; import { extFromMimeType, mimeTypeFromBase64 } from "./mimeType.ts"; - -export const queue: Job[] = []; +import { Model, Store } from "./store.ts"; interface Job { params: Partial; - userId: number; - userName: string; - chatId: number; - chatName: string; - requestMessageId: number; - statusMessageId: number; + user: types.User; + chat: types.Chat.PrivateChat | types.Chat.GroupChat | types.Chat.SupergroupChat; + requestMessage: types.Message & types.Message.TextMessage; + statusMessage?: types.Message & types.Message.TextMessage; + status: { type: "idle" } | { type: "processing"; progress: number; updatedDate: Date }; +} + +const db = await Deno.openKv("./app.db"); + +const jobStore = new Store(db, "job"); + +export async function pushJob(job: Job) { + await jobStore.create(job); +} + +async function takeJob(): Promise | null> { + const jobs = await jobStore.list(); + const job = jobs.find((job) => job.value.status.type === "idle"); + if (!job) return null; + await job.update({ status: { type: "processing", progress: 0, updatedDate: new Date() } }); + return job; +} + +export async function getAllJobs(): Promise> { + return await jobStore.list().then((jobs) => jobs.map((job) => job.value)); } export async function processQueue() { while (true) { - const job = queue.shift(); + const job = await takeJob(); if (!job) { await new Promise((resolve) => setTimeout(resolve, 1000)); continue; } - for (const [index, job] of queue.entries()) { - const place = index + 1; - await bot.api - .editMessageText( - job.chatId, - job.statusMessageId, - `You are ${formatOrdinal(place)} in queue.`, + let place = 0; + for (const job of await jobStore.list()) { + if (job.value.status.type === "idle") place += 1; + if (place === 0) continue; + const statusMessageText = `You are ${formatOrdinal(place)} in queue.`; + if (!job.value.statusMessage) { + await bot.api.sendMessage(job.value.chat.id, statusMessageText, { + reply_to_message_id: job.value.requestMessage.message_id, + }).catch(() => undefined) + .then((message) => job.update({ statusMessage: message })); + } else { + await bot.api.editMessageText( + job.value.chat.id, + job.value.statusMessage.message_id, + statusMessageText, ) - .catch(() => {}); + .catch(() => undefined); + } } try { - await bot.api - .deleteMessage(job.chatId, job.statusMessageId) - .catch(() => {}); - const progressMessage = await bot.api.sendMessage( - job.chatId, + if (job.value.statusMessage) { + await bot.api + .deleteMessage(job.value.chat.id, job.value.statusMessage?.message_id) + .catch(() => undefined) + .then(() => job.update({ statusMessage: undefined })); + } + await bot.api.sendMessage( + job.value.chat.id, "Generating your prompt now...", - { reply_to_message_id: job.requestMessageId }, - ); - const onProgress = (progress: SdProgressResponse) => { - bot.api - .editMessageText( - job.chatId, - progressMessage.message_id, - `Generating your prompt now... ${ - Math.round( - progress.progress * 100, - ) - }%`, - ) - .catch(() => {}); - }; + { reply_to_message_id: job.value.requestMessage.message_id }, + ).then((message) => job.update({ statusMessage: message })); const config = await getGlobalSession(); const response = await txt2img( config.sdApiUrl, - { ...config.defaultParams, ...job.params }, - onProgress, + { ...config.defaultParams, ...job.value.params }, + (progress) => { + job.update({ + status: { type: "processing", progress: progress.progress, updatedDate: new Date() }, + }); + if (job.value.statusMessage) { + bot.api + .editMessageText( + job.value.chat.id, + job.value.statusMessage.message_id, + `Generating your prompt now... ${ + Math.round( + progress.progress * 100, + ) + }%`, + ) + .catch(() => undefined); + } + }, ); console.log( - `Generated ${response.images.length} images (${ - response.images - .map((image) => (image.length / 1024).toFixed(0) + "kB") - .join(", ") - }) for ${job.userName} in ${job.chatName}: ${job.params.prompt?.replace(/\s+/g, " ")}`, + `Finished job for ${job.value.user.first_name} in ${job.value.chat.type} chat`, ); - await bot.api.editMessageText( - job.chatId, - progressMessage.message_id, - `Uploading your images...`, - ).catch(() => {}); + if (job.value.statusMessage) { + await bot.api.editMessageText( + job.value.chat.id, + job.value.statusMessage.message_id, + `Uploading your images...`, + ).catch(() => undefined); + } const inputFiles = await Promise.all( response.images.map(async (imageBase64, idx) => { const mimeType = mimeTypeFromBase64(imageBase64); - const imageBlob = await fetch(`data:${mimeType};base64,${imageBase64}`).then((resp) => - resp.blob() - ); - console.log( - `Uploading image ${idx + 1} of ${response.images.length} (${ - (imageBlob.size / 1024).toFixed(0) - }kB)`, - ); + const imageBlob = await fetch(`data:${mimeType};base64,${imageBase64}`) + .then((resp) => resp.blob()); return InputMediaBuilder.photo( - new InputFile(imageBlob, `${idx}.${extFromMimeType(mimeType)}`), + new InputFile(imageBlob, `image_${idx}.${extFromMimeType(mimeType)}`), ); }), ); - await bot.api.sendMediaGroup(job.chatId, inputFiles, { - reply_to_message_id: job.requestMessageId, + if (job.value.statusMessage) { + await bot.api + .deleteMessage(job.value.chat.id, job.value.statusMessage.message_id) + .catch(() => undefined).then(() => job.update({ statusMessage: undefined })); + } + await bot.api.sendMediaGroup(job.value.chat.id, inputFiles, { + reply_to_message_id: job.value.requestMessage.message_id, }); - await bot.api - .deleteMessage(job.chatId, progressMessage.message_id) - .catch(() => {}); - console.log(`${queue.length} jobs remaining`); + await job.delete(); } catch (err) { console.error( - `Failed to generate image for ${job.userName} in ${job.chatName}: ${job.params.prompt} - ${err}`, + `Failed to generate an image for ${job.value.user.first_name} in ${job.value.chat.type} chat: ${err}`, ); - await bot.api - .sendMessage(job.chatId, err.toString(), { - reply_to_message_id: job.requestMessageId, + const errorMessage = await bot.api + .sendMessage(job.value.chat.id, err.toString(), { + reply_to_message_id: job.value.requestMessage.message_id, }) - .catch(() => bot.api.sendMessage(job.chatId, err.toString())) - .catch(() => {}); + .catch(() => undefined); + if (errorMessage) { + if (job.value.statusMessage) { + await bot.api + .deleteMessage(job.value.chat.id, job.value.statusMessage.message_id) + .catch(() => undefined) + .then(() => job.update({ statusMessage: undefined })); + } + job.update({ status: { type: "idle" } }); + } else { + await job.delete(); + } + } + } +} + +export async function returnHangedJobs() { + while (true) { + await new Promise((resolve) => setTimeout(resolve, 5000)); + const jobs = await jobStore.list(); + for (const job of jobs) { + if (job.value.status.type === "idle") continue; + // if job wasn't updated for 1 minute, return it to the queue + if (job.value.status.updatedDate.getTime() < Date.now() - 60 * 1000) { + console.log( + `Returning hanged job for ${job.value.user.first_name} in ${job.value.chat.type} chat`, + ); + await job.update({ status: { type: "idle" } }); + } } } } diff --git a/store.ts b/store.ts new file mode 100644 index 0000000..2ac7765 --- /dev/null +++ b/store.ts @@ -0,0 +1,76 @@ +import { ulid } from "./deps.ts"; + +export class Store { + constructor( + private readonly db: Deno.Kv, + private readonly storeKey: Deno.KvKeyPart, + ) { + } + + async create(value: T): Promise> { + const id = ulid(); + await this.db.set([this.storeKey, id], value); + return new Model(this.db, this.storeKey, id, value); + } + + async get(id: Deno.KvKeyPart): Promise | null> { + const entry = await this.db.get([this.storeKey, id]); + if (entry.versionstamp == null) return null; + return new Model(this.db, this.storeKey, id, entry.value); + } + + async list(): Promise>> { + const models: Array> = []; + for await (const entry of this.db.list({ prefix: [this.storeKey] })) { + models.push(new Model(this.db, this.storeKey, entry.key[1], entry.value)); + } + return models; + } +} + +export class Model { + #value: T; + + constructor( + private readonly db: Deno.Kv, + private readonly storeKey: Deno.KvKeyPart, + private readonly entryKey: Deno.KvKeyPart, + value: T, + ) { + this.#value = value; + } + + get value(): T { + return this.#value; + } + + async get(): Promise { + const entry = await this.db.get([this.storeKey, this.entryKey]); + if (entry.versionstamp == null) return null; + this.#value = entry.value; + return entry.value; + } + + async set(value: T): Promise { + await this.db.set([this.storeKey, this.entryKey], value); + this.#value = value; + return value; + } + + async update(value: Partial | ((value: T) => T)): Promise { + const entry = await this.db.get([this.storeKey, this.entryKey]); + if (entry.versionstamp == null) return null; + if (typeof value === "function") { + entry.value = value(entry.value); + } else { + entry.value = { ...entry.value, ...value }; + } + await this.db.set([this.storeKey, this.entryKey], entry.value); + this.#value = entry.value; + return entry.value; + } + + async delete(): Promise { + await this.db.delete([this.storeKey, this.entryKey]); + } +}