big rewrite
This commit is contained in:
parent
3444fb01d0
commit
77231afa00
|
@ -22,3 +22,12 @@ You can put these in `.env` file or pass them as environment variables.
|
||||||
|
|
||||||
- Start stable diffusion webui: `cd sd-webui`, `./webui.sh --api`
|
- Start stable diffusion webui: `cd sd-webui`, `./webui.sh --api`
|
||||||
- Start bot: `deno task start`
|
- Start bot: `deno task start`
|
||||||
|
|
||||||
|
## Codegen
|
||||||
|
|
||||||
|
The Stable Diffusion API in `common/sdApi.ts` is auto-generated. To regenerate it, first start your
|
||||||
|
SD WebUI with `--nowebui --api`, and then run:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
deno run npm:openapi-typescript http://localhost:7861/openapi.json -o common/sdApi.ts
|
||||||
|
```
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
import { jobStore } from "../db/jobStore.ts";
|
import { generationQueue } from "../tasks/generationQueue.ts";
|
||||||
import { Context } from "./mod.ts";
|
import { Context } from "./mod.ts";
|
||||||
|
|
||||||
export async function cancelCommand(ctx: Context) {
|
export async function cancelCommand(ctx: Context) {
|
||||||
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
|
const jobs = await generationQueue.getAllJobs();
|
||||||
const userJobs = jobs.filter((j) => j.value.from.id === ctx.from?.id);
|
const userJobs = jobs
|
||||||
for (const job of userJobs) await job.delete();
|
.filter((job) => job.lockUntil > new Date())
|
||||||
|
.filter((j) => j.state.from.id === ctx.from?.id);
|
||||||
|
for (const job of userJobs) await generationQueue.deleteJob(job.id);
|
||||||
await ctx.reply(`Cancelled ${userJobs.length} jobs`);
|
await ctx.reply(`Cancelled ${userJobs.length} jobs`);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
import { Collections, Grammy, GrammyStatelessQ } from "../deps.ts";
|
import { Collections, Grammy, GrammyStatelessQ } from "../deps.ts";
|
||||||
import { formatUserChat } from "../common/utils.ts";
|
import { formatUserChat } from "../common/formatUserChat.ts";
|
||||||
import { jobStore } from "../db/jobStore.ts";
|
|
||||||
import { parsePngInfo, PngInfo } from "../common/parsePngInfo.ts";
|
import { parsePngInfo, PngInfo } from "../common/parsePngInfo.ts";
|
||||||
import { Context, logger } from "./mod.ts";
|
import { Context, logger } from "./mod.ts";
|
||||||
|
import { generationQueue } from "../tasks/generationQueue.ts";
|
||||||
|
import { getConfig } from "../db/config.ts";
|
||||||
|
|
||||||
export const img2imgQuestion = new GrammyStatelessQ.StatelessQuestion<Context>(
|
export const img2imgQuestion = new GrammyStatelessQ.StatelessQuestion<Context>(
|
||||||
"img2img",
|
"img2img",
|
||||||
|
@ -27,23 +28,25 @@ async function img2img(
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx.session.global.pausedReason != null) {
|
const config = await getConfig();
|
||||||
await ctx.reply(`I'm paused: ${ctx.session.global.pausedReason || "No reason given"}`);
|
|
||||||
|
if (config.pausedReason != null) {
|
||||||
|
await ctx.reply(`I'm paused: ${config.pausedReason || "No reason given"}`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
|
const jobs = await generationQueue.getAllJobs();
|
||||||
if (jobs.length >= ctx.session.global.maxJobs) {
|
if (jobs.length >= config.maxJobs) {
|
||||||
await ctx.reply(
|
await ctx.reply(
|
||||||
`The queue is full. Try again later. (Max queue size: ${ctx.session.global.maxJobs})`,
|
`The queue is full. Try again later. (Max queue size: ${config.maxJobs})`,
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const userJobs = jobs.filter((job) => job.value.from.id === ctx.message?.from?.id);
|
const userJobs = jobs.filter((job) => job.state.from.id === ctx.message?.from?.id);
|
||||||
if (userJobs.length >= ctx.session.global.maxUserJobs) {
|
if (userJobs.length >= config.maxUserJobs) {
|
||||||
await ctx.reply(
|
await ctx.reply(
|
||||||
`You already have ${ctx.session.global.maxUserJobs} jobs in queue. Try again later.`,
|
`You already have ${config.maxUserJobs} jobs in queue. Try again later.`,
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -98,12 +101,12 @@ async function img2img(
|
||||||
|
|
||||||
const replyMessage = await ctx.reply("Accepted. You are now in queue.");
|
const replyMessage = await ctx.reply("Accepted. You are now in queue.");
|
||||||
|
|
||||||
await jobStore.create({
|
await generationQueue.pushJob({
|
||||||
task: { type: "img2img", params, fileId },
|
task: { type: "img2img", params, fileId },
|
||||||
from: ctx.message.from,
|
from: ctx.message.from,
|
||||||
chat: ctx.message.chat,
|
chat: ctx.message.chat,
|
||||||
requestMessageId: ctx.message.message_id,
|
requestMessage: ctx.message,
|
||||||
status: { type: "waiting", message: replyMessage },
|
replyMessage: replyMessage,
|
||||||
});
|
});
|
||||||
|
|
||||||
logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`);
|
logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`);
|
||||||
|
|
70
bot/mod.ts
70
bot/mod.ts
|
@ -1,47 +1,81 @@
|
||||||
import { Grammy, GrammyAutoQuote, GrammyFiles, GrammyParseMode, Log } from "../deps.ts";
|
import { Grammy, GrammyAutoQuote, GrammyFiles, GrammyParseMode, Log } from "../deps.ts";
|
||||||
import { formatUserChat } from "../common/utils.ts";
|
import { formatUserChat } from "../common/formatUserChat.ts";
|
||||||
import { session, SessionFlavor } from "./session.ts";
|
|
||||||
import { queueCommand } from "./queueCommand.ts";
|
import { queueCommand } from "./queueCommand.ts";
|
||||||
import { txt2imgCommand, txt2imgQuestion } from "./txt2imgCommand.ts";
|
import { txt2imgCommand, txt2imgQuestion } from "./txt2imgCommand.ts";
|
||||||
import { pnginfoCommand, pnginfoQuestion } from "./pnginfoCommand.ts";
|
import { pnginfoCommand, pnginfoQuestion } from "./pnginfoCommand.ts";
|
||||||
import { img2imgCommand, img2imgQuestion } from "./img2imgCommand.ts";
|
import { img2imgCommand, img2imgQuestion } from "./img2imgCommand.ts";
|
||||||
import { cancelCommand } from "./cancelCommand.ts";
|
import { cancelCommand } from "./cancelCommand.ts";
|
||||||
|
import { getConfig, setConfig } from "../db/config.ts";
|
||||||
|
|
||||||
export const logger = () => Log.getLogger();
|
export const logger = () => Log.getLogger();
|
||||||
|
|
||||||
|
interface SessionData {
|
||||||
|
chat: ChatData;
|
||||||
|
user: UserData;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ChatData {
|
||||||
|
language?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface UserData {
|
||||||
|
params?: Record<string, string>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type Context =
|
||||||
|
& GrammyFiles.FileFlavor<GrammyParseMode.ParseModeFlavor<Grammy.Context>>
|
||||||
|
& Grammy.SessionFlavor<SessionData>;
|
||||||
|
|
||||||
type WithRetryApi<T extends Grammy.RawApi> = {
|
type WithRetryApi<T extends Grammy.RawApi> = {
|
||||||
[M in keyof T]: T[M] extends (args: infer P, ...rest: infer A) => infer R
|
[M in keyof T]: T[M] extends (args: infer P, ...rest: infer A) => infer R
|
||||||
? (args: P extends object ? P & { maxAttempts?: number } : P, ...rest: A) => R
|
? (args: P extends object ? P & { maxAttempts?: number } : P, ...rest: A) => R
|
||||||
: T[M];
|
: T[M];
|
||||||
};
|
};
|
||||||
|
|
||||||
export type Context =
|
type Api = Grammy.Api<WithRetryApi<Grammy.RawApi>>;
|
||||||
& GrammyFiles.FileFlavor<GrammyParseMode.ParseModeFlavor<Grammy.Context>>
|
|
||||||
& SessionFlavor;
|
export const bot = new Grammy.Bot<Context, Api>(Deno.env.get("TG_BOT_TOKEN")!);
|
||||||
export const bot = new Grammy.Bot<Context, Grammy.Api<WithRetryApi<Grammy.RawApi>>>(
|
|
||||||
Deno.env.get("TG_BOT_TOKEN") ?? "",
|
|
||||||
);
|
|
||||||
bot.use(GrammyAutoQuote.autoQuote);
|
bot.use(GrammyAutoQuote.autoQuote);
|
||||||
bot.use(GrammyParseMode.hydrateReply);
|
bot.use(GrammyParseMode.hydrateReply);
|
||||||
bot.use(session);
|
bot.use(Grammy.session<
|
||||||
|
SessionData,
|
||||||
|
Grammy.Context & Grammy.SessionFlavor<SessionData>
|
||||||
|
>({
|
||||||
|
type: "multi",
|
||||||
|
chat: {
|
||||||
|
initial: () => ({}),
|
||||||
|
},
|
||||||
|
user: {
|
||||||
|
getSessionKey: (ctx) => ctx.from?.id.toFixed(),
|
||||||
|
initial: () => ({}),
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
bot.api.config.use(GrammyFiles.hydrateFiles(bot.token));
|
bot.api.config.use(GrammyFiles.hydrateFiles(bot.token));
|
||||||
|
|
||||||
// Automatically cancel requests after 30 seconds
|
// Automatically cancel requests after 30 seconds
|
||||||
bot.api.config.use(async (prev, method, payload, signal) => {
|
bot.api.config.use(async (prev, method, payload, signal) => {
|
||||||
|
// don't time out getUpdates requests, they are long-polling
|
||||||
|
if (method === "getUpdates") return prev(method, payload, signal);
|
||||||
|
|
||||||
const controller = new AbortController();
|
const controller = new AbortController();
|
||||||
let timedOut = false;
|
let timedOut = false;
|
||||||
const timeout = setTimeout(() => {
|
const timeout = setTimeout(() => {
|
||||||
timedOut = true;
|
timedOut = true;
|
||||||
// TODO: this sometimes throws with "can't abort a locked stream" and crashes whole process
|
// TODO: this sometimes throws with "can't abort a locked stream", why?
|
||||||
controller.abort();
|
try {
|
||||||
|
controller.abort();
|
||||||
|
} catch (error) {
|
||||||
|
logger().error(`Error while cancelling on timeout: ${error}`);
|
||||||
|
}
|
||||||
}, 30 * 1000);
|
}, 30 * 1000);
|
||||||
signal?.addEventListener("abort", () => {
|
signal?.addEventListener("abort", () => {
|
||||||
controller.abort();
|
controller.abort();
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const result = await prev(method, payload, controller.signal);
|
return await prev(method, payload, controller.signal);
|
||||||
return result;
|
|
||||||
} finally {
|
} finally {
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
if (timedOut) {
|
if (timedOut) {
|
||||||
|
@ -121,24 +155,26 @@ bot.command("queue", queueCommand);
|
||||||
|
|
||||||
bot.command("cancel", cancelCommand);
|
bot.command("cancel", cancelCommand);
|
||||||
|
|
||||||
bot.command("pause", (ctx) => {
|
bot.command("pause", async (ctx) => {
|
||||||
if (!ctx.from?.username) return;
|
if (!ctx.from?.username) return;
|
||||||
const config = ctx.session.global;
|
const config = await getConfig();
|
||||||
if (!config.adminUsernames.includes(ctx.from.username)) return;
|
if (!config.adminUsernames.includes(ctx.from.username)) return;
|
||||||
if (config.pausedReason != null) {
|
if (config.pausedReason != null) {
|
||||||
return ctx.reply(`Already paused: ${config.pausedReason}`);
|
return ctx.reply(`Already paused: ${config.pausedReason}`);
|
||||||
}
|
}
|
||||||
config.pausedReason = ctx.match ?? "No reason given";
|
config.pausedReason = ctx.match ?? "No reason given";
|
||||||
|
await setConfig(config);
|
||||||
logger().warning(`Bot paused by ${ctx.from.first_name} because ${config.pausedReason}`);
|
logger().warning(`Bot paused by ${ctx.from.first_name} because ${config.pausedReason}`);
|
||||||
return ctx.reply("Paused");
|
return ctx.reply("Paused");
|
||||||
});
|
});
|
||||||
|
|
||||||
bot.command("resume", (ctx) => {
|
bot.command("resume", async (ctx) => {
|
||||||
if (!ctx.from?.username) return;
|
if (!ctx.from?.username) return;
|
||||||
const config = ctx.session.global;
|
const config = await getConfig();
|
||||||
if (!config.adminUsernames.includes(ctx.from.username)) return;
|
if (!config.adminUsernames.includes(ctx.from.username)) return;
|
||||||
if (config.pausedReason == null) return ctx.reply("Already running");
|
if (config.pausedReason == null) return ctx.reply("Already running");
|
||||||
config.pausedReason = null;
|
config.pausedReason = null;
|
||||||
|
await setConfig(config);
|
||||||
logger().info(`Bot resumed by ${ctx.from.first_name}`);
|
logger().info(`Bot resumed by ${ctx.from.first_name}`);
|
||||||
return ctx.reply("Resumed");
|
return ctx.reply("Resumed");
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
import { Grammy, GrammyParseMode, GrammyStatelessQ } from "../deps.ts";
|
import { Grammy, GrammyParseMode, GrammyStatelessQ } from "../deps.ts";
|
||||||
import { fmt } from "../common/utils.ts";
|
|
||||||
import { getPngInfo, parsePngInfo } from "../common/parsePngInfo.ts";
|
import { getPngInfo, parsePngInfo } from "../common/parsePngInfo.ts";
|
||||||
import { Context } from "./mod.ts";
|
import { Context } from "./mod.ts";
|
||||||
|
|
||||||
|
@ -31,7 +30,7 @@ async function pnginfo(ctx: Context, includeRepliedTo: boolean): Promise<void> {
|
||||||
const buffer = await fetch(file.getUrl()).then((resp) => resp.arrayBuffer());
|
const buffer = await fetch(file.getUrl()).then((resp) => resp.arrayBuffer());
|
||||||
const params = parsePngInfo(getPngInfo(new Uint8Array(buffer)) ?? "");
|
const params = parsePngInfo(getPngInfo(new Uint8Array(buffer)) ?? "");
|
||||||
|
|
||||||
const { bold } = GrammyParseMode;
|
const { bold, fmt } = GrammyParseMode;
|
||||||
|
|
||||||
const paramsText = fmt([
|
const paramsText = fmt([
|
||||||
`${params.prompt}\n`,
|
`${params.prompt}\n`,
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
import { Grammy, GrammyParseMode } from "../deps.ts";
|
import { Grammy, GrammyParseMode } from "../deps.ts";
|
||||||
import { fmt } from "../common/utils.ts";
|
|
||||||
import { runningWorkers } from "../tasks/pingWorkers.ts";
|
|
||||||
import { jobStore } from "../db/jobStore.ts";
|
|
||||||
import { Context, logger } from "./mod.ts";
|
import { Context, logger } from "./mod.ts";
|
||||||
import { getFlagEmoji } from "../common/getFlagEmoji.ts";
|
import { getFlagEmoji } from "../common/getFlagEmoji.ts";
|
||||||
|
import { activeGenerationWorkers, generationQueue } from "../tasks/generationQueue.ts";
|
||||||
|
import { getConfig } from "../db/config.ts";
|
||||||
|
|
||||||
export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
|
export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
|
||||||
let formattedMessage = await getMessageText();
|
let formattedMessage = await getMessageText();
|
||||||
|
@ -11,38 +10,41 @@ export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
|
||||||
handleFutureUpdates().catch((err) => logger().warning(`Updating queue message failed: ${err}`));
|
handleFutureUpdates().catch((err) => logger().warning(`Updating queue message failed: ${err}`));
|
||||||
|
|
||||||
async function getMessageText() {
|
async function getMessageText() {
|
||||||
const processingJobs = await jobStore.getBy("status.type", { value: "processing" })
|
const config = await getConfig();
|
||||||
.then((jobs) => jobs.map((job) => ({ ...job.value, place: 0 })));
|
const allJobs = await generationQueue.getAllJobs();
|
||||||
const waitingJobs = await jobStore.getBy("status.type", { value: "waiting" })
|
const processingJobs = allJobs
|
||||||
.then((jobs) => jobs.map((job, index) => ({ ...job.value, place: index + 1 })));
|
.filter((job) => job.lockUntil > new Date()).map((job) => ({ ...job, index: 0 }));
|
||||||
|
const waitingJobs = allJobs
|
||||||
|
.filter((job) => job.lockUntil <= new Date())
|
||||||
|
.map((job, index) => ({ ...job, index: index + 1 }));
|
||||||
const jobs = [...processingJobs, ...waitingJobs];
|
const jobs = [...processingJobs, ...waitingJobs];
|
||||||
const { bold } = GrammyParseMode;
|
const { bold, fmt } = GrammyParseMode;
|
||||||
|
|
||||||
return fmt([
|
return fmt([
|
||||||
"Current queue:\n",
|
"Current queue:\n",
|
||||||
...jobs.length > 0
|
...jobs.length > 0
|
||||||
? jobs.flatMap((job) => [
|
? jobs.flatMap((job) => [
|
||||||
`${job.place}. `,
|
`${job.index}. `,
|
||||||
fmt`${bold(job.from.first_name)} `,
|
fmt`${bold(job.state.from.first_name)} `,
|
||||||
job.from.last_name ? fmt`${bold(job.from.last_name)} ` : "",
|
job.state.from.last_name ? fmt`${bold(job.state.from.last_name)} ` : "",
|
||||||
job.from.username ? `(@${job.from.username}) ` : "",
|
job.state.from.username ? `(@${job.state.from.username}) ` : "",
|
||||||
getFlagEmoji(job.from.language_code) ?? "",
|
getFlagEmoji(job.state.from.language_code) ?? "",
|
||||||
job.chat.type === "private" ? " in private chat " : ` in ${job.chat.title} `,
|
job.state.chat.type === "private" ? " in private chat " : ` in ${job.state.chat.title} `,
|
||||||
job.chat.type !== "private" && job.chat.type !== "group" &&
|
job.state.chat.type !== "private" && job.state.chat.type !== "group" &&
|
||||||
job.chat.username
|
job.state.chat.username
|
||||||
? `(@${job.chat.username}) `
|
? `(@${job.state.chat.username}) `
|
||||||
: "",
|
: "",
|
||||||
job.status.type === "processing"
|
job.index === 0 && job.state.progress && job.state.sdInstanceId
|
||||||
? `(${(job.status.progress * 100).toFixed(0)}% using ${job.status.worker}) `
|
? `(${(job.state.progress * 100).toFixed(0)}% using ${job.state.sdInstanceId}) `
|
||||||
: "",
|
: "",
|
||||||
"\n",
|
"\n",
|
||||||
])
|
])
|
||||||
: ["Queue is empty.\n"],
|
: ["Queue is empty.\n"],
|
||||||
"\nActive workers:\n",
|
"\nActive workers:\n",
|
||||||
...ctx.session.global.workers.flatMap((worker) => [
|
...config.sdInstances.flatMap((sdInstance) => [
|
||||||
runningWorkers.has(worker.id) ? "✅ " : "☠️ ",
|
activeGenerationWorkers.has(sdInstance.id) ? "✅ " : "☠️ ",
|
||||||
fmt`${bold(worker.name || worker.id)} `,
|
fmt`${bold(sdInstance.name || sdInstance.id)} `,
|
||||||
`(max ${(worker.maxResolution / 1000000).toFixed(1)} Mpx) `,
|
`(max ${(sdInstance.maxResolution / 1000000).toFixed(1)} Mpx) `,
|
||||||
"\n",
|
"\n",
|
||||||
]),
|
]),
|
||||||
]);
|
]);
|
||||||
|
|
|
@ -1,81 +0,0 @@
|
||||||
import { db } from "../db/db.ts";
|
|
||||||
import { Grammy, GrammyKvStorage } from "../deps.ts";
|
|
||||||
import { SdApi, SdTxt2ImgRequest } from "../common/sdApi.ts";
|
|
||||||
|
|
||||||
export type SessionFlavor = Grammy.SessionFlavor<SessionData>;
|
|
||||||
|
|
||||||
export interface SessionData {
|
|
||||||
global: GlobalData;
|
|
||||||
chat: ChatData;
|
|
||||||
user: UserData;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface GlobalData {
|
|
||||||
adminUsernames: string[];
|
|
||||||
pausedReason: string | null;
|
|
||||||
maxUserJobs: number;
|
|
||||||
maxJobs: number;
|
|
||||||
defaultParams?: Partial<SdTxt2ImgRequest>;
|
|
||||||
workers: WorkerData[];
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface WorkerData {
|
|
||||||
id: string;
|
|
||||||
name?: string;
|
|
||||||
api: SdApi;
|
|
||||||
maxResolution: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ChatData {
|
|
||||||
language?: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface UserData {
|
|
||||||
params?: Partial<SdTxt2ImgRequest>;
|
|
||||||
}
|
|
||||||
|
|
||||||
const globalDbAdapter = new GrammyKvStorage.DenoKVAdapter<GlobalData>(db);
|
|
||||||
|
|
||||||
const getDefaultGlobalData = (): GlobalData => ({
|
|
||||||
adminUsernames: Deno.env.get("TG_ADMIN_USERS")?.split(",") ?? [],
|
|
||||||
pausedReason: null,
|
|
||||||
maxUserJobs: 3,
|
|
||||||
maxJobs: 20,
|
|
||||||
defaultParams: {
|
|
||||||
batch_size: 1,
|
|
||||||
n_iter: 1,
|
|
||||||
width: 512,
|
|
||||||
height: 768,
|
|
||||||
steps: 30,
|
|
||||||
cfg_scale: 10,
|
|
||||||
negative_prompt: "boring_e621_fluffyrock_v4 boring_e621_v4",
|
|
||||||
},
|
|
||||||
workers: [
|
|
||||||
{
|
|
||||||
id: "local",
|
|
||||||
api: { url: Deno.env.get("SD_API_URL") ?? "http://127.0.0.1:7860/" },
|
|
||||||
maxResolution: 1024 * 1024,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
});
|
|
||||||
|
|
||||||
export const session = Grammy.session<SessionData, Grammy.Context & SessionFlavor>({
|
|
||||||
type: "multi",
|
|
||||||
global: {
|
|
||||||
getSessionKey: () => "global",
|
|
||||||
initial: getDefaultGlobalData,
|
|
||||||
storage: globalDbAdapter,
|
|
||||||
},
|
|
||||||
chat: {
|
|
||||||
initial: () => ({}),
|
|
||||||
},
|
|
||||||
user: {
|
|
||||||
getSessionKey: (ctx) => ctx.from?.id.toFixed(),
|
|
||||||
initial: () => ({}),
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
export async function getGlobalSession(): Promise<GlobalData> {
|
|
||||||
const data = await globalDbAdapter.read("global");
|
|
||||||
return data ?? getDefaultGlobalData();
|
|
||||||
}
|
|
|
@ -1,8 +1,9 @@
|
||||||
import { Grammy, GrammyStatelessQ } from "../deps.ts";
|
import { Grammy, GrammyStatelessQ } from "../deps.ts";
|
||||||
import { formatUserChat } from "../common/utils.ts";
|
import { formatUserChat } from "../common/formatUserChat.ts";
|
||||||
import { jobStore } from "../db/jobStore.ts";
|
|
||||||
import { getPngInfo, parsePngInfo, PngInfo } from "../common/parsePngInfo.ts";
|
import { getPngInfo, parsePngInfo, PngInfo } from "../common/parsePngInfo.ts";
|
||||||
import { Context, logger } from "./mod.ts";
|
import { Context, logger } from "./mod.ts";
|
||||||
|
import { generationQueue } from "../tasks/generationQueue.ts";
|
||||||
|
import { getConfig } from "../db/config.ts";
|
||||||
|
|
||||||
export const txt2imgQuestion = new GrammyStatelessQ.StatelessQuestion<Context>(
|
export const txt2imgQuestion = new GrammyStatelessQ.StatelessQuestion<Context>(
|
||||||
"txt2img",
|
"txt2img",
|
||||||
|
@ -22,23 +23,25 @@ async function txt2img(ctx: Context, match: string, includeRepliedTo: boolean):
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx.session.global.pausedReason != null) {
|
const config = await getConfig();
|
||||||
await ctx.reply(`I'm paused: ${ctx.session.global.pausedReason || "No reason given"}`);
|
|
||||||
|
if (config.pausedReason != null) {
|
||||||
|
await ctx.reply(`I'm paused: ${config.pausedReason || "No reason given"}`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
|
const jobs = await generationQueue.getAllJobs();
|
||||||
if (jobs.length >= ctx.session.global.maxJobs) {
|
if (jobs.length >= config.maxJobs) {
|
||||||
await ctx.reply(
|
await ctx.reply(
|
||||||
`The queue is full. Try again later. (Max queue size: ${ctx.session.global.maxJobs})`,
|
`The queue is full. Try again later. (Max queue size: ${config.maxJobs})`,
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const userJobs = jobs.filter((job) => job.value.from.id === ctx.message?.from?.id);
|
const userJobs = jobs.filter((job) => job.state.from.id === ctx.message?.from?.id);
|
||||||
if (userJobs.length >= ctx.session.global.maxUserJobs) {
|
if (userJobs.length >= config.maxUserJobs) {
|
||||||
await ctx.reply(
|
await ctx.reply(
|
||||||
`You already have ${ctx.session.global.maxUserJobs} jobs in queue. Try again later.`,
|
`You already have ${config.maxUserJobs} jobs in queue. Try again later.`,
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -72,12 +75,12 @@ async function txt2img(ctx: Context, match: string, includeRepliedTo: boolean):
|
||||||
|
|
||||||
const replyMessage = await ctx.reply("Accepted. You are now in queue.");
|
const replyMessage = await ctx.reply("Accepted. You are now in queue.");
|
||||||
|
|
||||||
await jobStore.create({
|
await generationQueue.pushJob({
|
||||||
task: { type: "txt2img", params },
|
task: { type: "txt2img", params },
|
||||||
from: ctx.message.from,
|
from: ctx.message.from,
|
||||||
chat: ctx.message.chat,
|
chat: ctx.message.chat,
|
||||||
requestMessageId: ctx.message.message_id,
|
requestMessage: ctx.message,
|
||||||
status: { type: "waiting", message: replyMessage },
|
replyMessage: replyMessage,
|
||||||
});
|
});
|
||||||
|
|
||||||
logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`);
|
logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`);
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
export interface SdErrorData {
|
||||||
|
/**
|
||||||
|
* The HTTP status message or array of invalid fields.
|
||||||
|
* Can also be empty string.
|
||||||
|
*/
|
||||||
|
detail?: string | Array<{ loc: (string | number)[]; msg: string; type: string }>;
|
||||||
|
/** Can be e.g. "OutOfMemoryError" or undefined. */
|
||||||
|
error?: string;
|
||||||
|
/** Empty string. */
|
||||||
|
body?: string;
|
||||||
|
/** Long description of error. */
|
||||||
|
errors?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class SdError extends Error {
|
||||||
|
constructor(
|
||||||
|
prefix: string,
|
||||||
|
public readonly response: Response,
|
||||||
|
public readonly body?: SdErrorData,
|
||||||
|
) {
|
||||||
|
let message = `${prefix}: ${response.status} ${response.statusText}`;
|
||||||
|
if (body?.error) {
|
||||||
|
message += `: ${body.error}`;
|
||||||
|
if (body.errors) message += ` - ${body.errors}`;
|
||||||
|
} else if (typeof body?.detail === "string" && body.detail.length > 0) {
|
||||||
|
message += `: ${body.detail}`;
|
||||||
|
} else if (body?.detail) {
|
||||||
|
message += `: ${JSON.stringify(body.detail)}`;
|
||||||
|
}
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
export function deadline(timeout: number): AbortSignal {
|
||||||
|
const controller = new AbortController();
|
||||||
|
setTimeout(() => controller.abort(), timeout);
|
||||||
|
return controller.signal;
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
export function formatOrdinal(n: number) {
|
||||||
|
if (n % 100 === 11 || n % 100 === 12 || n % 100 === 13) return `${n}th`;
|
||||||
|
if (n % 10 === 1) return `${n}st`;
|
||||||
|
if (n % 10 === 2) return `${n}nd`;
|
||||||
|
if (n % 10 === 3) return `${n}rd`;
|
||||||
|
return `${n}th`;
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
import { GrammyTypes } from "../deps.ts";
|
||||||
|
|
||||||
|
export function formatUserChat(ctx: { from?: GrammyTypes.User; chat?: GrammyTypes.Chat }) {
|
||||||
|
const msg: string[] = [];
|
||||||
|
if (ctx.from) {
|
||||||
|
msg.push(ctx.from.first_name);
|
||||||
|
if (ctx.from.last_name) msg.push(ctx.from.last_name);
|
||||||
|
if (ctx.from.username) msg.push(`(@${ctx.from.username})`);
|
||||||
|
if (ctx.from.language_code) msg.push(`(${ctx.from.language_code.toUpperCase()})`);
|
||||||
|
}
|
||||||
|
if (ctx.chat) {
|
||||||
|
if (
|
||||||
|
ctx.chat.type === "group" ||
|
||||||
|
ctx.chat.type === "supergroup" ||
|
||||||
|
ctx.chat.type === "channel"
|
||||||
|
) {
|
||||||
|
msg.push("in");
|
||||||
|
msg.push(ctx.chat.title);
|
||||||
|
if (
|
||||||
|
(ctx.chat.type === "supergroup" || ctx.chat.type === "channel") &&
|
||||||
|
ctx.chat.username
|
||||||
|
) {
|
||||||
|
msg.push(`(@${ctx.chat.username})`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return msg.join(" ");
|
||||||
|
}
|
|
@ -1,47 +1,47 @@
|
||||||
/** Language to biggest country emoji map */
|
/** Language to biggest country emoji map */
|
||||||
const languageToFlagMap: Record<string, string> = {
|
const languageToFlagMap: Record<string, string> = {
|
||||||
"en": "🇺🇸",
|
"en": "🇺🇸", // english - united states
|
||||||
"zh": "🇨🇳",
|
"zh": "🇨🇳", // chinese - china
|
||||||
"es": "🇪🇸",
|
"es": "🇪🇸", // spanish - spain
|
||||||
"hi": "🇮🇳",
|
"hi": "🇮🇳", // hindi - india
|
||||||
"ar": "🇪🇬",
|
"ar": "🇪🇬", // arabic - egypt
|
||||||
"pt": "🇧🇷",
|
"pt": "🇧🇷", // portuguese - brazil
|
||||||
"bn": "🇧🇩",
|
"bn": "🇧🇩", // bengali - bangladesh
|
||||||
"ru": "🇷🇺",
|
"ru": "🇷🇺", // russian - russia
|
||||||
"ja": "🇯🇵",
|
"ja": "🇯🇵", // japanese - japan
|
||||||
"pa": "🇮🇳",
|
"pa": "🇮🇳", // punjabi - india
|
||||||
"de": "🇩🇪",
|
"de": "🇩🇪", // german - germany
|
||||||
"ko": "🇰🇷",
|
"ko": "🇰🇷", // korean - south korea
|
||||||
"fr": "🇫🇷",
|
"fr": "🇫🇷", // french - france
|
||||||
"tr": "🇹🇷",
|
"tr": "🇹🇷", // turkish - turkey
|
||||||
"ur": "🇵🇰",
|
"ur": "🇵🇰", // urdu - pakistan
|
||||||
"it": "🇮🇹",
|
"it": "🇮🇹", // italian - italy
|
||||||
"th": "🇹🇭",
|
"th": "🇹🇭", // thai - thailand
|
||||||
"vi": "🇻🇳",
|
"vi": "🇻🇳", // vietnamese - vietnam
|
||||||
"pl": "🇵🇱",
|
"pl": "🇵🇱", // polish - poland
|
||||||
"uk": "🇺🇦",
|
"uk": "🇺🇦", // ukrainian - ukraine
|
||||||
"uz": "🇺🇿",
|
"uz": "🇺🇿", // uzbek - uzbekistan
|
||||||
"su": "🇮🇩",
|
"su": "🇮🇩", // sundanese - indonesia
|
||||||
"sw": "🇹🇿",
|
"sw": "🇹🇿", // swahili - tanzania
|
||||||
"nl": "🇳🇱",
|
"nl": "🇳🇱", // dutch - netherlands
|
||||||
"fi": "🇫🇮",
|
"fi": "🇫🇮", // finnish - finland
|
||||||
"el": "🇬🇷",
|
"el": "🇬🇷", // greek - greece
|
||||||
"da": "🇩🇰",
|
"da": "🇩🇰", // danish - denmark
|
||||||
"cs": "🇨🇿",
|
"cs": "🇨🇿", // czech - czech republic
|
||||||
"sk": "🇸🇰",
|
"sk": "🇸🇰", // slovak - slovakia
|
||||||
"bg": "🇧🇬",
|
"bg": "🇧🇬", // bulgarian - bulgaria
|
||||||
"sv": "🇸🇪",
|
"sv": "🇸🇪", // swedish - sweden
|
||||||
"be": "🇧🇾",
|
"be": "🇧🇾", // belarusian - belarus
|
||||||
"hu": "🇭🇺",
|
"hu": "🇭🇺", // hungarian - hungary
|
||||||
"lt": "🇱🇹",
|
"lt": "🇱🇹", // lithuanian - lithuania
|
||||||
"lv": "🇱🇻",
|
"lv": "🇱🇻", // latvian - latvia
|
||||||
"et": "🇪🇪",
|
"et": "🇪🇪", // estonian - estonia
|
||||||
"sl": "🇸🇮",
|
"sl": "🇸🇮", // slovenian - slovenia
|
||||||
"hr": "🇭🇷",
|
"hr": "🇭🇷", // croatian - croatia
|
||||||
"zu": "🇿🇦",
|
"zu": "🇿🇦", // zulu - south africa
|
||||||
"id": "🇮🇩",
|
"id": "🇮🇩", // indonesian - indonesia
|
||||||
"is": "🇮🇸",
|
"is": "🇮🇸", // icelandic - iceland
|
||||||
"lb": "🇱🇺", // Luxembourgish - Luxembourg
|
"lb": "🇱🇺", // luxembourgish - luxembourg
|
||||||
};
|
};
|
||||||
|
|
||||||
export function getFlagEmoji(languageCode?: string): string | undefined {
|
export function getFlagEmoji(languageCode?: string): string | undefined {
|
||||||
|
|
3976
common/sdApi.ts
3976
common/sdApi.ts
File diff suppressed because it is too large
Load Diff
|
@ -1,60 +0,0 @@
|
||||||
import { GrammyParseMode, GrammyTypes } from "../deps.ts";
|
|
||||||
|
|
||||||
export function formatOrdinal(n: number) {
|
|
||||||
if (n % 100 === 11 || n % 100 === 12 || n % 100 === 13) return `${n}th`;
|
|
||||||
if (n % 10 === 1) return `${n}st`;
|
|
||||||
if (n % 10 === 2) return `${n}nd`;
|
|
||||||
if (n % 10 === 3) return `${n}rd`;
|
|
||||||
return `${n}th`;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const fmt = (
|
|
||||||
rawStringParts: TemplateStringsArray | GrammyParseMode.Stringable[],
|
|
||||||
...stringLikes: GrammyParseMode.Stringable[]
|
|
||||||
): GrammyParseMode.FormattedString => {
|
|
||||||
let text = "";
|
|
||||||
const entities: GrammyTypes.MessageEntity[] = [];
|
|
||||||
|
|
||||||
const length = Math.max(rawStringParts.length, stringLikes.length);
|
|
||||||
for (let i = 0; i < length; i++) {
|
|
||||||
for (const stringLike of [rawStringParts[i], stringLikes[i]]) {
|
|
||||||
if (stringLike instanceof GrammyParseMode.FormattedString) {
|
|
||||||
entities.push(
|
|
||||||
...stringLike.entities.map((e) => ({
|
|
||||||
...e,
|
|
||||||
offset: e.offset + text.length,
|
|
||||||
})),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if (stringLike != null) text += stringLike.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return new GrammyParseMode.FormattedString(text, entities);
|
|
||||||
};
|
|
||||||
|
|
||||||
export function formatUserChat(ctx: { from?: GrammyTypes.User; chat?: GrammyTypes.Chat }) {
|
|
||||||
const msg: string[] = [];
|
|
||||||
if (ctx.from) {
|
|
||||||
msg.push(ctx.from.first_name);
|
|
||||||
if (ctx.from.last_name) msg.push(ctx.from.last_name);
|
|
||||||
if (ctx.from.username) msg.push(`(@${ctx.from.username})`);
|
|
||||||
if (ctx.from.language_code) msg.push(`(${ctx.from.language_code.toUpperCase()})`);
|
|
||||||
}
|
|
||||||
if (ctx.chat) {
|
|
||||||
if (
|
|
||||||
ctx.chat.type === "group" ||
|
|
||||||
ctx.chat.type === "supergroup" ||
|
|
||||||
ctx.chat.type === "channel"
|
|
||||||
) {
|
|
||||||
msg.push("in");
|
|
||||||
msg.push(ctx.chat.title);
|
|
||||||
if (
|
|
||||||
(ctx.chat.type === "supergroup" || ctx.chat.type === "channel") &&
|
|
||||||
ctx.chat.username
|
|
||||||
) {
|
|
||||||
msg.push(`(@${ctx.chat.username})`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return msg.join(" ");
|
|
||||||
}
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
import * as SdApi from "../common/sdApi.ts";
|
||||||
|
import { db } from "./db.ts";
|
||||||
|
|
||||||
|
export interface ConfigData {
|
||||||
|
adminUsernames: string[];
|
||||||
|
pausedReason: string | null;
|
||||||
|
maxUserJobs: number;
|
||||||
|
maxJobs: number;
|
||||||
|
defaultParams?: Partial<
|
||||||
|
| SdApi.components["schemas"]["StableDiffusionProcessingTxt2Img"]
|
||||||
|
| SdApi.components["schemas"]["StableDiffusionProcessingImg2Img"]
|
||||||
|
>;
|
||||||
|
sdInstances: SdInstanceData[];
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface SdInstanceData {
|
||||||
|
id: string;
|
||||||
|
name?: string;
|
||||||
|
api: { url: string; auth?: string };
|
||||||
|
maxResolution: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
const getDefaultConfig = (): ConfigData => ({
|
||||||
|
adminUsernames: Deno.env.get("TG_ADMIN_USERS")?.split(",") ?? [],
|
||||||
|
pausedReason: null,
|
||||||
|
maxUserJobs: 3,
|
||||||
|
maxJobs: 20,
|
||||||
|
defaultParams: {
|
||||||
|
batch_size: 1,
|
||||||
|
n_iter: 1,
|
||||||
|
width: 512,
|
||||||
|
height: 768,
|
||||||
|
steps: 30,
|
||||||
|
cfg_scale: 10,
|
||||||
|
negative_prompt: "boring_e621_fluffyrock_v4 boring_e621_v4",
|
||||||
|
},
|
||||||
|
sdInstances: [
|
||||||
|
{
|
||||||
|
id: "local",
|
||||||
|
api: { url: Deno.env.get("SD_API_URL") ?? "http://127.0.0.1:7860/" },
|
||||||
|
maxResolution: 1024 * 1024,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
});
|
||||||
|
|
||||||
|
export async function getConfig(): Promise<ConfigData> {
|
||||||
|
const configEntry = await db.get<ConfigData>(["config"]);
|
||||||
|
return configEntry.value ?? getDefaultConfig();
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function setConfig(config: ConfigData): Promise<void> {
|
||||||
|
await db.set(["config"], config);
|
||||||
|
}
|
|
@ -1,9 +1,8 @@
|
||||||
import { GrammyTypes, IKV } from "../deps.ts";
|
import { GrammyTypes, IKV } from "../deps.ts";
|
||||||
import { SdTxt2ImgInfo } from "../common/sdApi.ts";
|
|
||||||
import { PngInfo } from "../common/parsePngInfo.ts";
|
import { PngInfo } from "../common/parsePngInfo.ts";
|
||||||
import { db } from "./db.ts";
|
import { db } from "./db.ts";
|
||||||
|
|
||||||
export interface JobSchema {
|
export interface GenerationSchema {
|
||||||
task:
|
task:
|
||||||
| {
|
| {
|
||||||
type: "txt2img";
|
type: "txt2img";
|
||||||
|
@ -12,38 +11,50 @@ export interface JobSchema {
|
||||||
| {
|
| {
|
||||||
type: "img2img";
|
type: "img2img";
|
||||||
params: Partial<PngInfo>;
|
params: Partial<PngInfo>;
|
||||||
fileId: string;
|
fileId?: string;
|
||||||
};
|
};
|
||||||
from: GrammyTypes.User;
|
from: GrammyTypes.User;
|
||||||
chat: GrammyTypes.Chat;
|
chat: GrammyTypes.Chat;
|
||||||
requestMessageId: number;
|
requestMessageId?: number;
|
||||||
status:
|
status: {
|
||||||
| {
|
info?: SdGenerationInfo;
|
||||||
type: "waiting";
|
startDate?: Date;
|
||||||
message?: GrammyTypes.Message.TextMessage;
|
endDate?: Date;
|
||||||
lastErrorDate?: Date;
|
};
|
||||||
}
|
|
||||||
| {
|
|
||||||
type: "processing";
|
|
||||||
progress: number;
|
|
||||||
worker: string;
|
|
||||||
updatedDate: Date;
|
|
||||||
message?: GrammyTypes.Message.TextMessage;
|
|
||||||
}
|
|
||||||
| {
|
|
||||||
type: "done";
|
|
||||||
info?: SdTxt2ImgInfo;
|
|
||||||
startDate?: Date;
|
|
||||||
endDate?: Date;
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type JobIndices = {
|
/**
|
||||||
"status.type": JobSchema["status"]["type"];
|
* `info` field in generation response is a serialized json string of this shape.
|
||||||
};
|
*/
|
||||||
|
export interface SdGenerationInfo {
|
||||||
|
prompt: string;
|
||||||
|
all_prompts: string[];
|
||||||
|
negative_prompt: string;
|
||||||
|
all_negative_prompts: string[];
|
||||||
|
seed: number;
|
||||||
|
all_seeds: number[];
|
||||||
|
subseed: number;
|
||||||
|
all_subseeds: number[];
|
||||||
|
subseed_strength: number;
|
||||||
|
width: number;
|
||||||
|
height: number;
|
||||||
|
sampler_name: string;
|
||||||
|
cfg_scale: number;
|
||||||
|
steps: number;
|
||||||
|
batch_size: number;
|
||||||
|
restore_faces: boolean;
|
||||||
|
face_restoration_model: unknown;
|
||||||
|
sd_model_hash: string;
|
||||||
|
seed_resize_from_w: number;
|
||||||
|
seed_resize_from_h: number;
|
||||||
|
denoising_strength: number;
|
||||||
|
extra_generation_params: Record<string, string>;
|
||||||
|
index_of_first_image: number;
|
||||||
|
infotexts: string[];
|
||||||
|
styles: unknown[];
|
||||||
|
job_timestamp: string;
|
||||||
|
clip_skip: number;
|
||||||
|
is_using_inpainting_conditioning: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
export const jobStore = new IKV.Store<JobSchema, JobIndices>(db, "job", {
|
export const generationStore = new IKV.Store<GenerationSchema, {}>(db, "job", { indices: {} });
|
||||||
indices: {
|
|
||||||
"status.type": { getValue: (job) => job.status.type },
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
16
deps.ts
16
deps.ts
|
@ -1,18 +1,20 @@
|
||||||
export * as Log from "https://deno.land/std@0.201.0/log/mod.ts";
|
export * as Log from "https://deno.land/std@0.201.0/log/mod.ts";
|
||||||
export * as Async from "https://deno.land/std@0.201.0/async/mod.ts";
|
export * as Async from "https://deno.land/std@0.201.0/async/mod.ts";
|
||||||
export * as FmtDuration from "https://deno.land/std@0.201.0/fmt/duration.ts";
|
export * as FmtDuration from "https://deno.land/std@0.202.0/fmt/duration.ts";
|
||||||
export * as Collections from "https://deno.land/std@0.201.0/collections/mod.ts";
|
export * as Collections from "https://deno.land/std@0.202.0/collections/mod.ts";
|
||||||
export * as Base64 from "https://deno.land/std@0.201.0/encoding/base64.ts";
|
export * as Base64 from "https://deno.land/std@0.202.0/encoding/base64.ts";
|
||||||
export * as AsyncX from "https://deno.land/x/async@v2.0.2/mod.ts";
|
export * as AsyncX from "https://deno.land/x/async@v2.0.2/mod.ts";
|
||||||
export * as ULID from "https://deno.land/x/ulid@v0.3.0/mod.ts";
|
export * as ULID from "https://deno.land/x/ulid@v0.3.0/mod.ts";
|
||||||
export * as IKV from "https://deno.land/x/indexed_kv@v0.3.0/mod.ts";
|
export * as IKV from "https://deno.land/x/indexed_kv@v0.3.0/mod.ts";
|
||||||
export * as Grammy from "https://deno.land/x/grammy@v1.18.1/mod.ts";
|
export * as KVMQ from "https://deno.land/x/kvmq@v0.1.0/mod.ts";
|
||||||
export * as GrammyTypes from "https://deno.land/x/grammy_types@v3.2.0/mod.ts";
|
export * as Grammy from "https://deno.land/x/grammy@v1.18.3/mod.ts";
|
||||||
|
export * as GrammyTypes from "https://deno.land/x/grammy_types@v3.2.2/mod.ts";
|
||||||
export * as GrammyAutoQuote from "https://deno.land/x/grammy_autoquote@v1.1.2/mod.ts";
|
export * as GrammyAutoQuote from "https://deno.land/x/grammy_autoquote@v1.1.2/mod.ts";
|
||||||
export * as GrammyParseMode from "https://deno.land/x/grammy_parse_mode@1.7.1/mod.ts";
|
export * as GrammyParseMode from "https://deno.land/x/grammy_parse_mode@1.8.1/mod.ts";
|
||||||
export * as GrammyKvStorage from "https://deno.land/x/grammy_storages@v2.3.1/denokv/src/mod.ts";
|
export * as GrammyKvStorage from "https://deno.land/x/grammy_storages@v2.3.1/denokv/src/mod.ts";
|
||||||
export * as GrammyStatelessQ from "https://deno.land/x/grammy_stateless_question_alpha@v3.0.3/mod.ts";
|
export * as GrammyStatelessQ from "https://deno.land/x/grammy_stateless_question_alpha@v3.0.4/mod.ts";
|
||||||
export * as GrammyFiles from "https://deno.land/x/grammy_files@v1.0.4/mod.ts";
|
export * as GrammyFiles from "https://deno.land/x/grammy_files@v1.0.4/mod.ts";
|
||||||
export * as FileType from "https://esm.sh/file-type@18.5.0";
|
export * as FileType from "https://esm.sh/file-type@18.5.0";
|
||||||
export { default as pngChunksExtract } from "https://esm.sh/png-chunks-extract@1.0.0";
|
export { default as pngChunksExtract } from "https://esm.sh/png-chunks-extract@1.0.0";
|
||||||
export { decode as pngChunkTextDecode } from "https://esm.sh/png-chunk-text@1.0.0";
|
export { decode as pngChunkTextDecode } from "https://esm.sh/png-chunk-text@1.0.0";
|
||||||
|
export { default as createOpenApiClient } from "https://esm.sh/openapi-fetch@0.7.6";
|
||||||
|
|
|
@ -0,0 +1,409 @@
|
||||||
|
import { bot } from "../bot/mod.ts";
|
||||||
|
import { PngInfo } from "../common/parsePngInfo.ts";
|
||||||
|
import * as SdApi from "../common/sdApi.ts";
|
||||||
|
import { formatUserChat } from "../common/formatUserChat.ts";
|
||||||
|
import { getConfig, SdInstanceData } from "../db/config.ts";
|
||||||
|
import { db } from "../db/db.ts";
|
||||||
|
import { generationStore, SdGenerationInfo } from "../db/jobStore.ts";
|
||||||
|
import {
|
||||||
|
Async,
|
||||||
|
AsyncX,
|
||||||
|
Base64,
|
||||||
|
createOpenApiClient,
|
||||||
|
FileType,
|
||||||
|
FmtDuration,
|
||||||
|
Grammy,
|
||||||
|
GrammyParseMode,
|
||||||
|
GrammyTypes,
|
||||||
|
KVMQ,
|
||||||
|
Log,
|
||||||
|
} from "../deps.ts";
|
||||||
|
import { formatOrdinal } from "../common/formatOrdinal.ts";
|
||||||
|
import { deadline } from "../common/deadline.ts";
|
||||||
|
import { SdError } from "../common/SdError.ts";
|
||||||
|
|
||||||
|
const logger = () => Log.getLogger();
|
||||||
|
|
||||||
|
interface GenerationJob {
|
||||||
|
task:
|
||||||
|
| {
|
||||||
|
type: "txt2img";
|
||||||
|
params: Partial<PngInfo>;
|
||||||
|
}
|
||||||
|
| {
|
||||||
|
type: "img2img";
|
||||||
|
params: Partial<PngInfo>;
|
||||||
|
fileId: string;
|
||||||
|
};
|
||||||
|
from: GrammyTypes.User;
|
||||||
|
chat: GrammyTypes.Chat;
|
||||||
|
requestMessage: GrammyTypes.Message;
|
||||||
|
replyMessage?: GrammyTypes.Message;
|
||||||
|
sdInstanceId?: string;
|
||||||
|
progress?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const generationQueue = new KVMQ.Queue<GenerationJob>(db, "jobQueue");
|
||||||
|
|
||||||
|
export const activeGenerationWorkers = new Map<string, KVMQ.Worker<GenerationJob>>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Periodically restarts stable diffusion generation workers if they become online.
|
||||||
|
*/
|
||||||
|
export async function restartGenerationWorkers() {
|
||||||
|
while (true) {
|
||||||
|
const config = await getConfig();
|
||||||
|
|
||||||
|
for (const sdInstance of config.sdInstances) {
|
||||||
|
const activeWorker = activeGenerationWorkers.get(sdInstance.id);
|
||||||
|
if (activeWorker?.isProcessing) continue;
|
||||||
|
|
||||||
|
const activeWorkerSdClient = createOpenApiClient<SdApi.paths>({
|
||||||
|
baseUrl: sdInstance.api.url,
|
||||||
|
headers: { "Authorization": sdInstance.api.auth },
|
||||||
|
});
|
||||||
|
|
||||||
|
// check if worker is up
|
||||||
|
|
||||||
|
const activeWorkerStatus = await activeWorkerSdClient.GET("/sdapi/v1/memory", {
|
||||||
|
signal: deadline(10_000),
|
||||||
|
})
|
||||||
|
.then((response) => {
|
||||||
|
if (!response.data) {
|
||||||
|
throw new SdError("Failed to get worker status", response.response, response.error);
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
})
|
||||||
|
.catch((error) => {
|
||||||
|
logger().warning(`Worker ${sdInstance.id} is down: ${error}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!activeWorkerStatus?.data) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const newWorker = generationQueue.createWorker(({ state, setState }) =>
|
||||||
|
processGenerationJob(state, setState, sdInstance)
|
||||||
|
);
|
||||||
|
|
||||||
|
logger().info(`Started worker ${sdInstance.id}`);
|
||||||
|
|
||||||
|
newWorker.processJobs();
|
||||||
|
|
||||||
|
newWorker.addEventListener("error", (e) => {
|
||||||
|
logger().error(`Job failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`);
|
||||||
|
bot.api.sendMessage(
|
||||||
|
e.detail.job.state.requestMessage.chat.id,
|
||||||
|
`Generating failed: ${e.detail.error}`,
|
||||||
|
{
|
||||||
|
reply_to_message_id: e.detail.job.state.requestMessage.message_id,
|
||||||
|
},
|
||||||
|
).catch(() => undefined);
|
||||||
|
// TODO: only stop worker if error is network error
|
||||||
|
newWorker.stopProcessing();
|
||||||
|
});
|
||||||
|
|
||||||
|
activeGenerationWorkers.set(sdInstance.id, newWorker);
|
||||||
|
}
|
||||||
|
await Async.delay(60_000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function processGenerationJob(
|
||||||
|
job: GenerationJob,
|
||||||
|
setJob: (state: GenerationJob) => Promise<void>,
|
||||||
|
sdInstance: SdInstanceData,
|
||||||
|
) {
|
||||||
|
logger().debug(`Job started for ${formatUserChat(job)} using ${sdInstance.id}`);
|
||||||
|
const startDate = new Date();
|
||||||
|
job.sdInstanceId = sdInstance.id;
|
||||||
|
await setJob(job);
|
||||||
|
|
||||||
|
const config = await getConfig();
|
||||||
|
const workerSdClient = createOpenApiClient<SdApi.paths>({
|
||||||
|
baseUrl: sdInstance.api.url,
|
||||||
|
headers: { "Authorization": sdInstance.api.auth },
|
||||||
|
});
|
||||||
|
|
||||||
|
// if there is already a status message and its older than 30 seconds
|
||||||
|
if (job.replyMessage && (Date.now() - job.replyMessage.date * 1000) > 30_000) {
|
||||||
|
// try to delete it
|
||||||
|
await bot.api.deleteMessage(job.replyMessage.chat.id, job.replyMessage.message_id)
|
||||||
|
.catch(() => undefined);
|
||||||
|
job.replyMessage = undefined;
|
||||||
|
await setJob(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 })
|
||||||
|
.catch(() => undefined);
|
||||||
|
|
||||||
|
// if now there is no status message
|
||||||
|
if (!job.replyMessage) {
|
||||||
|
// send a new status message
|
||||||
|
job.replyMessage = await bot.api.sendMessage(
|
||||||
|
job.chat.id,
|
||||||
|
`Generating your prompt now... 0% using ${sdInstance.name}`,
|
||||||
|
{ reply_to_message_id: job.requestMessage.message_id },
|
||||||
|
).catch((err) => {
|
||||||
|
// if the request message (the message we are replying to) was deleted
|
||||||
|
if (err instanceof Grammy.GrammyError && err.message.match(/repl(y|ied)/)) {
|
||||||
|
// set the status message to undefined
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
});
|
||||||
|
await setJob(job);
|
||||||
|
} else {
|
||||||
|
// edit the existing status message
|
||||||
|
await bot.api.editMessageText(
|
||||||
|
job.replyMessage.chat.id,
|
||||||
|
job.replyMessage.message_id,
|
||||||
|
`Generating your prompt now... 0% using ${sdInstance.name}`,
|
||||||
|
{ maxAttempts: 1 },
|
||||||
|
).catch(() => undefined);
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we don't have a status message (it failed sending because request was deleted)
|
||||||
|
if (!job.replyMessage) {
|
||||||
|
// cancel the job
|
||||||
|
logger().info(`Job cancelled for ${formatUserChat(job)}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// reduce size if worker can't handle the resolution
|
||||||
|
const size = limitSize(
|
||||||
|
{ ...config.defaultParams, ...job.task.params },
|
||||||
|
sdInstance.maxResolution,
|
||||||
|
);
|
||||||
|
function limitSize(
|
||||||
|
{ width, height }: { width?: number; height?: number },
|
||||||
|
maxResolution: number,
|
||||||
|
): { width?: number; height?: number } {
|
||||||
|
if (!width || !height) return {};
|
||||||
|
const ratio = width / height;
|
||||||
|
if (width * height > maxResolution) {
|
||||||
|
return {
|
||||||
|
width: Math.trunc(Math.sqrt(maxResolution * ratio)),
|
||||||
|
height: Math.trunc(Math.sqrt(maxResolution / ratio)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return { width, height };
|
||||||
|
}
|
||||||
|
|
||||||
|
// start generating the image
|
||||||
|
const responsePromise = job.task.type === "txt2img"
|
||||||
|
? workerSdClient.POST("/sdapi/v1/txt2img", {
|
||||||
|
body: {
|
||||||
|
...config.defaultParams,
|
||||||
|
...job.task.params,
|
||||||
|
...size,
|
||||||
|
negative_prompt: job.task.params.negative_prompt
|
||||||
|
? job.task.params.negative_prompt
|
||||||
|
: config.defaultParams?.negative_prompt,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
: job.task.type === "img2img"
|
||||||
|
? workerSdClient.POST("/sdapi/v1/img2img", {
|
||||||
|
body: {
|
||||||
|
...config.defaultParams,
|
||||||
|
...job.task.params,
|
||||||
|
...size,
|
||||||
|
negative_prompt: job.task.params.negative_prompt
|
||||||
|
? job.task.params.negative_prompt
|
||||||
|
: config.defaultParams?.negative_prompt,
|
||||||
|
init_images: [
|
||||||
|
Base64.encode(
|
||||||
|
await fetch(
|
||||||
|
`https://api.telegram.org/file/bot${bot.token}/${await bot.api.getFile(
|
||||||
|
job.task.fileId,
|
||||||
|
).then((file) => file.file_path)}`,
|
||||||
|
).then((resp) => resp.arrayBuffer()),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
},
|
||||||
|
})
|
||||||
|
: undefined;
|
||||||
|
|
||||||
|
if (!responsePromise) {
|
||||||
|
throw new Error(`Unknown task type: ${job.task.type}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// poll for progress while the generation request is pending
|
||||||
|
while (await AsyncX.promiseState(responsePromise) === "pending") {
|
||||||
|
await Async.delay(3000);
|
||||||
|
const progressResponse = await workerSdClient.GET("/sdapi/v1/progress", {
|
||||||
|
params: {},
|
||||||
|
signal: deadline(15_000),
|
||||||
|
});
|
||||||
|
if (!progressResponse.data) {
|
||||||
|
throw new SdError(
|
||||||
|
"Failed to get progress",
|
||||||
|
progressResponse.response,
|
||||||
|
progressResponse.error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
job.progress = progressResponse.data.progress;
|
||||||
|
await setJob(job);
|
||||||
|
await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 })
|
||||||
|
.catch(() => undefined);
|
||||||
|
if (job.replyMessage) {
|
||||||
|
await bot.api.editMessageText(
|
||||||
|
job.replyMessage.chat.id,
|
||||||
|
job.replyMessage.message_id,
|
||||||
|
`Generating your prompt now... ${
|
||||||
|
(progressResponse.data.progress * 100).toFixed(0)
|
||||||
|
}% using ${sdInstance.name}`,
|
||||||
|
{ maxAttempts: 1 },
|
||||||
|
).catch(() => undefined);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const response = await responsePromise;
|
||||||
|
|
||||||
|
if (!response.data) {
|
||||||
|
throw new SdError("Generating image failed", response.response, response.error);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!response.data.images?.length) {
|
||||||
|
throw new Error("No images returned from SD");
|
||||||
|
}
|
||||||
|
|
||||||
|
// info field is a json serialized string so we need to parse it
|
||||||
|
const info: SdGenerationInfo = JSON.parse(response.data.info);
|
||||||
|
|
||||||
|
// change status message to uploading images
|
||||||
|
await bot.api.editMessageText(
|
||||||
|
job.replyMessage.chat.id,
|
||||||
|
job.replyMessage.message_id,
|
||||||
|
`Uploading your images...`,
|
||||||
|
{ maxAttempts: 1 },
|
||||||
|
).catch(() => undefined);
|
||||||
|
|
||||||
|
// render the caption
|
||||||
|
// const detailedReply = Object.keys(job.value.params).filter((key) => key !== "prompt").length > 0;
|
||||||
|
const detailedReply = true;
|
||||||
|
const jobDurationMs = Math.trunc((Date.now() - startDate.getTime()) / 1000) * 1000;
|
||||||
|
const { bold, fmt } = GrammyParseMode;
|
||||||
|
const caption = fmt([
|
||||||
|
`${info.prompt}\n`,
|
||||||
|
...detailedReply
|
||||||
|
? [
|
||||||
|
info.negative_prompt ? fmt`${bold("Negative prompt:")} ${info.negative_prompt}\n` : "",
|
||||||
|
fmt`${bold("Steps:")} ${info.steps}, `,
|
||||||
|
fmt`${bold("Sampler:")} ${info.sampler_name}, `,
|
||||||
|
fmt`${bold("CFG scale:")} ${info.cfg_scale}, `,
|
||||||
|
fmt`${bold("Seed:")} ${info.seed}, `,
|
||||||
|
fmt`${bold("Size")}: ${info.width}x${info.height}, `,
|
||||||
|
fmt`${bold("Worker")}: ${sdInstance.id}, `,
|
||||||
|
fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`,
|
||||||
|
]
|
||||||
|
: [],
|
||||||
|
]);
|
||||||
|
|
||||||
|
// sending images loop because telegram is unreliable and it would be a shame to lose the images
|
||||||
|
// TODO: separate queue for sending images
|
||||||
|
let sendMediaAttempt = 0;
|
||||||
|
let resultMessages: GrammyTypes.Message.MediaMessage[] | undefined;
|
||||||
|
while (true) {
|
||||||
|
sendMediaAttempt++;
|
||||||
|
await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 })
|
||||||
|
.catch(() => undefined);
|
||||||
|
|
||||||
|
// parse files from reply JSON
|
||||||
|
const inputFiles = await Promise.all(
|
||||||
|
response.data.images.map(async (imageBase64, idx) => {
|
||||||
|
const imageBuffer = Base64.decode(imageBase64);
|
||||||
|
const imageType = await FileType.fileTypeFromBuffer(imageBuffer);
|
||||||
|
if (!imageType) throw new Error("Unknown file type returned from worker");
|
||||||
|
return Grammy.InputMediaBuilder.photo(
|
||||||
|
new Grammy.InputFile(imageBuffer, `image${idx}.${imageType.ext}`),
|
||||||
|
// if it can fit, add caption for first photo
|
||||||
|
idx === 0 && caption.text.length <= 1024
|
||||||
|
? { caption: caption.text, caption_entities: caption.entities }
|
||||||
|
: undefined,
|
||||||
|
);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
// send the result to telegram
|
||||||
|
try {
|
||||||
|
resultMessages = await bot.api.sendMediaGroup(job.chat.id, inputFiles, {
|
||||||
|
reply_to_message_id: job.requestMessage.message_id,
|
||||||
|
maxAttempts: 5,
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
} catch (err) {
|
||||||
|
logger().warning(
|
||||||
|
`Sending images (attempt ${sendMediaAttempt}) for ${
|
||||||
|
formatUserChat(job)
|
||||||
|
} using ${sdInstance.id} failed: ${err}`,
|
||||||
|
);
|
||||||
|
if (sendMediaAttempt >= 6) throw err;
|
||||||
|
// wait 2 * 5 seconds before retrying
|
||||||
|
for (let i = 0; i < 2; i++) {
|
||||||
|
await bot.api.sendChatAction(job.chat.id, "upload_photo", { maxAttempts: 1 })
|
||||||
|
.catch(() => undefined);
|
||||||
|
await Async.delay(5000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// send caption in separate message if it couldn't fit
|
||||||
|
if (caption.text.length > 1024 && caption.text.length <= 4096) {
|
||||||
|
await bot.api.sendMessage(job.chat.id, caption.text, {
|
||||||
|
reply_to_message_id: resultMessages[0].message_id,
|
||||||
|
entities: caption.entities,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete the status message
|
||||||
|
await bot.api.deleteMessage(job.replyMessage.chat.id, job.replyMessage.message_id)
|
||||||
|
.catch(() => undefined);
|
||||||
|
job.replyMessage = undefined;
|
||||||
|
await setJob(job);
|
||||||
|
|
||||||
|
// save to generation storage
|
||||||
|
generationStore.create({
|
||||||
|
task: { type: job.task.type, params: job.task.params },
|
||||||
|
from: job.from,
|
||||||
|
chat: job.chat,
|
||||||
|
status: {
|
||||||
|
startDate,
|
||||||
|
endDate: new Date(),
|
||||||
|
info: info,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
logger().debug(
|
||||||
|
`Job finished for ${formatUserChat(job)} using ${sdInstance.id}${
|
||||||
|
sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : ""
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the status message of all jobs in the queue.
|
||||||
|
*/
|
||||||
|
export async function handleGenerationUpdates() {
|
||||||
|
while (true) {
|
||||||
|
const jobs = await generationQueue.getAllJobs();
|
||||||
|
let index = 0;
|
||||||
|
for (const job of jobs) {
|
||||||
|
if (job.lockUntil > new Date()) {
|
||||||
|
// job is currently being processed, the worker will update its status message
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!job.state.replyMessage) {
|
||||||
|
// no status message, nothing to update
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
index++;
|
||||||
|
await bot.api.editMessageText(
|
||||||
|
job.state.replyMessage.chat.id,
|
||||||
|
job.state.replyMessage.message_id,
|
||||||
|
`You are ${formatOrdinal(index)} in queue.`,
|
||||||
|
{ maxAttempts: 1 },
|
||||||
|
).catch(() => undefined);
|
||||||
|
}
|
||||||
|
await Async.delay(3000);
|
||||||
|
}
|
||||||
|
}
|
11
tasks/mod.ts
11
tasks/mod.ts
|
@ -1,13 +1,8 @@
|
||||||
import { pingWorkers } from "./pingWorkers.ts";
|
import { handleGenerationUpdates, restartGenerationWorkers } from "./generationQueue.ts";
|
||||||
import { processJobs } from "./processJobs.ts";
|
|
||||||
import { returnHangedJobs } from "./returnHangedJobs.ts";
|
|
||||||
import { updateJobStatusMsgs } from "./updateJobStatusMsgs.ts";
|
|
||||||
|
|
||||||
export async function runAllTasks() {
|
export async function runAllTasks() {
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
processJobs(),
|
restartGenerationWorkers(),
|
||||||
updateJobStatusMsgs(),
|
handleGenerationUpdates(),
|
||||||
returnHangedJobs(),
|
|
||||||
pingWorkers(),
|
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,32 +0,0 @@
|
||||||
import { Async, Log } from "../deps.ts";
|
|
||||||
import { getGlobalSession } from "../bot/session.ts";
|
|
||||||
import { sdGetConfig } from "../common/sdApi.ts";
|
|
||||||
|
|
||||||
const logger = () => Log.getLogger();
|
|
||||||
|
|
||||||
export const runningWorkers = new Set<string>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Periodically ping the workers to see if they are alive.
|
|
||||||
*/
|
|
||||||
export async function pingWorkers(): Promise<never> {
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
const config = await getGlobalSession();
|
|
||||||
for (const worker of config.workers) {
|
|
||||||
const status = await sdGetConfig(worker.api).catch(() => null);
|
|
||||||
const wasRunning = runningWorkers.has(worker.id);
|
|
||||||
if (status) {
|
|
||||||
runningWorkers.add(worker.id);
|
|
||||||
if (!wasRunning) logger().info(`Worker ${worker.id} is online`);
|
|
||||||
} else {
|
|
||||||
runningWorkers.delete(worker.id);
|
|
||||||
if (wasRunning) logger().warning(`Worker ${worker.id} went offline`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
await Async.delay(60 * 1000);
|
|
||||||
} catch (err) {
|
|
||||||
logger().warning(`Pinging workers failed: ${err}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,369 +0,0 @@
|
||||||
import {
|
|
||||||
Async,
|
|
||||||
Base64,
|
|
||||||
FileType,
|
|
||||||
FmtDuration,
|
|
||||||
Grammy,
|
|
||||||
GrammyParseMode,
|
|
||||||
GrammyTypes,
|
|
||||||
IKV,
|
|
||||||
Log,
|
|
||||||
} from "../deps.ts";
|
|
||||||
import { bot } from "../bot/mod.ts";
|
|
||||||
import { getGlobalSession, GlobalData, WorkerData } from "../bot/session.ts";
|
|
||||||
import { fmt, formatUserChat } from "../common/utils.ts";
|
|
||||||
import {
|
|
||||||
SdApiError,
|
|
||||||
sdImg2Img,
|
|
||||||
SdProgressResponse,
|
|
||||||
SdResponse,
|
|
||||||
sdTxt2Img,
|
|
||||||
} from "../common/sdApi.ts";
|
|
||||||
import { JobSchema, jobStore } from "../db/jobStore.ts";
|
|
||||||
import { runningWorkers } from "./pingWorkers.ts";
|
|
||||||
|
|
||||||
const logger = () => Log.getLogger();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sends waiting jobs to workers.
|
|
||||||
*/
|
|
||||||
export async function processJobs(): Promise<never> {
|
|
||||||
const busyWorkers = new Set<string>();
|
|
||||||
while (true) {
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 1000));
|
|
||||||
|
|
||||||
try {
|
|
||||||
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
|
|
||||||
// get first waiting job which hasn't errored in last minute
|
|
||||||
const job = jobs.find((job) =>
|
|
||||||
job.value.status.type === "waiting" &&
|
|
||||||
(job.value.status.lastErrorDate?.getTime() ?? 0) < Date.now() - 60_000
|
|
||||||
);
|
|
||||||
if (!job) continue;
|
|
||||||
|
|
||||||
// find a worker to handle the job
|
|
||||||
const config = await getGlobalSession();
|
|
||||||
const worker = config.workers?.find((worker) =>
|
|
||||||
runningWorkers.has(worker.id) &&
|
|
||||||
!busyWorkers.has(worker.id)
|
|
||||||
);
|
|
||||||
if (!worker) continue;
|
|
||||||
|
|
||||||
// process the job
|
|
||||||
await job.update((value) => ({
|
|
||||||
...value,
|
|
||||||
status: {
|
|
||||||
type: "processing",
|
|
||||||
progress: 0,
|
|
||||||
worker: worker.id,
|
|
||||||
updatedDate: new Date(),
|
|
||||||
message: job.value.status.type !== "done" ? job.value.status.message : undefined,
|
|
||||||
},
|
|
||||||
}));
|
|
||||||
busyWorkers.add(worker.id);
|
|
||||||
processJob(job, worker, config)
|
|
||||||
.catch(async (err) => {
|
|
||||||
logger().error(
|
|
||||||
`Job failed for ${formatUserChat(job.value)} via ${worker.id}: ${err}`,
|
|
||||||
);
|
|
||||||
if (job.value.status.type === "processing" && job.value.status.message) {
|
|
||||||
await bot.api.deleteMessage(
|
|
||||||
job.value.status.message.chat.id,
|
|
||||||
job.value.status.message.message_id,
|
|
||||||
).catch(() => undefined);
|
|
||||||
}
|
|
||||||
if (err instanceof Grammy.GrammyError || err instanceof SdApiError) {
|
|
||||||
await bot.api.sendMessage(
|
|
||||||
job.value.chat.id,
|
|
||||||
`Failed to generate your prompt using ${worker.name}: ${err.message}`,
|
|
||||||
{ reply_to_message_id: job.value.requestMessageId },
|
|
||||||
).catch(() => undefined);
|
|
||||||
await job.update({ status: { type: "waiting", lastErrorDate: new Date() } })
|
|
||||||
.catch(() => undefined);
|
|
||||||
}
|
|
||||||
if (
|
|
||||||
err instanceof SdApiError &&
|
|
||||||
(
|
|
||||||
err.statusCode === 0 /* Network error */ ||
|
|
||||||
err.statusCode === 404 ||
|
|
||||||
err.statusCode === 401
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
runningWorkers.delete(worker.id);
|
|
||||||
logger().warning(
|
|
||||||
`Worker ${worker.id} was marked as offline because of network error`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
await job.delete().catch(() => undefined);
|
|
||||||
if (!(err instanceof Grammy.GrammyError) || err.error_code !== 403 /* blocked bot */) {
|
|
||||||
await jobStore.create(job.value);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.finally(() => busyWorkers.delete(worker.id));
|
|
||||||
} catch (err) {
|
|
||||||
logger().warning(`Processing jobs failed: ${err}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config: GlobalData) {
|
|
||||||
logger().debug(
|
|
||||||
`Job started for ${formatUserChat(job.value)} using ${worker.id}`,
|
|
||||||
);
|
|
||||||
const startDate = new Date();
|
|
||||||
|
|
||||||
// if there is already a status message and its older than 10 seconds
|
|
||||||
if (
|
|
||||||
job.value.status.type === "processing" && job.value.status.message &&
|
|
||||||
(Date.now() - job.value.status.message.date * 1000) > 10 * 1000
|
|
||||||
) {
|
|
||||||
// delete it
|
|
||||||
await bot.api.deleteMessage(
|
|
||||||
job.value.status.message.chat.id,
|
|
||||||
job.value.status.message.message_id,
|
|
||||||
).catch(() => undefined);
|
|
||||||
await job.update((value) => ({
|
|
||||||
...value,
|
|
||||||
status: { ...value.status, message: undefined },
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
// we have to check if job is still processing at every step because TypeScript
|
|
||||||
if (job.value.status.type === "processing") {
|
|
||||||
await bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 })
|
|
||||||
.catch(() => undefined);
|
|
||||||
// if now there is no status message
|
|
||||||
if (!job.value.status.message) {
|
|
||||||
// send a new status message
|
|
||||||
const statusMessage = await bot.api.sendMessage(
|
|
||||||
job.value.chat.id,
|
|
||||||
`Generating your prompt now... 0% using ${worker.name}`,
|
|
||||||
{ reply_to_message_id: job.value.requestMessageId },
|
|
||||||
).catch((err) => {
|
|
||||||
// if the request message (the message we are replying to) was deleted
|
|
||||||
if (err instanceof Grammy.GrammyError && err.message.match(/repl(y|ied)/)) {
|
|
||||||
// jest set the status message to undefined
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
throw err;
|
|
||||||
});
|
|
||||||
await job.update((value) => ({
|
|
||||||
...value,
|
|
||||||
status: { ...value.status, message: statusMessage },
|
|
||||||
}));
|
|
||||||
} else {
|
|
||||||
// edit the existing status message
|
|
||||||
await bot.api.editMessageText(
|
|
||||||
job.value.status.message.chat.id,
|
|
||||||
job.value.status.message.message_id,
|
|
||||||
`Generating your prompt now... 0% using ${worker.name}`,
|
|
||||||
{ maxAttempts: 1 },
|
|
||||||
).catch(() => undefined);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we don't have a status message (it failed sending because request was deleted)
|
|
||||||
if (job.value.status.type === "processing" && !job.value.status.message) {
|
|
||||||
// cancel the job
|
|
||||||
await job.delete();
|
|
||||||
logger().info(`Job cancelled for ${formatUserChat(job.value)}`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// reduce size if worker can't handle the resolution
|
|
||||||
const size = limitSize(
|
|
||||||
{ ...config.defaultParams, ...job.value.task.params },
|
|
||||||
worker.maxResolution,
|
|
||||||
);
|
|
||||||
|
|
||||||
// process the job
|
|
||||||
const handleProgress = async (progress: SdProgressResponse) => {
|
|
||||||
// Important: don't let any errors escape this function
|
|
||||||
if (job.value.status.type === "processing" && job.value.status.message) {
|
|
||||||
await Promise.all([
|
|
||||||
bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 }),
|
|
||||||
progress.progress > job.value.status.progress && bot.api.editMessageText(
|
|
||||||
job.value.status.message.chat.id,
|
|
||||||
job.value.status.message.message_id,
|
|
||||||
`Generating your prompt now... ${
|
|
||||||
(progress.progress * 100).toFixed(0)
|
|
||||||
}% using ${worker.name}`,
|
|
||||||
{ maxAttempts: 1 },
|
|
||||||
),
|
|
||||||
job.update((value) => ({
|
|
||||||
...value,
|
|
||||||
status: {
|
|
||||||
type: "processing",
|
|
||||||
progress: progress.progress,
|
|
||||||
worker: worker.id,
|
|
||||||
updatedDate: new Date(),
|
|
||||||
message: value.status.type !== "done" ? value.status.message : undefined,
|
|
||||||
},
|
|
||||||
}), { maxAttempts: 1 }),
|
|
||||||
]).catch((err) =>
|
|
||||||
logger().warning(
|
|
||||||
`Updating job status for ${formatUserChat(job.value)} using ${worker.id} failed: ${err}`,
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let response: SdResponse<unknown>;
|
|
||||||
const taskType = job.value.task.type; // don't narrow this to never pls typescript
|
|
||||||
switch (job.value.task.type) {
|
|
||||||
case "txt2img":
|
|
||||||
response = await sdTxt2Img(
|
|
||||||
worker.api,
|
|
||||||
{
|
|
||||||
...config.defaultParams,
|
|
||||||
...job.value.task.params,
|
|
||||||
...size,
|
|
||||||
negative_prompt: job.value.task.params.negative_prompt
|
|
||||||
? job.value.task.params.negative_prompt
|
|
||||||
: config.defaultParams?.negative_prompt,
|
|
||||||
},
|
|
||||||
handleProgress,
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
case "img2img": {
|
|
||||||
const file = await bot.api.getFile(job.value.task.fileId);
|
|
||||||
const fileUrl = `https://api.telegram.org/file/bot${bot.token}/${file.file_path}`;
|
|
||||||
const fileBuffer = await fetch(fileUrl).then((resp) => resp.arrayBuffer());
|
|
||||||
const fileBase64 = Base64.encode(fileBuffer);
|
|
||||||
response = await sdImg2Img(
|
|
||||||
worker.api,
|
|
||||||
{ ...config.defaultParams, ...job.value.task.params, ...size, init_images: [fileBase64] },
|
|
||||||
handleProgress,
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
throw new Error(`Unknown task type: ${taskType}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
// change status message to uploading images
|
|
||||||
if (job.value.status.type === "processing" && job.value.status.message) {
|
|
||||||
await bot.api.editMessageText(
|
|
||||||
job.value.status.message.chat.id,
|
|
||||||
job.value.status.message.message_id,
|
|
||||||
`Uploading your images...`,
|
|
||||||
{ maxAttempts: 1 },
|
|
||||||
).catch(() => undefined);
|
|
||||||
}
|
|
||||||
|
|
||||||
// render the caption
|
|
||||||
// const detailedReply = Object.keys(job.value.params).filter((key) => key !== "prompt").length > 0;
|
|
||||||
const detailedReply = true;
|
|
||||||
const jobDurationMs = Math.trunc((Date.now() - startDate.getTime()) / 1000) * 1000;
|
|
||||||
const { bold } = GrammyParseMode;
|
|
||||||
const caption = fmt([
|
|
||||||
`${response.info.prompt}\n`,
|
|
||||||
...detailedReply
|
|
||||||
? [
|
|
||||||
response.info.negative_prompt
|
|
||||||
? fmt`${bold("Negative prompt:")} ${response.info.negative_prompt}\n`
|
|
||||||
: "",
|
|
||||||
fmt`${bold("Steps:")} ${response.info.steps}, `,
|
|
||||||
fmt`${bold("Sampler:")} ${response.info.sampler_name}, `,
|
|
||||||
fmt`${bold("CFG scale:")} ${response.info.cfg_scale}, `,
|
|
||||||
fmt`${bold("Seed:")} ${response.info.seed}, `,
|
|
||||||
fmt`${bold("Size")}: ${response.info.width}x${response.info.height}, `,
|
|
||||||
fmt`${bold("Worker")}: ${worker.id}, `,
|
|
||||||
fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`,
|
|
||||||
]
|
|
||||||
: [],
|
|
||||||
]);
|
|
||||||
|
|
||||||
// sending images loop because telegram is unreliable and it would be a shame to lose the images
|
|
||||||
let sendMediaAttempt = 0;
|
|
||||||
let resultMessages: GrammyTypes.Message.MediaMessage[] | undefined;
|
|
||||||
while (true) {
|
|
||||||
sendMediaAttempt++;
|
|
||||||
await bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 })
|
|
||||||
.catch(() => undefined);
|
|
||||||
|
|
||||||
// parse files from reply JSON
|
|
||||||
const inputFiles = await Promise.all(
|
|
||||||
response.images.map(async (imageBase64, idx) => {
|
|
||||||
const imageBuffer = Base64.decode(imageBase64);
|
|
||||||
const imageType = await FileType.fileTypeFromBuffer(imageBuffer);
|
|
||||||
if (!imageType) throw new Error("Unknown file type returned from worker");
|
|
||||||
return Grammy.InputMediaBuilder.photo(
|
|
||||||
new Grammy.InputFile(imageBuffer, `image${idx}.${imageType.ext}`),
|
|
||||||
// if it can fit, add caption for first photo
|
|
||||||
idx === 0 && caption.text.length <= 1024
|
|
||||||
? { caption: caption.text, caption_entities: caption.entities }
|
|
||||||
: undefined,
|
|
||||||
);
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
// send the result to telegram
|
|
||||||
try {
|
|
||||||
resultMessages = await bot.api.sendMediaGroup(job.value.chat.id, inputFiles, {
|
|
||||||
reply_to_message_id: job.value.requestMessageId,
|
|
||||||
maxAttempts: 5,
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
} catch (err) {
|
|
||||||
logger().warning(
|
|
||||||
`Sending images (attempt ${sendMediaAttempt}) for ${
|
|
||||||
formatUserChat(job.value)
|
|
||||||
} using ${worker.id} failed: ${err}`,
|
|
||||||
);
|
|
||||||
if (sendMediaAttempt >= 6) throw err;
|
|
||||||
// wait 2 * 5 seconds before retrying
|
|
||||||
for (let i = 0; i < 2; i++) {
|
|
||||||
await bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 })
|
|
||||||
.catch(() => undefined);
|
|
||||||
await Async.delay(5000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// send caption in separate message if it couldn't fit
|
|
||||||
if (caption.text.length > 1024 && caption.text.length <= 4096) {
|
|
||||||
await bot.api.sendMessage(job.value.chat.id, caption.text, {
|
|
||||||
reply_to_message_id: resultMessages[0].message_id,
|
|
||||||
entities: caption.entities,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete the status message
|
|
||||||
if (job.value.status.type === "processing" && job.value.status.message) {
|
|
||||||
await bot.api.deleteMessage(
|
|
||||||
job.value.status.message.chat.id,
|
|
||||||
job.value.status.message.message_id,
|
|
||||||
).catch(() => undefined);
|
|
||||||
await job.update((value) => ({
|
|
||||||
...value,
|
|
||||||
status: { ...value.status, message: undefined },
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
// update job to status done
|
|
||||||
await job.update((value) => ({
|
|
||||||
...value,
|
|
||||||
status: { type: "done", info: response.info, startDate, endDate: new Date() },
|
|
||||||
}));
|
|
||||||
|
|
||||||
logger().debug(
|
|
||||||
`Job finished for ${formatUserChat(job.value)} using ${worker.id}${
|
|
||||||
sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : ""
|
|
||||||
}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
function limitSize(
|
|
||||||
{ width, height }: { width?: number; height?: number },
|
|
||||||
maxResolution: number,
|
|
||||||
): { width?: number; height?: number } {
|
|
||||||
if (!width || !height) return {};
|
|
||||||
const ratio = width / height;
|
|
||||||
if (width * height > maxResolution) {
|
|
||||||
return {
|
|
||||||
width: Math.trunc(Math.sqrt(maxResolution * ratio)),
|
|
||||||
height: Math.trunc(Math.sqrt(maxResolution / ratio)),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
return { width, height };
|
|
||||||
}
|
|
|
@ -1,40 +0,0 @@
|
||||||
import { FmtDuration, Log } from "../deps.ts";
|
|
||||||
import { formatUserChat } from "../common/utils.ts";
|
|
||||||
import { jobStore } from "../db/jobStore.ts";
|
|
||||||
|
|
||||||
const logger = () => Log.getLogger();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns hanged jobs to the queue.
|
|
||||||
*/
|
|
||||||
export async function returnHangedJobs(): Promise<never> {
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 5000));
|
|
||||||
const jobs = await jobStore.getBy("status.type", { value: "processing" });
|
|
||||||
for (const job of jobs) {
|
|
||||||
if (job.value.status.type !== "processing") continue;
|
|
||||||
// if job wasn't updated for 2 minutes, return it to the queue
|
|
||||||
const timeSinceLastUpdateMs = Date.now() - job.value.status.updatedDate.getTime();
|
|
||||||
if (timeSinceLastUpdateMs > 2 * 60 * 1000) {
|
|
||||||
await job.update((value) => ({
|
|
||||||
...value,
|
|
||||||
status: {
|
|
||||||
type: "waiting",
|
|
||||||
message: value.status.type !== "done" ? value.status.message : undefined,
|
|
||||||
},
|
|
||||||
}));
|
|
||||||
logger().warning(
|
|
||||||
`Job for ${formatUserChat(job.value)} was returned to the queue because it hanged for ${
|
|
||||||
FmtDuration.format(Math.trunc(timeSinceLastUpdateMs / 1000) * 1000, {
|
|
||||||
ignoreZero: true,
|
|
||||||
})
|
|
||||||
}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
logger().warning(`Returning hanged jobs failed: ${err}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,29 +0,0 @@
|
||||||
import { Log } from "../deps.ts";
|
|
||||||
import { bot } from "../bot/mod.ts";
|
|
||||||
import { formatOrdinal } from "../common/utils.ts";
|
|
||||||
import { jobStore } from "../db/jobStore.ts";
|
|
||||||
|
|
||||||
const logger = () => Log.getLogger();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Updates status messages for jobs in the queue.
|
|
||||||
*/
|
|
||||||
export async function updateJobStatusMsgs(): Promise<never> {
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 5000));
|
|
||||||
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
|
|
||||||
for (const [index, job] of jobs.entries()) {
|
|
||||||
if (job.value.status.type !== "waiting" || !job.value.status.message) continue;
|
|
||||||
await bot.api.editMessageText(
|
|
||||||
job.value.status.message.chat.id,
|
|
||||||
job.value.status.message.message_id,
|
|
||||||
`You are ${formatOrdinal(index + 1)} in queue.`,
|
|
||||||
{ maxAttempts: 1 },
|
|
||||||
).catch(() => undefined);
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
logger().warning(`Updating job status messages failed: ${err}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue