Compare commits

...

5 Commits

Author SHA1 Message Date
pinks 4e10168786 fix: dead workers showing as alive 2023-09-23 20:51:13 +02:00
pinks 9dbeb37fd7 move files 2023-09-23 20:49:05 +02:00
pinks 27eb471f9e don't ping users in queue command 2023-09-23 01:44:44 +02:00
pinks cc854639d7 replace timeout logic with Grammy's timeout option 2023-09-23 01:42:15 +02:00
pinks 10b57926ed fix: cancel command 2023-09-23 01:40:21 +02:00
20 changed files with 37 additions and 68 deletions

View File

@ -1,4 +1,4 @@
import * as SdApi from "../common/sdApi.ts";
import * as SdApi from "../sd/sdApi.ts";
import { db } from "./db.ts";
export interface ConfigData {

View File

View File

@ -1,10 +1,10 @@
import { bot } from "../bot/mod.ts";
import { PngInfo } from "../common/parsePngInfo.ts";
import * as SdApi from "../common/sdApi.ts";
import { formatUserChat } from "../common/formatUserChat.ts";
import { getConfig, SdInstanceData } from "../db/config.ts";
import { db } from "../db/db.ts";
import { generationStore, SdGenerationInfo } from "../db/jobStore.ts";
import { PngInfo } from "../sd/parsePngInfo.ts";
import * as SdApi from "../sd/sdApi.ts";
import { formatUserChat } from "../utils/formatUserChat.ts";
import { getConfig, SdInstanceData } from "./config.ts";
import { db } from "./db.ts";
import { generationStore, SdGenerationInfo } from "./generationStore.ts";
import {
Async,
AsyncX,
@ -18,9 +18,8 @@ import {
KVMQ,
Log,
} from "../deps.ts";
import { formatOrdinal } from "../common/formatOrdinal.ts";
import { deadline } from "../common/deadline.ts";
import { SdError } from "../common/SdError.ts";
import { formatOrdinal } from "../utils/formatOrdinal.ts";
import { SdError } from "../sd/SdError.ts";
const logger = () => Log.getLogger();
@ -66,7 +65,7 @@ export async function restartGenerationWorkers() {
// check if worker is up
const activeWorkerStatus = await activeWorkerSdClient.GET("/sdapi/v1/memory", {
signal: deadline(10_000),
signal: AbortSignal.timeout(10_000),
})
.then((response) => {
if (!response.data) {
@ -233,7 +232,7 @@ async function processGenerationJob(
await Async.delay(3000);
const progressResponse = await workerSdClient.GET("/sdapi/v1/progress", {
params: {},
signal: deadline(15_000),
signal: AbortSignal.timeout(15_000),
});
if (!progressResponse.data) {
throw new SdError(

View File

@ -1,5 +1,5 @@
import { GrammyTypes, IKV } from "../deps.ts";
import { PngInfo } from "../common/parsePngInfo.ts";
import { PngInfo } from "../sd/parsePngInfo.ts";
import { db } from "./db.ts";
export interface GenerationSchema {

View File

@ -1,10 +1,10 @@
import { generationQueue } from "../tasks/generationQueue.ts";
import { generationQueue } from "../app/generationQueue.ts";
import { Context } from "./mod.ts";
export async function cancelCommand(ctx: Context) {
const jobs = await generationQueue.getAllJobs();
const userJobs = jobs
.filter((job) => job.lockUntil > new Date())
.filter((job) => job.lockUntil < new Date())
.filter((j) => j.state.from.id === ctx.from?.id);
for (const job of userJobs) await generationQueue.deleteJob(job.id);
await ctx.reply(`Cancelled ${userJobs.length} jobs`);

View File

@ -1,9 +1,9 @@
import { Collections, Grammy, GrammyStatelessQ } from "../deps.ts";
import { formatUserChat } from "../common/formatUserChat.ts";
import { parsePngInfo, PngInfo } from "../common/parsePngInfo.ts";
import { formatUserChat } from "../utils/formatUserChat.ts";
import { parsePngInfo, PngInfo } from "../sd/parsePngInfo.ts";
import { Context, logger } from "./mod.ts";
import { generationQueue } from "../tasks/generationQueue.ts";
import { getConfig } from "../db/config.ts";
import { generationQueue } from "../app/generationQueue.ts";
import { getConfig } from "../app/config.ts";
export const img2imgQuestion = new GrammyStatelessQ.StatelessQuestion<Context>(
"img2img",

View File

@ -1,11 +1,11 @@
import { Grammy, GrammyAutoQuote, GrammyFiles, GrammyParseMode, Log } from "../deps.ts";
import { formatUserChat } from "../common/formatUserChat.ts";
import { formatUserChat } from "../utils/formatUserChat.ts";
import { queueCommand } from "./queueCommand.ts";
import { txt2imgCommand, txt2imgQuestion } from "./txt2imgCommand.ts";
import { pnginfoCommand, pnginfoQuestion } from "./pnginfoCommand.ts";
import { img2imgCommand, img2imgQuestion } from "./img2imgCommand.ts";
import { cancelCommand } from "./cancelCommand.ts";
import { getConfig, setConfig } from "../db/config.ts";
import { getConfig, setConfig } from "../app/config.ts";
export const logger = () => Log.getLogger();
@ -34,7 +34,12 @@ type WithRetryApi<T extends Grammy.RawApi> = {
type Api = Grammy.Api<WithRetryApi<Grammy.RawApi>>;
export const bot = new Grammy.Bot<Context, Api>(Deno.env.get("TG_BOT_TOKEN")!);
export const bot = new Grammy.Bot<Context, Api>(
Deno.env.get("TG_BOT_TOKEN")!,
{
client: { timeoutSeconds: 30 },
},
);
bot.use(GrammyAutoQuote.autoQuote);
bot.use(GrammyParseMode.hydrateReply);
@ -54,36 +59,6 @@ bot.use(Grammy.session<
bot.api.config.use(GrammyFiles.hydrateFiles(bot.token));
// Automatically cancel requests after 30 seconds
bot.api.config.use(async (prev, method, payload, signal) => {
// don't time out getUpdates requests, they are long-polling
if (method === "getUpdates") return prev(method, payload, signal);
const controller = new AbortController();
let timedOut = false;
const timeout = setTimeout(() => {
timedOut = true;
// TODO: this sometimes throws with "can't abort a locked stream", why?
try {
controller.abort();
} catch (error) {
logger().error(`Error while cancelling on timeout: ${error}`);
}
}, 30 * 1000);
signal?.addEventListener("abort", () => {
controller.abort();
});
try {
return await prev(method, payload, controller.signal);
} finally {
clearTimeout(timeout);
if (timedOut) {
logger().warning(`${method} timed out`);
}
}
});
// Automatically retry bot requests if we get a "too many requests" or telegram internal error
bot.api.config.use(async (prev, method, payload, signal) => {
const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3;

View File

@ -1,5 +1,5 @@
import { Grammy, GrammyParseMode, GrammyStatelessQ } from "../deps.ts";
import { getPngInfo, parsePngInfo } from "../common/parsePngInfo.ts";
import { getPngInfo, parsePngInfo } from "../sd/parsePngInfo.ts";
import { Context } from "./mod.ts";
export const pnginfoQuestion = new GrammyStatelessQ.StatelessQuestion<Context>(

View File

@ -1,12 +1,12 @@
import { Grammy, GrammyParseMode } from "../deps.ts";
import { Context, logger } from "./mod.ts";
import { getFlagEmoji } from "../common/getFlagEmoji.ts";
import { activeGenerationWorkers, generationQueue } from "../tasks/generationQueue.ts";
import { getConfig } from "../db/config.ts";
import { getFlagEmoji } from "../utils/getFlagEmoji.ts";
import { activeGenerationWorkers, generationQueue } from "../app/generationQueue.ts";
import { getConfig } from "../app/config.ts";
export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
let formattedMessage = await getMessageText();
const queueMessage = await ctx.replyFmt(formattedMessage);
const queueMessage = await ctx.replyFmt(formattedMessage, { disable_notification: true });
handleFutureUpdates().catch((err) => logger().warning(`Updating queue message failed: ${err}`));
async function getMessageText() {
@ -42,7 +42,7 @@ export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
: ["Queue is empty.\n"],
"\nActive workers:\n",
...config.sdInstances.flatMap((sdInstance) => [
activeGenerationWorkers.has(sdInstance.id) ? "✅ " : "☠️ ",
activeGenerationWorkers.get(sdInstance.id)?.isProcessing ? "✅ " : "☠️ ",
fmt`${bold(sdInstance.name || sdInstance.id)} `,
`(max ${(sdInstance.maxResolution / 1000000).toFixed(1)} Mpx) `,
"\n",

View File

@ -1,9 +1,9 @@
import { Grammy, GrammyStatelessQ } from "../deps.ts";
import { formatUserChat } from "../common/formatUserChat.ts";
import { getPngInfo, parsePngInfo, PngInfo } from "../common/parsePngInfo.ts";
import { formatUserChat } from "../utils/formatUserChat.ts";
import { getPngInfo, parsePngInfo, PngInfo } from "../sd/parsePngInfo.ts";
import { Context, logger } from "./mod.ts";
import { generationQueue } from "../tasks/generationQueue.ts";
import { getConfig } from "../db/config.ts";
import { generationQueue } from "../app/generationQueue.ts";
import { getConfig } from "../app/config.ts";
export const txt2imgQuestion = new GrammyStatelessQ.StatelessQuestion<Context>(
"txt2img",

View File

@ -1,5 +0,0 @@
export function deadline(timeout: number): AbortSignal {
const controller = new AbortController();
setTimeout(() => controller.abort(), timeout);
return controller.signal;
}

View File

@ -14,7 +14,7 @@ Log.setup({
// Main program logic
import { bot } from "./bot/mod.ts";
import { runAllTasks } from "./tasks/mod.ts";
import { runAllTasks } from "./app/mod.ts";
await Promise.all([
bot.start(),
runAllTasks(),