From aa380c63fb467f32afae945e7f45ece111f96a03 Mon Sep 17 00:00:00 2001 From: pinks Date: Sun, 24 Sep 2023 14:05:28 +0200 Subject: [PATCH] perf: split generation and upload queues --- app/db.ts | 3 + app/generationQueue.ts | 320 ++++++++++++++-------------------------- app/generationStore.ts | 37 ++--- app/mod.ts | 8 +- app/uploadQueue.ts | 128 ++++++++++++++++ bot/img2imgCommand.ts | 4 +- bot/mod.ts | 20 ++- bot/queueCommand.ts | 4 +- bot/txt2imgCommand.ts | 4 +- deps.ts | 6 +- main.ts | 9 +- utils/formatUserChat.ts | 7 +- 12 files changed, 291 insertions(+), 259 deletions(-) create mode 100644 app/uploadQueue.ts diff --git a/app/db.ts b/app/db.ts index 032772a..880f04b 100644 --- a/app/db.ts +++ b/app/db.ts @@ -1 +1,4 @@ +import { KVFS } from "../deps.ts"; + export const db = await Deno.openKv("./app.db"); +export const fs = new KVFS.KvFs(db); diff --git a/app/generationQueue.ts b/app/generationQueue.ts index 9390699..7a5ff71 100644 --- a/app/generationQueue.ts +++ b/app/generationQueue.ts @@ -3,23 +3,21 @@ import { PngInfo } from "../sd/parsePngInfo.ts"; import * as SdApi from "../sd/sdApi.ts"; import { formatUserChat } from "../utils/formatUserChat.ts"; import { getConfig, SdInstanceData } from "./config.ts"; -import { db } from "./db.ts"; -import { generationStore, SdGenerationInfo } from "./generationStore.ts"; +import { db, fs } from "./db.ts"; +import { SdGenerationInfo } from "./generationStore.ts"; import { Async, AsyncX, Base64, createOpenApiClient, - FileType, - FmtDuration, - Grammy, - GrammyParseMode, GrammyTypes, KVMQ, Log, + ULID, } from "../deps.ts"; import { formatOrdinal } from "../utils/formatOrdinal.ts"; import { SdError } from "../sd/SdError.ts"; +import { uploadQueue } from "./uploadQueue.ts"; const logger = () => Log.getLogger(); @@ -37,7 +35,7 @@ interface GenerationJob { from: GrammyTypes.User; chat: GrammyTypes.Chat; requestMessage: GrammyTypes.Message; - replyMessage?: GrammyTypes.Message; + replyMessage: GrammyTypes.Message; sdInstanceId?: string; progress?: number; } @@ -47,9 +45,9 @@ export const generationQueue = new KVMQ.Queue(db, "jobQueue"); export const activeGenerationWorkers = new Map>(); /** - * Periodically restarts stable diffusion generation workers if they become online. + * Initializes queue workers for each SD instance when they become online. */ -export async function restartGenerationWorkers() { +export async function processGenerationQueue() { while (true) { const config = await getConfig(); @@ -57,14 +55,13 @@ export async function restartGenerationWorkers() { const activeWorker = activeGenerationWorkers.get(sdInstance.id); if (activeWorker?.isProcessing) continue; - const activeWorkerSdClient = createOpenApiClient({ + const workerSdClient = createOpenApiClient({ baseUrl: sdInstance.api.url, headers: { "Authorization": sdInstance.api.auth }, }); // check if worker is up - - const activeWorkerStatus = await activeWorkerSdClient.GET("/sdapi/v1/memory", { + const activeWorkerStatus = await workerSdClient.GET("/sdapi/v1/memory", { signal: AbortSignal.timeout(10_000), }) .then((response) => { @@ -76,102 +73,79 @@ export async function restartGenerationWorkers() { .catch((error) => { logger().warning(`Worker ${sdInstance.id} is down: ${error}`); }); - if (!activeWorkerStatus?.data) { continue; } - const newWorker = generationQueue.createWorker(({ state, setState }) => - processGenerationJob(state, setState, sdInstance) - ); - - logger().info(`Started worker ${sdInstance.id}`); - - newWorker.processJobs(); - + // create worker + const newWorker = generationQueue.createWorker(async ({ state }, updateJob) => { + await processGenerationJob(state, updateJob, sdInstance); + }); newWorker.addEventListener("error", (e) => { - logger().error(`Job failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`); + logger().error( + `Generation failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`, + ); bot.api.sendMessage( e.detail.job.state.requestMessage.chat.id, - `Generating failed: ${e.detail.error}`, + `Generation failed: ${e.detail.error}\n\n` + + (e.detail.job.retryCount > 0 + ? `Will retry ${e.detail.job.retryCount} more times.` + : `Aborting.`), { reply_to_message_id: e.detail.job.state.requestMessage.message_id, + allow_sending_without_reply: true, }, ).catch(() => undefined); - // TODO: only stop worker if error is network error - newWorker.stopProcessing(); + if (e.detail.error instanceof SdError) { + newWorker.stopProcessing(); + } }); - + newWorker.processJobs(); activeGenerationWorkers.set(sdInstance.id, newWorker); + logger().info(`Started worker ${sdInstance.id}`); } await Async.delay(60_000); } } +/** + * Processes a single job from the queue. + */ async function processGenerationJob( - job: GenerationJob, - setJob: (state: GenerationJob) => Promise, + state: GenerationJob, + updateJob: (job: Partial>) => Promise, sdInstance: SdInstanceData, ) { - logger().debug(`Job started for ${formatUserChat(job)} using ${sdInstance.id}`); const startDate = new Date(); - job.sdInstanceId = sdInstance.id; - await setJob(job); - - const config = await getConfig(); const workerSdClient = createOpenApiClient({ baseUrl: sdInstance.api.url, headers: { "Authorization": sdInstance.api.auth }, }); + state.sdInstanceId = sdInstance.id; + logger().debug(`Generation started for ${formatUserChat(state)}`); + await updateJob({ state: state }); - // if there is already a status message and its older than 30 seconds - if (job.replyMessage && (Date.now() - job.replyMessage.date * 1000) > 30_000) { - // try to delete it - await bot.api.deleteMessage(job.replyMessage.chat.id, job.replyMessage.message_id) - .catch(() => undefined); - job.replyMessage = undefined; - await setJob(job); + // check if bot can post messages in this chat + const chat = await bot.api.getChat(state.chat.id); + if ( + (chat.type === "group" || chat.type === "supergroup") && + (!chat.permissions?.can_send_messages || !chat.permissions?.can_send_photos) + ) { + throw new Error("Bot doesn't have permissions to send photos in this chat"); } - await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 }) - .catch(() => undefined); - - // if now there is no status message - if (!job.replyMessage) { - // send a new status message - job.replyMessage = await bot.api.sendMessage( - job.chat.id, - `Generating your prompt now... 0% using ${sdInstance.name}`, - { reply_to_message_id: job.requestMessage.message_id }, - ).catch((err) => { - // if the request message (the message we are replying to) was deleted - if (err instanceof Grammy.GrammyError && err.message.match(/repl(y|ied)/)) { - // set the status message to undefined - return undefined; - } - throw err; - }); - await setJob(job); - } else { - // edit the existing status message - await bot.api.editMessageText( - job.replyMessage.chat.id, - job.replyMessage.message_id, - `Generating your prompt now... 0% using ${sdInstance.name}`, - { maxAttempts: 1 }, - ).catch(() => undefined); - } - - // if we don't have a status message (it failed sending because request was deleted) - if (!job.replyMessage) { - // cancel the job - logger().info(`Job cancelled for ${formatUserChat(job)}`); - return; - } + // edit the existing status message + await bot.api.editMessageText( + state.replyMessage.chat.id, + state.replyMessage.message_id, + `Generating your prompt now... 0% using ${sdInstance.name}`, + { maxAttempts: 1 }, + ); // reduce size if worker can't handle the resolution + const config = await getConfig(); const size = limitSize( - { ...config.defaultParams, ...job.task.params }, + { ...config.defaultParams, ...state.task.params }, sdInstance.maxResolution, ); function limitSize( @@ -190,31 +164,31 @@ async function processGenerationJob( } // start generating the image - const responsePromise = job.task.type === "txt2img" + const responsePromise = state.task.type === "txt2img" ? workerSdClient.POST("/sdapi/v1/txt2img", { body: { ...config.defaultParams, - ...job.task.params, + ...state.task.params, ...size, - negative_prompt: job.task.params.negative_prompt - ? job.task.params.negative_prompt + negative_prompt: state.task.params.negative_prompt + ? state.task.params.negative_prompt : config.defaultParams?.negative_prompt, }, }) - : job.task.type === "img2img" + : state.task.type === "img2img" ? workerSdClient.POST("/sdapi/v1/img2img", { body: { ...config.defaultParams, - ...job.task.params, + ...state.task.params, ...size, - negative_prompt: job.task.params.negative_prompt - ? job.task.params.negative_prompt + negative_prompt: state.task.params.negative_prompt + ? state.task.params.negative_prompt : config.defaultParams?.negative_prompt, init_images: [ Base64.encode( await fetch( `https://api.telegram.org/file/bot${bot.token}/${await bot.api.getFile( - job.task.fileId, + state.task.fileId, ).then((file) => file.file_path)}`, ).then((resp) => resp.arrayBuffer()), ), @@ -224,44 +198,45 @@ async function processGenerationJob( : undefined; if (!responsePromise) { - throw new Error(`Unknown task type: ${job.task.type}`); + throw new Error(`Unknown task type: ${state.task.type}`); } // poll for progress while the generation request is pending - while (await AsyncX.promiseState(responsePromise) === "pending") { - await Async.delay(3000); + do { const progressResponse = await workerSdClient.GET("/sdapi/v1/progress", { params: {}, - signal: AbortSignal.timeout(15_000), + signal: AbortSignal.timeout(15000), }); if (!progressResponse.data) { throw new SdError( - "Failed to get progress", + "Progress request failed", progressResponse.response, progressResponse.error, ); } - job.progress = progressResponse.data.progress; - await setJob(job); - await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 }) + + state.progress = progressResponse.data.progress; + await updateJob({ state: state }); + + await bot.api.sendChatAction(state.chat.id, "upload_photo", { maxAttempts: 1 }) .catch(() => undefined); - if (job.replyMessage) { - await bot.api.editMessageText( - job.replyMessage.chat.id, - job.replyMessage.message_id, - `Generating your prompt now... ${ - (progressResponse.data.progress * 100).toFixed(0) - }% using ${sdInstance.name}`, - { maxAttempts: 1 }, - ).catch(() => undefined); - } - } + await bot.api.editMessageText( + state.replyMessage.chat.id, + state.replyMessage.message_id, + `Generating your prompt now... ${ + (progressResponse.data.progress * 100).toFixed(0) + }% using ${sdInstance.name}`, + { maxAttempts: 1 }, + ).catch(() => undefined); + + await Promise.race([Async.delay(3000), responsePromise]).catch(() => undefined); + } while (await AsyncX.promiseState(responsePromise) === "pending"); + + // check response const response = await responsePromise; - if (!response.data) { - throw new SdError("Generating image failed", response.response, response.error); + throw new SdError(`${state.task.type} failed`, response.response, response.error); } - if (!response.data.images?.length) { throw new Error("No images returned from SD"); } @@ -269,120 +244,43 @@ async function processGenerationJob( // info field is a json serialized string so we need to parse it const info: SdGenerationInfo = JSON.parse(response.data.info); + // save images to db + const imageKeys: Deno.KvKey[] = []; + for (const imageBase64 of response.data.images) { + const imageBuffer = Base64.decode(imageBase64); + const imageKey = ["images", "upload", ULID.ulid()]; + await fs.set(imageKey, imageBuffer, { expireIn: 30 * 60 * 1000 }); + imageKeys.push(imageKey); + } + + // create a new upload job + await uploadQueue.pushJob({ + chat: state.chat, + from: state.from, + requestMessage: state.requestMessage, + replyMessage: state.replyMessage, + sdInstanceId: sdInstance.id, + startDate, + endDate: new Date(), + imageKeys, + info, + }, { retryCount: 5, retryDelayMs: 10000 }); + // change status message to uploading images await bot.api.editMessageText( - job.replyMessage.chat.id, - job.replyMessage.message_id, + state.replyMessage.chat.id, + state.replyMessage.message_id, `Uploading your images...`, { maxAttempts: 1 }, ).catch(() => undefined); - // render the caption - // const detailedReply = Object.keys(job.value.params).filter((key) => key !== "prompt").length > 0; - const detailedReply = true; - const jobDurationMs = Math.trunc((Date.now() - startDate.getTime()) / 1000) * 1000; - const { bold, fmt } = GrammyParseMode; - const caption = fmt([ - `${info.prompt}\n`, - ...detailedReply - ? [ - info.negative_prompt ? fmt`${bold("Negative prompt:")} ${info.negative_prompt}\n` : "", - fmt`${bold("Steps:")} ${info.steps}, `, - fmt`${bold("Sampler:")} ${info.sampler_name}, `, - fmt`${bold("CFG scale:")} ${info.cfg_scale}, `, - fmt`${bold("Seed:")} ${info.seed}, `, - fmt`${bold("Size")}: ${info.width}x${info.height}, `, - fmt`${bold("Worker")}: ${sdInstance.id}, `, - fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`, - ] - : [], - ]); - - // sending images loop because telegram is unreliable and it would be a shame to lose the images - // TODO: separate queue for sending images - let sendMediaAttempt = 0; - let resultMessages: GrammyTypes.Message.MediaMessage[] | undefined; - while (true) { - sendMediaAttempt++; - await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 }) - .catch(() => undefined); - - // parse files from reply JSON - const inputFiles = await Promise.all( - response.data.images.map(async (imageBase64, idx) => { - const imageBuffer = Base64.decode(imageBase64); - const imageType = await FileType.fileTypeFromBuffer(imageBuffer); - if (!imageType) throw new Error("Unknown file type returned from worker"); - return Grammy.InputMediaBuilder.photo( - new Grammy.InputFile(imageBuffer, `image${idx}.${imageType.ext}`), - // if it can fit, add caption for first photo - idx === 0 && caption.text.length <= 1024 - ? { caption: caption.text, caption_entities: caption.entities } - : undefined, - ); - }), - ); - - // send the result to telegram - try { - resultMessages = await bot.api.sendMediaGroup(job.chat.id, inputFiles, { - reply_to_message_id: job.requestMessage.message_id, - maxAttempts: 5, - }); - break; - } catch (err) { - logger().warning( - `Sending images (attempt ${sendMediaAttempt}) for ${ - formatUserChat(job) - } using ${sdInstance.id} failed: ${err}`, - ); - if (sendMediaAttempt >= 6) throw err; - // wait 2 * 5 seconds before retrying - for (let i = 0; i < 2; i++) { - await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 }) - .catch(() => undefined); - await Async.delay(5000); - } - } - } - - // send caption in separate message if it couldn't fit - if (caption.text.length > 1024 && caption.text.length <= 4096) { - await bot.api.sendMessage(job.chat.id, caption.text, { - reply_to_message_id: resultMessages[0].message_id, - entities: caption.entities, - }); - } - - // delete the status message - await bot.api.deleteMessage(job.replyMessage.chat.id, job.replyMessage.message_id) - .catch(() => undefined); - job.replyMessage = undefined; - await setJob(job); - - // save to generation storage - generationStore.create({ - task: { type: job.task.type, params: job.task.params }, - from: job.from, - chat: job.chat, - status: { - startDate, - endDate: new Date(), - info: info, - }, - }); - - logger().debug( - `Job finished for ${formatUserChat(job)} using ${sdInstance.id}${ - sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : "" - }`, - ); + logger().debug(`Generation finished for ${formatUserChat(state)}`); } /** - * Updates the status message of all jobs in the queue. + * Handles queue updates and updates the status message. */ -export async function handleGenerationUpdates() { +export async function updateGenerationQueue() { while (true) { const jobs = await generationQueue.getAllJobs(); let index = 0; diff --git a/app/generationStore.ts b/app/generationStore.ts index d61eabc..fce3234 100644 --- a/app/generationStore.ts +++ b/app/generationStore.ts @@ -1,26 +1,13 @@ import { GrammyTypes, IKV } from "../deps.ts"; -import { PngInfo } from "../sd/parsePngInfo.ts"; import { db } from "./db.ts"; export interface GenerationSchema { - task: - | { - type: "txt2img"; - params: Partial; - } - | { - type: "img2img"; - params: Partial; - fileId?: string; - }; from: GrammyTypes.User; chat: GrammyTypes.Chat; - requestMessageId?: number; - status: { - info?: SdGenerationInfo; - startDate?: Date; - endDate?: Date; - }; + sdInstanceId?: string; + info?: SdGenerationInfo; + startDate?: Date; + endDate?: Date; } /** @@ -57,4 +44,18 @@ export interface SdGenerationInfo { is_using_inpainting_conditioning: boolean; } -export const generationStore = new IKV.Store(db, "job", { indices: {} }); +type GenerationIndices = { + fromId: number; + chatId: number; +}; + +export const generationStore = new IKV.Store( + db, + "generations", + { + indices: { + fromId: { getValue: (item) => item.from.id }, + chatId: { getValue: (item) => item.chat.id }, + }, + }, +); diff --git a/app/mod.ts b/app/mod.ts index 796de3d..05297b8 100644 --- a/app/mod.ts +++ b/app/mod.ts @@ -1,8 +1,10 @@ -import { handleGenerationUpdates, restartGenerationWorkers } from "./generationQueue.ts"; +import { processGenerationQueue, updateGenerationQueue } from "./generationQueue.ts"; +import { processUploadQueue } from "./uploadQueue.ts"; export async function runAllTasks() { await Promise.all([ - restartGenerationWorkers(), - handleGenerationUpdates(), + processGenerationQueue(), + updateGenerationQueue(), + processUploadQueue(), ]); } diff --git a/app/uploadQueue.ts b/app/uploadQueue.ts new file mode 100644 index 0000000..3344828 --- /dev/null +++ b/app/uploadQueue.ts @@ -0,0 +1,128 @@ +import { bot } from "../bot/mod.ts"; +import { formatUserChat } from "../utils/formatUserChat.ts"; +import { db, fs } from "./db.ts"; +import { generationStore, SdGenerationInfo } from "./generationStore.ts"; +import { FileType, FmtDuration, Grammy, GrammyParseMode, GrammyTypes, KVMQ, Log } from "../deps.ts"; + +const logger = () => Log.getLogger(); + +interface UploadJob { + from: GrammyTypes.User; + chat: GrammyTypes.Chat; + requestMessage: GrammyTypes.Message; + replyMessage: GrammyTypes.Message; + sdInstanceId: string; + startDate: Date; + endDate: Date; + imageKeys: Deno.KvKey[]; + info: SdGenerationInfo; +} + +export const uploadQueue = new KVMQ.Queue(db, "uploadQueue"); + +/** + * Initializes queue worker for uploading images to Telegram. + */ +export async function processUploadQueue() { + const uploadWorker = uploadQueue.createWorker(async ({ state }) => { + // change status message to uploading images + await bot.api.editMessageText( + state.replyMessage.chat.id, + state.replyMessage.message_id, + `Uploading your images...`, + { maxAttempts: 1 }, + ).catch(() => undefined); + + // render the caption + // const detailedReply = Object.keys(job.value.params).filter((key) => key !== "prompt").length > 0; + const detailedReply = true; + const jobDurationMs = Math.trunc((Date.now() - state.startDate.getTime()) / 1000) * 1000; + const { bold, fmt } = GrammyParseMode; + const caption = fmt([ + `${state.info.prompt}\n`, + ...detailedReply + ? [ + state.info.negative_prompt + ? fmt`${bold("Negative prompt:")} ${state.info.negative_prompt}\n` + : "", + fmt`${bold("Steps:")} ${state.info.steps}, `, + fmt`${bold("Sampler:")} ${state.info.sampler_name}, `, + fmt`${bold("CFG scale:")} ${state.info.cfg_scale}, `, + fmt`${bold("Seed:")} ${state.info.seed}, `, + fmt`${bold("Size")}: ${state.info.width}x${state.info.height}, `, + fmt`${bold("Worker")}: ${state.sdInstanceId}, `, + fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`, + ] + : [], + ]); + + // parse files from reply JSON + const inputFiles = await Promise.all( + state.imageKeys.map(async (fileKey, idx) => { + const imageBuffer = await fs.get(fileKey).then((entry) => entry.value); + if (!imageBuffer) throw new Error("File not found"); + const imageType = await FileType.fileTypeFromBuffer(imageBuffer); + if (!imageType) throw new Error("Image has unknown type"); + return Grammy.InputMediaBuilder.photo( + new Grammy.InputFile(imageBuffer, `image${idx}.${imageType.ext}`), + // if it can fit, add caption for first photo + idx === 0 && caption.text.length <= 1024 + ? { caption: caption.text, caption_entities: caption.entities } + : undefined, + ); + }), + ); + + // send the result to telegram + const resultMessages = await bot.api.sendMediaGroup(state.chat.id, inputFiles, { + reply_to_message_id: state.requestMessage.message_id, + allow_sending_without_reply: true, + maxAttempts: 5, + maxWait: 60, + }); + + // send caption in separate message if it couldn't fit + if (caption.text.length > 1024 && caption.text.length <= 4096) { + await bot.api.sendMessage(state.chat.id, caption.text, { + reply_to_message_id: resultMessages[0].message_id, + allow_sending_without_reply: true, + entities: caption.entities, + maxWait: 60, + }); + } + + // delete files from storage + await Promise.all(state.imageKeys.map((fileKey) => fs.delete(fileKey))); + + // save to generation storage + await generationStore.create({ + from: state.from, + chat: state.chat, + sdInstanceId: state.sdInstanceId, + startDate: state.startDate, + endDate: new Date(), + info: state.info, + }); + + // delete the status message + await bot.api.deleteMessage(state.replyMessage.chat.id, state.replyMessage.message_id) + .catch(() => undefined); + }, { concurrency: 3 }); + + uploadWorker.addEventListener("error", (e) => { + logger().error(`Upload failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`); + bot.api.sendMessage( + e.detail.job.state.requestMessage.chat.id, + `Upload failed: ${e.detail.error}\n\n` + + (e.detail.job.retryCount > 0 + ? `Will retry ${e.detail.job.retryCount} more times.` + : `Aborting.`), + { + reply_to_message_id: e.detail.job.state.requestMessage.message_id, + allow_sending_without_reply: true, + }, + ).catch(() => undefined); + }); + + await uploadWorker.processJobs(); +} diff --git a/bot/img2imgCommand.ts b/bot/img2imgCommand.ts index bea0b4b..c7d0a70 100644 --- a/bot/img2imgCommand.ts +++ b/bot/img2imgCommand.ts @@ -107,7 +107,7 @@ async function img2img( chat: ctx.message.chat, requestMessage: ctx.message, replyMessage: replyMessage, - }); + }, { retryCount: 3, repeatDelayMs: 10_000 }); - logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`); + logger().debug(`Generation (img2img) enqueued for ${formatUserChat(ctx.message)}`); } diff --git a/bot/mod.ts b/bot/mod.ts index eb02ea4..2c1c1d5 100644 --- a/bot/mod.ts +++ b/bot/mod.ts @@ -28,7 +28,7 @@ export type Context = type WithRetryApi = { [M in keyof T]: T[M] extends (args: infer P, ...rest: infer A) => infer R - ? (args: P extends object ? P & { maxAttempts?: number } : P, ...rest: A) => R + ? (args: P extends object ? P & { maxAttempts?: number; maxWait?: number } : P, ...rest: A) => R : T[M]; }; @@ -37,7 +37,7 @@ type Api = Grammy.Api>; export const bot = new Grammy.Bot( Deno.env.get("TG_BOT_TOKEN")!, { - client: { timeoutSeconds: 30 }, + client: { timeoutSeconds: 20 }, }, ); @@ -62,22 +62,20 @@ bot.api.config.use(GrammyFiles.hydrateFiles(bot.token)); // Automatically retry bot requests if we get a "too many requests" or telegram internal error bot.api.config.use(async (prev, method, payload, signal) => { const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3; + const maxWait = payload && ("maxWait" in payload) ? payload.maxWait ?? 10 : 10; let attempt = 0; while (true) { attempt++; const result = await prev(method, payload, signal); - if ( - result.ok || - ![429, 500].includes(result.error_code) || - attempt >= maxAttempts - ) { - return result; - } + if (result.ok) return result; + if (result.error_code !== 429) return result; + if (attempt >= maxAttempts) return result; + const retryAfter = result.parameters?.retry_after ?? (attempt * 5); + if (retryAfter > maxWait) return result; logger().warning( `${method} (attempt ${attempt}) failed: ${result.error_code} ${result.description}`, ); - const retryAfterMs = (result.parameters?.retry_after ?? (attempt * 5)) * 1000; - await new Promise((resolve) => setTimeout(resolve, retryAfterMs)); + await new Promise((resolve) => setTimeout(resolve, retryAfter * 1000)); } }); diff --git a/bot/queueCommand.ts b/bot/queueCommand.ts index c567efc..e296674 100644 --- a/bot/queueCommand.ts +++ b/bot/queueCommand.ts @@ -1,5 +1,5 @@ import { Grammy, GrammyParseMode } from "../deps.ts"; -import { Context, logger } from "./mod.ts"; +import { Context } from "./mod.ts"; import { getFlagEmoji } from "../utils/getFlagEmoji.ts"; import { activeGenerationWorkers, generationQueue } from "../app/generationQueue.ts"; import { getConfig } from "../app/config.ts"; @@ -7,7 +7,7 @@ import { getConfig } from "../app/config.ts"; export async function queueCommand(ctx: Grammy.CommandContext) { let formattedMessage = await getMessageText(); const queueMessage = await ctx.replyFmt(formattedMessage, { disable_notification: true }); - handleFutureUpdates().catch((err) => logger().warning(`Updating queue message failed: ${err}`)); + handleFutureUpdates().catch(() => undefined); async function getMessageText() { const config = await getConfig(); diff --git a/bot/txt2imgCommand.ts b/bot/txt2imgCommand.ts index 0564011..c3e48da 100644 --- a/bot/txt2imgCommand.ts +++ b/bot/txt2imgCommand.ts @@ -81,7 +81,7 @@ async function txt2img(ctx: Context, match: string, includeRepliedTo: boolean): chat: ctx.message.chat, requestMessage: ctx.message, replyMessage: replyMessage, - }); + }, { retryCount: 3, retryDelayMs: 10_000 }); - logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`); + logger().debug(`Generation (txt2img) enqueued for ${formatUserChat(ctx.message)}`); } diff --git a/deps.ts b/deps.ts index 5b1851d..c90b40f 100644 --- a/deps.ts +++ b/deps.ts @@ -5,13 +5,13 @@ export * as Collections from "https://deno.land/std@0.202.0/collections/mod.ts"; export * as Base64 from "https://deno.land/std@0.202.0/encoding/base64.ts"; export * as AsyncX from "https://deno.land/x/async@v2.0.2/mod.ts"; export * as ULID from "https://deno.land/x/ulid@v0.3.0/mod.ts"; -export * as IKV from "https://deno.land/x/indexed_kv@v0.3.0/mod.ts"; -export * as KVMQ from "https://deno.land/x/kvmq@v0.1.0/mod.ts"; +export * as IKV from "https://deno.land/x/indexed_kv@v0.4.0/mod.ts"; +export * as KVMQ from "https://deno.land/x/kvmq@v0.2.0/mod.ts"; +export * as KVFS from "https://deno.land/x/kvfs@v0.1.0/mod.ts"; export * as Grammy from "https://deno.land/x/grammy@v1.18.3/mod.ts"; export * as GrammyTypes from "https://deno.land/x/grammy_types@v3.2.2/mod.ts"; export * as GrammyAutoQuote from "https://deno.land/x/grammy_autoquote@v1.1.2/mod.ts"; export * as GrammyParseMode from "https://deno.land/x/grammy_parse_mode@1.8.1/mod.ts"; -export * as GrammyKvStorage from "https://deno.land/x/grammy_storages@v2.3.1/denokv/src/mod.ts"; export * as GrammyStatelessQ from "https://deno.land/x/grammy_stateless_question_alpha@v3.0.4/mod.ts"; export * as GrammyFiles from "https://deno.land/x/grammy_files@v1.0.4/mod.ts"; export * as FileType from "https://esm.sh/file-type@18.5.0"; diff --git a/main.ts b/main.ts index 86f1ed1..4866c58 100644 --- a/main.ts +++ b/main.ts @@ -1,8 +1,8 @@ -// Load environment variables from .env file import "https://deno.land/std@0.201.0/dotenv/load.ts"; - -// Setup logging import { Log } from "./deps.ts"; +import { bot } from "./bot/mod.ts"; +import { runAllTasks } from "./app/mod.ts"; + Log.setup({ handlers: { console: new Log.handlers.ConsoleHandler("DEBUG"), @@ -12,9 +12,6 @@ Log.setup({ }, }); -// Main program logic -import { bot } from "./bot/mod.ts"; -import { runAllTasks } from "./app/mod.ts"; await Promise.all([ bot.start(), runAllTasks(), diff --git a/utils/formatUserChat.ts b/utils/formatUserChat.ts index c38f09a..65b45cc 100644 --- a/utils/formatUserChat.ts +++ b/utils/formatUserChat.ts @@ -1,6 +1,8 @@ import { GrammyTypes } from "../deps.ts"; -export function formatUserChat(ctx: { from?: GrammyTypes.User; chat?: GrammyTypes.Chat }) { +export function formatUserChat( + ctx: { from?: GrammyTypes.User; chat?: GrammyTypes.Chat; sdInstanceId?: string }, +) { const msg: string[] = []; if (ctx.from) { msg.push(ctx.from.first_name); @@ -24,5 +26,8 @@ export function formatUserChat(ctx: { from?: GrammyTypes.User; chat?: GrammyType } } } + if (ctx.sdInstanceId) { + msg.push(`using ${ctx.sdInstanceId}`); + } return msg.join(" "); }