import { fileTypeFromBuffer } from "file_type";
import { InputFile, InputMediaBuilder } from "grammy";
import { bold, fmt } from "grammy_parse_mode";
import { Chat, Message, User } from "grammy_types";
import { Queue } from "kvmq";
import { format } from "std/fmt/duration.ts";
import { getLogger } from "std/log/mod.ts";
import { bot } from "../bot/mod.ts";
import { formatUserChat } from "../utils/formatUserChat.ts";
import { db, fs } from "./db.ts";
import { generationStore, SdGenerationInfo } from "./generationStore.ts";
import { globalStats } from "./globalStats.ts";

// Resolved lazily on every call rather than captured once at import time.
const logger = () => getLogger();

/**
 * State carried by a single job in the upload queue.
 */
interface UploadJob {
  /** User the job is attributed to (stored with the generation and counted in stats). */
  from: User;
  /** Chat the job belongs to. */
  chat: Chat;
  /** Message the results are sent as a reply to. */
  requestMessage: Message;
  /** Status message that is edited while uploading and deleted afterwards. */
  replyMessage: Message;
  /** Optional worker identifier; shown in the caption and stored as `sdInstanceId`. */
  workerInstanceKey?: string;
  startDate: Date;
  endDate: Date;
  /** Keys of the image blobs in `fs` that should be uploaded. */
  imageKeys: Deno.KvKey[];
  info: SdGenerationInfo;
}

// NOTE(review): the generic argument was missing in the reviewed source; `UploadJob`
// is otherwise unused and the worker reads these fields from `state` — confirm.
export const uploadQueue = new Queue<UploadJob>(db, "uploadQueue");

/**
 * Initializes queue worker for uploading images to Telegram.
 *
 * For every job the worker edits the status message, builds a caption, loads
 * the images from `fs`, sends them as a media group, removes them from `fs`,
 * records the generation in `generationStore`, updates `globalStats` and
 * finally deletes the status message. Failures are logged and reported to the
 * chat by the worker's "error" listener.
 *
 * NOTE(review): several operands were missing in the reviewed source (chat ids,
 * `state.info.*` fields, the `.map(...)` calls, the duration end point). They
 * are reconstructed below from the `UploadJob` shape and the caption labels —
 * verify the `SdGenerationInfo` field names against its declaration.
 *
 * @returns A promise that settles when `uploadWorker.processJobs()` does.
 */
export async function processUploadQueue() {
  const uploadWorker = uploadQueue.createWorker(async ({ state }) => {
    // change status message to uploading images (best effort, failures are ignored)
    await bot.api.editMessageText(
      state.replyMessage.chat.id,
      state.replyMessage.message_id,
      `Uploading your images...`,
      { maxAttempts: 1 },
    ).catch(() => undefined);

    // render the caption
    // const detailedReply = Object.keys(job.value.params).filter((key) => key !== "prompt").length > 0;
    const detailedReply = true;
    // Duration truncated to whole seconds, expressed in milliseconds.
    // NOTE(review): end point assumed to be `state.endDate` — confirm.
    const jobDurationMs =
      Math.trunc((state.endDate.getTime() - state.startDate.getTime()) / 1000) * 1000;
    const caption = fmt([
      `${state.info.prompt}\n`,
      ...(detailedReply
        ? [
          state.info.negative_prompt
            ? fmt`${bold("Negative prompt:")} ${state.info.negative_prompt}\n`
            : "",
          fmt`${bold("Steps:")} ${state.info.steps}, `,
          fmt`${bold("Sampler:")} ${state.info.sampler_name}, `,
          fmt`${bold("CFG scale:")} ${state.info.cfg_scale}, `,
          fmt`${bold("Seed:")} ${state.info.seed}, `,
          fmt`${bold("Size")}: ${state.info.width}x${state.info.height}, `,
          state.workerInstanceKey ? fmt`${bold("Worker")}: ${state.workerInstanceKey}, ` : "",
          fmt`${bold("Time taken")}: ${format(jobDurationMs, { ignoreZero: true })}`,
        ]
        : []),
    ]);

    // load the stored images and wrap them as media group entries
    let size = 0;
    const types = new Set<string>();
    const inputFiles = await Promise.all(
      state.imageKeys.map(async (fileKey, idx) => {
        const imageBuffer = await fs.get(fileKey).then((entry) => entry.value);
        if (!imageBuffer) throw new Error("File not found");
        const imageType = await fileTypeFromBuffer(imageBuffer);
        if (!imageType) throw new Error("Image has unknown type");
        // totals are only used for the debug log line below
        size += imageBuffer.byteLength;
        types.add(imageType.ext);
        return InputMediaBuilder.photo(
          new InputFile(imageBuffer, `image${idx}.${imageType.ext}`),
          // if it can fit, add caption for first photo
          idx === 0 && caption.text.length <= 1024
            ? { caption: caption.text, caption_entities: caption.entities }
            : undefined,
        );
      }),
    );

    // send the result to telegram
    const resultMessages = await bot.api.sendMediaGroup(state.chat.id, inputFiles, {
      reply_to_message_id: state.requestMessage.message_id,
      allow_sending_without_reply: true,
      maxAttempts: 5,
      maxWait: 60,
    });

    // send caption in separate message if it couldn't fit
    // (captions longer than 4096 characters are not sent at all)
    if (caption.text.length > 1024 && caption.text.length <= 4096) {
      await bot.api.sendMessage(state.chat.id, caption.text, {
        reply_to_message_id: resultMessages[0].message_id,
        allow_sending_without_reply: true,
        entities: caption.entities,
        maxWait: 60,
      });
    }

    // delete files from storage
    await Promise.all(state.imageKeys.map((fileKey) => fs.delete(fileKey)));

    // save to generation storage
    await generationStore.create({
      from: state.from,
      chat: state.chat,
      sdInstanceId: state.workerInstanceKey,
      startDate: state.startDate,
      endDate: new Date(),
      info: state.info,
    });

    // update live stats
    {
      globalStats.imageCount++;
      globalStats.stepCount += state.info.steps;
      globalStats.pixelCount += state.info.width * state.info.height;
      globalStats.pixelStepCount += state.info.width * state.info.height * state.info.steps;
      // round-trip through a Set so each user id is stored only once
      const userIdSet = new Set(globalStats.userIds);
      userIdSet.add(state.from.id);
      globalStats.userIds = [...userIdSet];
    }

    logger().debug(
      `Uploaded ${state.imageKeys.length} ${[...types].join(",")} images (${
        Math.trunc(size / 1024)
      }kB) for ${formatUserChat(state)}`,
    );

    // delete the status message (best effort, failures are ignored)
    await bot.api.deleteMessage(state.replyMessage.chat.id, state.replyMessage.message_id)
      .catch(() => undefined);
  }, { concurrency: 3 });

  uploadWorker.addEventListener("error", (e) => {
    logger().error(`Upload failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`);
    // Tell the chat about the failure; a failure to deliver this notice is ignored.
    bot.api.sendMessage(
      e.detail.job.state.chat.id,
      `Upload failed: ${e.detail.error}\n\n` +
        (e.detail.job.retryCount > 0
          ? `Will retry ${e.detail.job.retryCount} more times.`
          : `Aborting.`),
      {
        reply_to_message_id: e.detail.job.state.requestMessage.message_id,
        allow_sending_without_reply: true,
      },
    ).catch(() => undefined);
  });

  await uploadWorker.processJobs();
}