2023-09-24 13:08:35 +00:00
|
|
|
import { promiseState } from "async";
|
|
|
|
import { Chat, Message, User } from "grammy_types";
|
|
|
|
import { JobData, Queue, Worker } from "kvmq";
|
|
|
|
import createOpenApiClient from "openapi_fetch";
|
|
|
|
import { delay } from "std/async";
|
|
|
|
import { decode, encode } from "std/encoding/base64";
|
|
|
|
import { getLogger } from "std/log";
|
|
|
|
import { ulid } from "ulid";
|
2023-09-22 02:59:22 +00:00
|
|
|
import { bot } from "../bot/mod.ts";
|
2023-09-24 13:08:35 +00:00
|
|
|
import { SdError } from "../sd/SdError.ts";
|
2023-09-23 18:49:05 +00:00
|
|
|
import { PngInfo } from "../sd/parsePngInfo.ts";
|
|
|
|
import * as SdApi from "../sd/sdApi.ts";
|
2023-09-24 13:08:35 +00:00
|
|
|
import { formatOrdinal } from "../utils/formatOrdinal.ts";
|
2023-09-23 18:49:05 +00:00
|
|
|
import { formatUserChat } from "../utils/formatUserChat.ts";
|
|
|
|
import { getConfig, SdInstanceData } from "./config.ts";
|
2023-09-24 12:05:28 +00:00
|
|
|
import { db, fs } from "./db.ts";
|
|
|
|
import { SdGenerationInfo } from "./generationStore.ts";
|
|
|
|
import { uploadQueue } from "./uploadQueue.ts";
|
2023-09-22 02:59:22 +00:00
|
|
|
|
2023-09-24 13:08:35 +00:00
|
|
|
/** Returns the logger from `getLogger()`, looked up anew on every call. */
const logger = () => {
  return getLogger();
};
|
2023-09-22 02:59:22 +00:00
|
|
|
|
|
|
|
/**
 * Payload of a single job in the generation queue.
 */
interface GenerationJob {
  /** What to generate; `type` selects which SD endpoint the job is sent to. */
  task:
    | { type: "txt2img"; params: Partial<PngInfo> }
    | {
      type: "img2img";
      params: Partial<PngInfo>;
      /** Telegram file id of the source image (resolved through `bot.api.getFile`). */
      fileId: string;
    };
  /** User the job belongs to; forwarded to the upload job. */
  from: User;
  /** Chat the job belongs to; its id is used for permission checks and chat actions. */
  chat: Chat;
  /** Message that requested the generation; failure notices reply to it. */
  requestMessage: Message;
  /** Status message that is edited as the job moves through the queue. */
  replyMessage: Message;
  /** Id of the SD instance working on the job; written when processing starts. */
  sdInstanceId?: string;
  /** Latest progress value reported by the SD instance while processing. */
  progress?: number;
}
|
|
|
|
|
2023-09-24 13:08:35 +00:00
|
|
|
/** Queue of generation jobs, created from the shared `db` handle and the string "jobQueue". */
export const generationQueue = new Queue<GenerationJob>(
  db,
  "jobQueue",
);

/** Workers started by `processGenerationQueue`, keyed by the id of their SD instance. */
export const activeGenerationWorkers = new Map<
  string,
  Worker<GenerationJob>
>();
|
2023-09-22 02:59:22 +00:00
|
|
|
|
|
|
|
/**
 * Initializes queue workers for each SD instance when they become online.
 *
 * Never returns: once a minute it re-reads the config, skips every instance
 * whose worker is still processing, probes the remaining instances through
 * `/sdapi/v1/memory` and starts a fresh worker for each one that answers.
 */
export async function processGenerationQueue() {
  while (true) {
    const { sdInstances } = await getConfig();

    for (const sdInstance of sdInstances) {
      // an instance whose worker is still processing needs no attention
      if (activeGenerationWorkers.get(sdInstance.id)?.isProcessing) continue;

      const probeClient = createOpenApiClient<SdApi.paths>({
        baseUrl: sdInstance.api.url,
        headers: { "Authorization": sdInstance.api.auth },
      });

      // check if worker is up; any failure (timeout, network, empty response)
      // is logged and the instance is skipped until the next round
      try {
        const probe = await probeClient.GET("/sdapi/v1/memory", {
          signal: AbortSignal.timeout(10_000),
        });
        if (!probe.data) {
          throw new SdError("Failed to get worker status", probe.response, probe.error);
        }
      } catch (error) {
        logger().warning(`Worker ${sdInstance.id} is down: ${error}`);
        continue;
      }

      // create worker
      const newWorker = generationQueue.createWorker(
        ({ state }, updateJob) => processGenerationJob(state, updateJob, sdInstance),
      );

      newWorker.addEventListener("error", (e) => {
        const { job, error } = e.detail;

        logger().error(
          `Generation failed for ${formatUserChat(job.state)}: ${error}`,
        );

        // best-effort notice to the user; a failure to send it is ignored
        bot.api.sendMessage(
          job.state.requestMessage.chat.id,
          `Generation failed: ${error}\n\n` +
            (job.retryCount > 0 ? `Will retry ${job.retryCount} more times.` : `Aborting.`),
          {
            reply_to_message_id: job.state.requestMessage.message_id,
            allow_sending_without_reply: true,
          },
        ).catch(() => undefined);

        // an SdError came from the SD instance itself, so stop this worker;
        // the loop above probes the instance again on a later round
        if (error instanceof SdError) {
          newWorker.stopProcessing();
        }
      });

      newWorker.processJobs();
      activeGenerationWorkers.set(sdInstance.id, newWorker);
      logger().info(`Started worker ${sdInstance.id}`);
    }

    await delay(60_000);
  }
}
|
|
|
|
|
2023-09-24 12:05:28 +00:00
|
|
|
/**
 * Processes a single job from the queue.
 *
 * Sends the job's txt2img/img2img task to the given SD instance, copies the
 * progress reported by the instance into the job state and the Telegram status
 * message while the request is pending, stores the returned images in `fs`
 * and finally pushes an upload job for them.
 *
 * @param state Job payload. Mutated: `sdInstanceId` and `progress` are written
 *   and persisted through `updateJob`.
 * @param updateJob Persists a partial update of the job.
 * @param sdInstance SD instance that executes the task.
 * @throws {SdError} When the progress request or the generation request
 *   returns no data.
 * @throws {Error} When the chat does not allow sending photos, the img2img
 *   source image cannot be downloaded, the task type is unknown or SD returns
 *   no images. Errors of other awaited calls propagate unchanged.
 */
async function processGenerationJob(
  state: GenerationJob,
  updateJob: (job: Partial<JobData<GenerationJob>>) => Promise<void>,
  sdInstance: SdInstanceData,
) {
  const startDate = new Date();
  const workerSdClient = createOpenApiClient<SdApi.paths>({
    baseUrl: sdInstance.api.url,
    headers: { "Authorization": sdInstance.api.auth },
  });
  state.sdInstanceId = sdInstance.id;
  logger().debug(`Generation started for ${formatUserChat(state)}`);
  await updateJob({ state: state });

  // check if bot can post messages in this chat
  const chat = await bot.api.getChat(state.chat.id);
  if (
    (chat.type === "group" || chat.type === "supergroup") &&
    (!chat.permissions?.can_send_messages || !chat.permissions?.can_send_photos)
  ) {
    throw new Error("Bot doesn't have permissions to send photos in this chat");
  }

  // edit the existing status message
  await bot.api.editMessageText(
    state.replyMessage.chat.id,
    state.replyMessage.message_id,
    `Generating your prompt now... 0% using ${sdInstance.name || sdInstance.id}`,
    { maxAttempts: 1 },
  );

  // reduce size if worker can't handle the resolution
  const config = await getConfig();
  const size = limitSize(
    { ...config.defaultParams, ...state.task.params },
    sdInstance.maxResolution,
  );

  // request parameters shared by both task types; later entries override
  // earlier ones, and an empty negative prompt falls back to the default one
  const commonParams = {
    ...config.defaultParams,
    ...state.task.params,
    ...size,
    negative_prompt: state.task.params.negative_prompt ||
      config.defaultParams?.negative_prompt,
  };

  // start generating the image
  const responsePromise = state.task.type === "txt2img"
    ? workerSdClient.POST("/sdapi/v1/txt2img", {
      body: commonParams,
    })
    : state.task.type === "img2img"
    ? workerSdClient.POST("/sdapi/v1/img2img", {
      body: {
        ...commonParams,
        init_images: [await downloadTelegramFileAsBase64(state.task.fileId)],
      },
    })
    : undefined;

  if (!responsePromise) {
    throw new Error(`Unknown task type: ${state.task.type}`);
  }

  // The request is awaited only after polling finishes. If anything below
  // throws first, a later rejection of the request would have no handler and
  // surface as an unhandled rejection, so mark it as handled here. The
  // `await responsePromise` further down still observes the rejection.
  responsePromise.catch(() => undefined);

  // poll for progress while the generation request is pending
  do {
    const progressResponse = await workerSdClient.GET("/sdapi/v1/progress", {
      params: {},
      signal: AbortSignal.timeout(15000),
    });
    if (!progressResponse.data) {
      throw new SdError(
        "Progress request failed",
        progressResponse.response,
        progressResponse.error,
      );
    }

    state.progress = progressResponse.data.progress;
    await updateJob({ state: state });

    // both Telegram updates are best-effort; failures are ignored
    await bot.api.sendChatAction(state.chat.id, "upload_photo", { maxAttempts: 1 })
      .catch(() => undefined);
    await bot.api.editMessageText(
      state.replyMessage.chat.id,
      state.replyMessage.message_id,
      `Generating your prompt now... ${(progressResponse.data.progress * 100).toFixed(0)}% using ${
        sdInstance.name || sdInstance.id
      }`,
      { maxAttempts: 1 },
    ).catch(() => undefined);

    // wait 3 seconds, or less if the generation request settles earlier
    await Promise.race([delay(3000), responsePromise]).catch(() => undefined);
  } while (await promiseState(responsePromise) === "pending");

  // check response
  const response = await responsePromise;
  if (!response.data) {
    throw new SdError(`${state.task.type} failed`, response.response, response.error);
  }
  if (!response.data.images?.length) {
    throw new Error("No images returned from SD");
  }

  // info field is a json serialized string so we need to parse it
  const info: SdGenerationInfo = JSON.parse(response.data.info);

  // save images to db
  const imageKeys: Deno.KvKey[] = [];
  for (const imageBase64 of response.data.images) {
    const imageBuffer = decode(imageBase64);
    const imageKey = ["images", "upload", ulid()];
    await fs.set(imageKey, imageBuffer, { expireIn: 30 * 60 * 1000 });
    imageKeys.push(imageKey);
  }

  // create a new upload job
  await uploadQueue.pushJob({
    chat: state.chat,
    from: state.from,
    requestMessage: state.requestMessage,
    replyMessage: state.replyMessage,
    sdInstanceId: sdInstance.id,
    startDate,
    endDate: new Date(),
    imageKeys,
    info,
  }, { retryCount: 5, retryDelayMs: 10000 });

  // change status message to uploading images
  await bot.api.editMessageText(
    state.replyMessage.chat.id,
    state.replyMessage.message_id,
    `Uploading your images...`,
    { maxAttempts: 1 },
  ).catch(() => undefined);

  logger().debug(`Generation finished for ${formatUserChat(state)}`);
}

/**
 * Scales a size down so that `width * height` does not exceed `maxResolution`,
 * keeping the aspect ratio. Returns an empty object when either dimension is
 * missing or zero, and the size unchanged when it already fits.
 */
function limitSize(
  { width, height }: { width?: number; height?: number },
  maxResolution: number,
): { width?: number; height?: number } {
  if (!width || !height) return {};
  const ratio = width / height;
  if (width * height > maxResolution) {
    return {
      width: Math.trunc(Math.sqrt(maxResolution * ratio)),
      height: Math.trunc(Math.sqrt(maxResolution / ratio)),
    };
  }
  return { width, height };
}

/**
 * Downloads a file from Telegram by its file id and returns its bytes
 * base64-encoded.
 *
 * @throws {Error} When the download answers with a non-2xx status, so that an
 *   error body is never passed on as image data.
 */
async function downloadTelegramFileAsBase64(fileId: string): Promise<string> {
  const file = await bot.api.getFile(fileId);
  const resp = await fetch(
    `https://api.telegram.org/file/bot${bot.token}/${file.file_path}`,
  );
  if (!resp.ok) {
    // the body is not needed; cancel it so the connection is released
    await resp.body?.cancel();
    // the URL contains the bot token, so only the status is reported
    throw new Error(
      `Failed to download image from Telegram: ${resp.status} ${resp.statusText}`,
    );
  }
  return encode(await resp.arrayBuffer());
}
|
|
|
|
|
|
|
|
/**
 * Handles queue updates and updates the status message.
 *
 * Never returns: every 3 seconds it walks all queued jobs and rewrites the
 * status message of each waiting job with its current position in the queue.
 */
export async function updateGenerationQueue() {
  while (true) {
    // 1-based position among the jobs that are waiting and have a status message
    let position = 0;

    for (const { lockUntil, state } of await generationQueue.getAllJobs()) {
      // A job locked into the future is currently being processed, the worker
      // updates its status message itself. A job without a status message has
      // nothing to update. Neither takes up a position.
      if (lockUntil > new Date() || !state.replyMessage) continue;

      position += 1;
      const { chat, message_id } = state.replyMessage;

      // best-effort edit; failures are ignored
      await bot.api.editMessageText(
        chat.id,
        message_id,
        `You are ${formatOrdinal(position)} in queue.`,
        { maxAttempts: 1 },
      ).catch(() => undefined);
    }

    await delay(3000);
  }
}
|