forked from pinks/eris
1
0
Fork 0

perf: split generation and upload queues

This commit is contained in:
pinks 2023-09-24 14:05:28 +02:00
parent 4e10168786
commit aa380c63fb
12 changed files with 291 additions and 259 deletions

View File

@ -1 +1,4 @@
import { KVFS } from "../deps.ts";
export const db = await Deno.openKv("./app.db"); export const db = await Deno.openKv("./app.db");
export const fs = new KVFS.KvFs(db);

View File

@ -3,23 +3,21 @@ import { PngInfo } from "../sd/parsePngInfo.ts";
import * as SdApi from "../sd/sdApi.ts"; import * as SdApi from "../sd/sdApi.ts";
import { formatUserChat } from "../utils/formatUserChat.ts"; import { formatUserChat } from "../utils/formatUserChat.ts";
import { getConfig, SdInstanceData } from "./config.ts"; import { getConfig, SdInstanceData } from "./config.ts";
import { db } from "./db.ts"; import { db, fs } from "./db.ts";
import { generationStore, SdGenerationInfo } from "./generationStore.ts"; import { SdGenerationInfo } from "./generationStore.ts";
import { import {
Async, Async,
AsyncX, AsyncX,
Base64, Base64,
createOpenApiClient, createOpenApiClient,
FileType,
FmtDuration,
Grammy,
GrammyParseMode,
GrammyTypes, GrammyTypes,
KVMQ, KVMQ,
Log, Log,
ULID,
} from "../deps.ts"; } from "../deps.ts";
import { formatOrdinal } from "../utils/formatOrdinal.ts"; import { formatOrdinal } from "../utils/formatOrdinal.ts";
import { SdError } from "../sd/SdError.ts"; import { SdError } from "../sd/SdError.ts";
import { uploadQueue } from "./uploadQueue.ts";
const logger = () => Log.getLogger(); const logger = () => Log.getLogger();
@ -37,7 +35,7 @@ interface GenerationJob {
from: GrammyTypes.User; from: GrammyTypes.User;
chat: GrammyTypes.Chat; chat: GrammyTypes.Chat;
requestMessage: GrammyTypes.Message; requestMessage: GrammyTypes.Message;
replyMessage?: GrammyTypes.Message; replyMessage: GrammyTypes.Message;
sdInstanceId?: string; sdInstanceId?: string;
progress?: number; progress?: number;
} }
@ -47,9 +45,9 @@ export const generationQueue = new KVMQ.Queue<GenerationJob>(db, "jobQueue");
export const activeGenerationWorkers = new Map<string, KVMQ.Worker<GenerationJob>>(); export const activeGenerationWorkers = new Map<string, KVMQ.Worker<GenerationJob>>();
/** /**
* Periodically restarts stable diffusion generation workers if they become online. * Initializes queue workers for each SD instance when they become online.
*/ */
export async function restartGenerationWorkers() { export async function processGenerationQueue() {
while (true) { while (true) {
const config = await getConfig(); const config = await getConfig();
@ -57,14 +55,13 @@ export async function restartGenerationWorkers() {
const activeWorker = activeGenerationWorkers.get(sdInstance.id); const activeWorker = activeGenerationWorkers.get(sdInstance.id);
if (activeWorker?.isProcessing) continue; if (activeWorker?.isProcessing) continue;
const activeWorkerSdClient = createOpenApiClient<SdApi.paths>({ const workerSdClient = createOpenApiClient<SdApi.paths>({
baseUrl: sdInstance.api.url, baseUrl: sdInstance.api.url,
headers: { "Authorization": sdInstance.api.auth }, headers: { "Authorization": sdInstance.api.auth },
}); });
// check if worker is up // check if worker is up
const activeWorkerStatus = await workerSdClient.GET("/sdapi/v1/memory", {
const activeWorkerStatus = await activeWorkerSdClient.GET("/sdapi/v1/memory", {
signal: AbortSignal.timeout(10_000), signal: AbortSignal.timeout(10_000),
}) })
.then((response) => { .then((response) => {
@ -76,102 +73,79 @@ export async function restartGenerationWorkers() {
.catch((error) => { .catch((error) => {
logger().warning(`Worker ${sdInstance.id} is down: ${error}`); logger().warning(`Worker ${sdInstance.id} is down: ${error}`);
}); });
if (!activeWorkerStatus?.data) { if (!activeWorkerStatus?.data) {
continue; continue;
} }
const newWorker = generationQueue.createWorker(({ state, setState }) => // create worker
processGenerationJob(state, setState, sdInstance) const newWorker = generationQueue.createWorker(async ({ state }, updateJob) => {
); await processGenerationJob(state, updateJob, sdInstance);
});
logger().info(`Started worker ${sdInstance.id}`);
newWorker.processJobs();
newWorker.addEventListener("error", (e) => { newWorker.addEventListener("error", (e) => {
logger().error(`Job failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`); logger().error(
`Generation failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`,
);
bot.api.sendMessage( bot.api.sendMessage(
e.detail.job.state.requestMessage.chat.id, e.detail.job.state.requestMessage.chat.id,
`Generating failed: ${e.detail.error}`, `Generation failed: ${e.detail.error}\n\n` +
(e.detail.job.retryCount > 0
? `Will retry ${e.detail.job.retryCount} more times.`
: `Aborting.`),
{ {
reply_to_message_id: e.detail.job.state.requestMessage.message_id, reply_to_message_id: e.detail.job.state.requestMessage.message_id,
allow_sending_without_reply: true,
}, },
).catch(() => undefined); ).catch(() => undefined);
// TODO: only stop worker if error is network error if (e.detail.error instanceof SdError) {
newWorker.stopProcessing(); newWorker.stopProcessing();
}
}); });
newWorker.processJobs();
activeGenerationWorkers.set(sdInstance.id, newWorker); activeGenerationWorkers.set(sdInstance.id, newWorker);
logger().info(`Started worker ${sdInstance.id}`);
} }
await Async.delay(60_000); await Async.delay(60_000);
} }
} }
/**
* Processes a single job from the queue.
*/
async function processGenerationJob( async function processGenerationJob(
job: GenerationJob, state: GenerationJob,
setJob: (state: GenerationJob) => Promise<void>, updateJob: (job: Partial<KVMQ.JobData<GenerationJob>>) => Promise<void>,
sdInstance: SdInstanceData, sdInstance: SdInstanceData,
) { ) {
logger().debug(`Job started for ${formatUserChat(job)} using ${sdInstance.id}`);
const startDate = new Date(); const startDate = new Date();
job.sdInstanceId = sdInstance.id;
await setJob(job);
const config = await getConfig();
const workerSdClient = createOpenApiClient<SdApi.paths>({ const workerSdClient = createOpenApiClient<SdApi.paths>({
baseUrl: sdInstance.api.url, baseUrl: sdInstance.api.url,
headers: { "Authorization": sdInstance.api.auth }, headers: { "Authorization": sdInstance.api.auth },
}); });
state.sdInstanceId = sdInstance.id;
logger().debug(`Generation started for ${formatUserChat(state)}`);
await updateJob({ state: state });
// if there is already a status message and its older than 30 seconds // check if bot can post messages in this chat
if (job.replyMessage && (Date.now() - job.replyMessage.date * 1000) > 30_000) { const chat = await bot.api.getChat(state.chat.id);
// try to delete it if (
await bot.api.deleteMessage(job.replyMessage.chat.id, job.replyMessage.message_id) (chat.type === "group" || chat.type === "supergroup") &&
.catch(() => undefined); (!chat.permissions?.can_send_messages || !chat.permissions?.can_send_photos)
job.replyMessage = undefined; ) {
await setJob(job); throw new Error("Bot doesn't have permissions to send photos in this chat");
} }
await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 }) // edit the existing status message
.catch(() => undefined); await bot.api.editMessageText(
state.replyMessage.chat.id,
// if now there is no status message state.replyMessage.message_id,
if (!job.replyMessage) { `Generating your prompt now... 0% using ${sdInstance.name}`,
// send a new status message { maxAttempts: 1 },
job.replyMessage = await bot.api.sendMessage( );
job.chat.id,
`Generating your prompt now... 0% using ${sdInstance.name}`,
{ reply_to_message_id: job.requestMessage.message_id },
).catch((err) => {
// if the request message (the message we are replying to) was deleted
if (err instanceof Grammy.GrammyError && err.message.match(/repl(y|ied)/)) {
// set the status message to undefined
return undefined;
}
throw err;
});
await setJob(job);
} else {
// edit the existing status message
await bot.api.editMessageText(
job.replyMessage.chat.id,
job.replyMessage.message_id,
`Generating your prompt now... 0% using ${sdInstance.name}`,
{ maxAttempts: 1 },
).catch(() => undefined);
}
// if we don't have a status message (it failed sending because request was deleted)
if (!job.replyMessage) {
// cancel the job
logger().info(`Job cancelled for ${formatUserChat(job)}`);
return;
}
// reduce size if worker can't handle the resolution // reduce size if worker can't handle the resolution
const config = await getConfig();
const size = limitSize( const size = limitSize(
{ ...config.defaultParams, ...job.task.params }, { ...config.defaultParams, ...state.task.params },
sdInstance.maxResolution, sdInstance.maxResolution,
); );
function limitSize( function limitSize(
@ -190,31 +164,31 @@ async function processGenerationJob(
} }
// start generating the image // start generating the image
const responsePromise = job.task.type === "txt2img" const responsePromise = state.task.type === "txt2img"
? workerSdClient.POST("/sdapi/v1/txt2img", { ? workerSdClient.POST("/sdapi/v1/txt2img", {
body: { body: {
...config.defaultParams, ...config.defaultParams,
...job.task.params, ...state.task.params,
...size, ...size,
negative_prompt: job.task.params.negative_prompt negative_prompt: state.task.params.negative_prompt
? job.task.params.negative_prompt ? state.task.params.negative_prompt
: config.defaultParams?.negative_prompt, : config.defaultParams?.negative_prompt,
}, },
}) })
: job.task.type === "img2img" : state.task.type === "img2img"
? workerSdClient.POST("/sdapi/v1/img2img", { ? workerSdClient.POST("/sdapi/v1/img2img", {
body: { body: {
...config.defaultParams, ...config.defaultParams,
...job.task.params, ...state.task.params,
...size, ...size,
negative_prompt: job.task.params.negative_prompt negative_prompt: state.task.params.negative_prompt
? job.task.params.negative_prompt ? state.task.params.negative_prompt
: config.defaultParams?.negative_prompt, : config.defaultParams?.negative_prompt,
init_images: [ init_images: [
Base64.encode( Base64.encode(
await fetch( await fetch(
`https://api.telegram.org/file/bot${bot.token}/${await bot.api.getFile( `https://api.telegram.org/file/bot${bot.token}/${await bot.api.getFile(
job.task.fileId, state.task.fileId,
).then((file) => file.file_path)}`, ).then((file) => file.file_path)}`,
).then((resp) => resp.arrayBuffer()), ).then((resp) => resp.arrayBuffer()),
), ),
@ -224,44 +198,45 @@ async function processGenerationJob(
: undefined; : undefined;
if (!responsePromise) { if (!responsePromise) {
throw new Error(`Unknown task type: ${job.task.type}`); throw new Error(`Unknown task type: ${state.task.type}`);
} }
// poll for progress while the generation request is pending // poll for progress while the generation request is pending
while (await AsyncX.promiseState(responsePromise) === "pending") { do {
await Async.delay(3000);
const progressResponse = await workerSdClient.GET("/sdapi/v1/progress", { const progressResponse = await workerSdClient.GET("/sdapi/v1/progress", {
params: {}, params: {},
signal: AbortSignal.timeout(15_000), signal: AbortSignal.timeout(15000),
}); });
if (!progressResponse.data) { if (!progressResponse.data) {
throw new SdError( throw new SdError(
"Failed to get progress", "Progress request failed",
progressResponse.response, progressResponse.response,
progressResponse.error, progressResponse.error,
); );
} }
job.progress = progressResponse.data.progress;
await setJob(job); state.progress = progressResponse.data.progress;
await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 }) await updateJob({ state: state });
await bot.api.sendChatAction(state.chat.id, "upload_photo", { maxAttempts: 1 })
.catch(() => undefined); .catch(() => undefined);
if (job.replyMessage) { await bot.api.editMessageText(
await bot.api.editMessageText( state.replyMessage.chat.id,
job.replyMessage.chat.id, state.replyMessage.message_id,
job.replyMessage.message_id, `Generating your prompt now... ${
`Generating your prompt now... ${ (progressResponse.data.progress * 100).toFixed(0)
(progressResponse.data.progress * 100).toFixed(0) }% using ${sdInstance.name}`,
}% using ${sdInstance.name}`, { maxAttempts: 1 },
{ maxAttempts: 1 }, ).catch(() => undefined);
).catch(() => undefined);
} await Promise.race([Async.delay(3000), responsePromise]).catch(() => undefined);
} } while (await AsyncX.promiseState(responsePromise) === "pending");
// check response
const response = await responsePromise; const response = await responsePromise;
if (!response.data) { if (!response.data) {
throw new SdError("Generating image failed", response.response, response.error); throw new SdError(`${state.task.type} failed`, response.response, response.error);
} }
if (!response.data.images?.length) { if (!response.data.images?.length) {
throw new Error("No images returned from SD"); throw new Error("No images returned from SD");
} }
@ -269,120 +244,43 @@ async function processGenerationJob(
// info field is a json serialized string so we need to parse it // info field is a json serialized string so we need to parse it
const info: SdGenerationInfo = JSON.parse(response.data.info); const info: SdGenerationInfo = JSON.parse(response.data.info);
// save images to db
const imageKeys: Deno.KvKey[] = [];
for (const imageBase64 of response.data.images) {
const imageBuffer = Base64.decode(imageBase64);
const imageKey = ["images", "upload", ULID.ulid()];
await fs.set(imageKey, imageBuffer, { expireIn: 30 * 60 * 1000 });
imageKeys.push(imageKey);
}
// create a new upload job
await uploadQueue.pushJob({
chat: state.chat,
from: state.from,
requestMessage: state.requestMessage,
replyMessage: state.replyMessage,
sdInstanceId: sdInstance.id,
startDate,
endDate: new Date(),
imageKeys,
info,
}, { retryCount: 5, retryDelayMs: 10000 });
// change status message to uploading images // change status message to uploading images
await bot.api.editMessageText( await bot.api.editMessageText(
job.replyMessage.chat.id, state.replyMessage.chat.id,
job.replyMessage.message_id, state.replyMessage.message_id,
`Uploading your images...`, `Uploading your images...`,
{ maxAttempts: 1 }, { maxAttempts: 1 },
).catch(() => undefined); ).catch(() => undefined);
// render the caption logger().debug(`Generation finished for ${formatUserChat(state)}`);
// const detailedReply = Object.keys(job.value.params).filter((key) => key !== "prompt").length > 0;
const detailedReply = true;
const jobDurationMs = Math.trunc((Date.now() - startDate.getTime()) / 1000) * 1000;
const { bold, fmt } = GrammyParseMode;
const caption = fmt([
`${info.prompt}\n`,
...detailedReply
? [
info.negative_prompt ? fmt`${bold("Negative prompt:")} ${info.negative_prompt}\n` : "",
fmt`${bold("Steps:")} ${info.steps}, `,
fmt`${bold("Sampler:")} ${info.sampler_name}, `,
fmt`${bold("CFG scale:")} ${info.cfg_scale}, `,
fmt`${bold("Seed:")} ${info.seed}, `,
fmt`${bold("Size")}: ${info.width}x${info.height}, `,
fmt`${bold("Worker")}: ${sdInstance.id}, `,
fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`,
]
: [],
]);
// sending images loop because telegram is unreliable and it would be a shame to lose the images
// TODO: separate queue for sending images
let sendMediaAttempt = 0;
let resultMessages: GrammyTypes.Message.MediaMessage[] | undefined;
while (true) {
sendMediaAttempt++;
await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 })
.catch(() => undefined);
// parse files from reply JSON
const inputFiles = await Promise.all(
response.data.images.map(async (imageBase64, idx) => {
const imageBuffer = Base64.decode(imageBase64);
const imageType = await FileType.fileTypeFromBuffer(imageBuffer);
if (!imageType) throw new Error("Unknown file type returned from worker");
return Grammy.InputMediaBuilder.photo(
new Grammy.InputFile(imageBuffer, `image${idx}.${imageType.ext}`),
// if it can fit, add caption for first photo
idx === 0 && caption.text.length <= 1024
? { caption: caption.text, caption_entities: caption.entities }
: undefined,
);
}),
);
// send the result to telegram
try {
resultMessages = await bot.api.sendMediaGroup(job.chat.id, inputFiles, {
reply_to_message_id: job.requestMessage.message_id,
maxAttempts: 5,
});
break;
} catch (err) {
logger().warning(
`Sending images (attempt ${sendMediaAttempt}) for ${
formatUserChat(job)
} using ${sdInstance.id} failed: ${err}`,
);
if (sendMediaAttempt >= 6) throw err;
// wait 2 * 5 seconds before retrying
for (let i = 0; i < 2; i++) {
await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 })
.catch(() => undefined);
await Async.delay(5000);
}
}
}
// send caption in separate message if it couldn't fit
if (caption.text.length > 1024 && caption.text.length <= 4096) {
await bot.api.sendMessage(job.chat.id, caption.text, {
reply_to_message_id: resultMessages[0].message_id,
entities: caption.entities,
});
}
// delete the status message
await bot.api.deleteMessage(job.replyMessage.chat.id, job.replyMessage.message_id)
.catch(() => undefined);
job.replyMessage = undefined;
await setJob(job);
// save to generation storage
generationStore.create({
task: { type: job.task.type, params: job.task.params },
from: job.from,
chat: job.chat,
status: {
startDate,
endDate: new Date(),
info: info,
},
});
logger().debug(
`Job finished for ${formatUserChat(job)} using ${sdInstance.id}${
sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : ""
}`,
);
} }
/** /**
* Updates the status message of all jobs in the queue. * Handles queue updates and updates the status message.
*/ */
export async function handleGenerationUpdates() { export async function updateGenerationQueue() {
while (true) { while (true) {
const jobs = await generationQueue.getAllJobs(); const jobs = await generationQueue.getAllJobs();
let index = 0; let index = 0;

View File

@ -1,26 +1,13 @@
import { GrammyTypes, IKV } from "../deps.ts"; import { GrammyTypes, IKV } from "../deps.ts";
import { PngInfo } from "../sd/parsePngInfo.ts";
import { db } from "./db.ts"; import { db } from "./db.ts";
export interface GenerationSchema { export interface GenerationSchema {
task:
| {
type: "txt2img";
params: Partial<PngInfo>;
}
| {
type: "img2img";
params: Partial<PngInfo>;
fileId?: string;
};
from: GrammyTypes.User; from: GrammyTypes.User;
chat: GrammyTypes.Chat; chat: GrammyTypes.Chat;
requestMessageId?: number; sdInstanceId?: string;
status: { info?: SdGenerationInfo;
info?: SdGenerationInfo; startDate?: Date;
startDate?: Date; endDate?: Date;
endDate?: Date;
};
} }
/** /**
@ -57,4 +44,18 @@ export interface SdGenerationInfo {
is_using_inpainting_conditioning: boolean; is_using_inpainting_conditioning: boolean;
} }
export const generationStore = new IKV.Store<GenerationSchema, {}>(db, "job", { indices: {} }); type GenerationIndices = {
fromId: number;
chatId: number;
};
export const generationStore = new IKV.Store<GenerationSchema, GenerationIndices>(
db,
"generations",
{
indices: {
fromId: { getValue: (item) => item.from.id },
chatId: { getValue: (item) => item.chat.id },
},
},
);

View File

@ -1,8 +1,10 @@
import { handleGenerationUpdates, restartGenerationWorkers } from "./generationQueue.ts"; import { processGenerationQueue, updateGenerationQueue } from "./generationQueue.ts";
import { processUploadQueue } from "./uploadQueue.ts";
export async function runAllTasks() { export async function runAllTasks() {
await Promise.all([ await Promise.all([
restartGenerationWorkers(), processGenerationQueue(),
handleGenerationUpdates(), updateGenerationQueue(),
processUploadQueue(),
]); ]);
} }

128
app/uploadQueue.ts Normal file
View File

@ -0,0 +1,128 @@
import { bot } from "../bot/mod.ts";
import { formatUserChat } from "../utils/formatUserChat.ts";
import { db, fs } from "./db.ts";
import { generationStore, SdGenerationInfo } from "./generationStore.ts";
import { FileType, FmtDuration, Grammy, GrammyParseMode, GrammyTypes, KVMQ, Log } from "../deps.ts";
const logger = () => Log.getLogger();
interface UploadJob {
from: GrammyTypes.User;
chat: GrammyTypes.Chat;
requestMessage: GrammyTypes.Message;
replyMessage: GrammyTypes.Message;
sdInstanceId: string;
startDate: Date;
endDate: Date;
imageKeys: Deno.KvKey[];
info: SdGenerationInfo;
}
export const uploadQueue = new KVMQ.Queue<UploadJob>(db, "uploadQueue");
/**
* Initializes queue worker for uploading images to Telegram.
*/
export async function processUploadQueue() {
const uploadWorker = uploadQueue.createWorker(async ({ state }) => {
// change status message to uploading images
await bot.api.editMessageText(
state.replyMessage.chat.id,
state.replyMessage.message_id,
`Uploading your images...`,
{ maxAttempts: 1 },
).catch(() => undefined);
// render the caption
// const detailedReply = Object.keys(job.value.params).filter((key) => key !== "prompt").length > 0;
const detailedReply = true;
const jobDurationMs = Math.trunc((Date.now() - state.startDate.getTime()) / 1000) * 1000;
const { bold, fmt } = GrammyParseMode;
const caption = fmt([
`${state.info.prompt}\n`,
...detailedReply
? [
state.info.negative_prompt
? fmt`${bold("Negative prompt:")} ${state.info.negative_prompt}\n`
: "",
fmt`${bold("Steps:")} ${state.info.steps}, `,
fmt`${bold("Sampler:")} ${state.info.sampler_name}, `,
fmt`${bold("CFG scale:")} ${state.info.cfg_scale}, `,
fmt`${bold("Seed:")} ${state.info.seed}, `,
fmt`${bold("Size")}: ${state.info.width}x${state.info.height}, `,
fmt`${bold("Worker")}: ${state.sdInstanceId}, `,
fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`,
]
: [],
]);
// parse files from reply JSON
const inputFiles = await Promise.all(
state.imageKeys.map(async (fileKey, idx) => {
const imageBuffer = await fs.get(fileKey).then((entry) => entry.value);
if (!imageBuffer) throw new Error("File not found");
const imageType = await FileType.fileTypeFromBuffer(imageBuffer);
if (!imageType) throw new Error("Image has unknown type");
return Grammy.InputMediaBuilder.photo(
new Grammy.InputFile(imageBuffer, `image${idx}.${imageType.ext}`),
// if it can fit, add caption for first photo
idx === 0 && caption.text.length <= 1024
? { caption: caption.text, caption_entities: caption.entities }
: undefined,
);
}),
);
// send the result to telegram
const resultMessages = await bot.api.sendMediaGroup(state.chat.id, inputFiles, {
reply_to_message_id: state.requestMessage.message_id,
allow_sending_without_reply: true,
maxAttempts: 5,
maxWait: 60,
});
// send caption in separate message if it couldn't fit
if (caption.text.length > 1024 && caption.text.length <= 4096) {
await bot.api.sendMessage(state.chat.id, caption.text, {
reply_to_message_id: resultMessages[0].message_id,
allow_sending_without_reply: true,
entities: caption.entities,
maxWait: 60,
});
}
// delete files from storage
await Promise.all(state.imageKeys.map((fileKey) => fs.delete(fileKey)));
// save to generation storage
await generationStore.create({
from: state.from,
chat: state.chat,
sdInstanceId: state.sdInstanceId,
startDate: state.startDate,
endDate: new Date(),
info: state.info,
});
// delete the status message
await bot.api.deleteMessage(state.replyMessage.chat.id, state.replyMessage.message_id)
.catch(() => undefined);
}, { concurrency: 3 });
uploadWorker.addEventListener("error", (e) => {
logger().error(`Upload failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`);
bot.api.sendMessage(
e.detail.job.state.requestMessage.chat.id,
`Upload failed: ${e.detail.error}\n\n` +
(e.detail.job.retryCount > 0
? `Will retry ${e.detail.job.retryCount} more times.`
: `Aborting.`),
{
reply_to_message_id: e.detail.job.state.requestMessage.message_id,
allow_sending_without_reply: true,
},
).catch(() => undefined);
});
await uploadWorker.processJobs();
}

View File

@ -107,7 +107,7 @@ async function img2img(
chat: ctx.message.chat, chat: ctx.message.chat,
requestMessage: ctx.message, requestMessage: ctx.message,
replyMessage: replyMessage, replyMessage: replyMessage,
}); }, { retryCount: 3, repeatDelayMs: 10_000 });
logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`); logger().debug(`Generation (img2img) enqueued for ${formatUserChat(ctx.message)}`);
} }

View File

@ -28,7 +28,7 @@ export type Context =
type WithRetryApi<T extends Grammy.RawApi> = { type WithRetryApi<T extends Grammy.RawApi> = {
[M in keyof T]: T[M] extends (args: infer P, ...rest: infer A) => infer R [M in keyof T]: T[M] extends (args: infer P, ...rest: infer A) => infer R
? (args: P extends object ? P & { maxAttempts?: number } : P, ...rest: A) => R ? (args: P extends object ? P & { maxAttempts?: number; maxWait?: number } : P, ...rest: A) => R
: T[M]; : T[M];
}; };
@ -37,7 +37,7 @@ type Api = Grammy.Api<WithRetryApi<Grammy.RawApi>>;
export const bot = new Grammy.Bot<Context, Api>( export const bot = new Grammy.Bot<Context, Api>(
Deno.env.get("TG_BOT_TOKEN")!, Deno.env.get("TG_BOT_TOKEN")!,
{ {
client: { timeoutSeconds: 30 }, client: { timeoutSeconds: 20 },
}, },
); );
@ -62,22 +62,20 @@ bot.api.config.use(GrammyFiles.hydrateFiles(bot.token));
// Automatically retry bot requests if we get a "too many requests" or telegram internal error // Automatically retry bot requests if we get a "too many requests" or telegram internal error
bot.api.config.use(async (prev, method, payload, signal) => { bot.api.config.use(async (prev, method, payload, signal) => {
const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3; const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3;
const maxWait = payload && ("maxWait" in payload) ? payload.maxWait ?? 10 : 10;
let attempt = 0; let attempt = 0;
while (true) { while (true) {
attempt++; attempt++;
const result = await prev(method, payload, signal); const result = await prev(method, payload, signal);
if ( if (result.ok) return result;
result.ok || if (result.error_code !== 429) return result;
![429, 500].includes(result.error_code) || if (attempt >= maxAttempts) return result;
attempt >= maxAttempts const retryAfter = result.parameters?.retry_after ?? (attempt * 5);
) { if (retryAfter > maxWait) return result;
return result;
}
logger().warning( logger().warning(
`${method} (attempt ${attempt}) failed: ${result.error_code} ${result.description}`, `${method} (attempt ${attempt}) failed: ${result.error_code} ${result.description}`,
); );
const retryAfterMs = (result.parameters?.retry_after ?? (attempt * 5)) * 1000; await new Promise((resolve) => setTimeout(resolve, retryAfter * 1000));
await new Promise((resolve) => setTimeout(resolve, retryAfterMs));
} }
}); });

View File

@ -1,5 +1,5 @@
import { Grammy, GrammyParseMode } from "../deps.ts"; import { Grammy, GrammyParseMode } from "../deps.ts";
import { Context, logger } from "./mod.ts"; import { Context } from "./mod.ts";
import { getFlagEmoji } from "../utils/getFlagEmoji.ts"; import { getFlagEmoji } from "../utils/getFlagEmoji.ts";
import { activeGenerationWorkers, generationQueue } from "../app/generationQueue.ts"; import { activeGenerationWorkers, generationQueue } from "../app/generationQueue.ts";
import { getConfig } from "../app/config.ts"; import { getConfig } from "../app/config.ts";
@ -7,7 +7,7 @@ import { getConfig } from "../app/config.ts";
export async function queueCommand(ctx: Grammy.CommandContext<Context>) { export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
let formattedMessage = await getMessageText(); let formattedMessage = await getMessageText();
const queueMessage = await ctx.replyFmt(formattedMessage, { disable_notification: true }); const queueMessage = await ctx.replyFmt(formattedMessage, { disable_notification: true });
handleFutureUpdates().catch((err) => logger().warning(`Updating queue message failed: ${err}`)); handleFutureUpdates().catch(() => undefined);
async function getMessageText() { async function getMessageText() {
const config = await getConfig(); const config = await getConfig();

View File

@ -81,7 +81,7 @@ async function txt2img(ctx: Context, match: string, includeRepliedTo: boolean):
chat: ctx.message.chat, chat: ctx.message.chat,
requestMessage: ctx.message, requestMessage: ctx.message,
replyMessage: replyMessage, replyMessage: replyMessage,
}); }, { retryCount: 3, retryDelayMs: 10_000 });
logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`); logger().debug(`Generation (txt2img) enqueued for ${formatUserChat(ctx.message)}`);
} }

View File

@ -5,13 +5,13 @@ export * as Collections from "https://deno.land/std@0.202.0/collections/mod.ts";
export * as Base64 from "https://deno.land/std@0.202.0/encoding/base64.ts"; export * as Base64 from "https://deno.land/std@0.202.0/encoding/base64.ts";
export * as AsyncX from "https://deno.land/x/async@v2.0.2/mod.ts"; export * as AsyncX from "https://deno.land/x/async@v2.0.2/mod.ts";
export * as ULID from "https://deno.land/x/ulid@v0.3.0/mod.ts"; export * as ULID from "https://deno.land/x/ulid@v0.3.0/mod.ts";
export * as IKV from "https://deno.land/x/indexed_kv@v0.3.0/mod.ts"; export * as IKV from "https://deno.land/x/indexed_kv@v0.4.0/mod.ts";
export * as KVMQ from "https://deno.land/x/kvmq@v0.1.0/mod.ts"; export * as KVMQ from "https://deno.land/x/kvmq@v0.2.0/mod.ts";
export * as KVFS from "https://deno.land/x/kvfs@v0.1.0/mod.ts";
export * as Grammy from "https://deno.land/x/grammy@v1.18.3/mod.ts"; export * as Grammy from "https://deno.land/x/grammy@v1.18.3/mod.ts";
export * as GrammyTypes from "https://deno.land/x/grammy_types@v3.2.2/mod.ts"; export * as GrammyTypes from "https://deno.land/x/grammy_types@v3.2.2/mod.ts";
export * as GrammyAutoQuote from "https://deno.land/x/grammy_autoquote@v1.1.2/mod.ts"; export * as GrammyAutoQuote from "https://deno.land/x/grammy_autoquote@v1.1.2/mod.ts";
export * as GrammyParseMode from "https://deno.land/x/grammy_parse_mode@1.8.1/mod.ts"; export * as GrammyParseMode from "https://deno.land/x/grammy_parse_mode@1.8.1/mod.ts";
export * as GrammyKvStorage from "https://deno.land/x/grammy_storages@v2.3.1/denokv/src/mod.ts";
export * as GrammyStatelessQ from "https://deno.land/x/grammy_stateless_question_alpha@v3.0.4/mod.ts"; export * as GrammyStatelessQ from "https://deno.land/x/grammy_stateless_question_alpha@v3.0.4/mod.ts";
export * as GrammyFiles from "https://deno.land/x/grammy_files@v1.0.4/mod.ts"; export * as GrammyFiles from "https://deno.land/x/grammy_files@v1.0.4/mod.ts";
export * as FileType from "https://esm.sh/file-type@18.5.0"; export * as FileType from "https://esm.sh/file-type@18.5.0";

View File

@ -1,8 +1,8 @@
// Load environment variables from .env file
import "https://deno.land/std@0.201.0/dotenv/load.ts"; import "https://deno.land/std@0.201.0/dotenv/load.ts";
// Setup logging
import { Log } from "./deps.ts"; import { Log } from "./deps.ts";
import { bot } from "./bot/mod.ts";
import { runAllTasks } from "./app/mod.ts";
Log.setup({ Log.setup({
handlers: { handlers: {
console: new Log.handlers.ConsoleHandler("DEBUG"), console: new Log.handlers.ConsoleHandler("DEBUG"),
@ -12,9 +12,6 @@ Log.setup({
}, },
}); });
// Main program logic
import { bot } from "./bot/mod.ts";
import { runAllTasks } from "./app/mod.ts";
await Promise.all([ await Promise.all([
bot.start(), bot.start(),
runAllTasks(), runAllTasks(),

View File

@ -1,6 +1,8 @@
import { GrammyTypes } from "../deps.ts"; import { GrammyTypes } from "../deps.ts";
export function formatUserChat(ctx: { from?: GrammyTypes.User; chat?: GrammyTypes.Chat }) { export function formatUserChat(
ctx: { from?: GrammyTypes.User; chat?: GrammyTypes.Chat; sdInstanceId?: string },
) {
const msg: string[] = []; const msg: string[] = [];
if (ctx.from) { if (ctx.from) {
msg.push(ctx.from.first_name); msg.push(ctx.from.first_name);
@ -24,5 +26,8 @@ export function formatUserChat(ctx: { from?: GrammyTypes.User; chat?: GrammyType
} }
} }
} }
if (ctx.sdInstanceId) {
msg.push(`using ${ctx.sdInstanceId}`);
}
return msg.join(" "); return msg.join(" ");
} }