Compare commits
2 Commits
7541deb178
...
0ab1f89579
Author | SHA1 | Date |
---|---|---|
pinks | 0ab1f89579 | |
pinks | b818fc9207 |
21
bot/mod.ts
21
bot/mod.ts
|
@ -26,6 +26,27 @@ bot.use(session);
|
||||||
|
|
||||||
bot.api.config.use(GrammyFiles.hydrateFiles(bot.token));
|
bot.api.config.use(GrammyFiles.hydrateFiles(bot.token));
|
||||||
|
|
||||||
|
// Automatically cancel requests after 30 seconds
|
||||||
|
bot.api.config.use(async (prev, method, payload, signal) => {
|
||||||
|
const controller = new AbortController();
|
||||||
|
let timedOut = false;
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
timedOut = true;
|
||||||
|
controller.abort();
|
||||||
|
}, 30 * 1000);
|
||||||
|
signal?.addEventListener("abort", () => {
|
||||||
|
controller.abort();
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
return await prev(method, payload, controller.signal);
|
||||||
|
} finally {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
if (timedOut) {
|
||||||
|
logger().warning(`${method} timed out`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Automatically retry bot requests if we get a "too many requests" or telegram internal error
|
// Automatically retry bot requests if we get a "too many requests" or telegram internal error
|
||||||
bot.api.config.use(async (prev, method, payload, signal) => {
|
bot.api.config.use(async (prev, method, payload, signal) => {
|
||||||
const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3;
|
const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3;
|
||||||
|
|
|
@ -39,8 +39,8 @@ export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
|
||||||
: ["Queue is empty.\n"],
|
: ["Queue is empty.\n"],
|
||||||
"\nActive workers:\n",
|
"\nActive workers:\n",
|
||||||
...ctx.session.global.workers.flatMap((worker) => [
|
...ctx.session.global.workers.flatMap((worker) => [
|
||||||
runningWorkers.has(worker.name) ? "✅ " : "☠️ ",
|
runningWorkers.has(worker.id) ? "✅ " : "☠️ ",
|
||||||
fmt`${bold(worker.name)} `,
|
fmt`${bold(worker.name || worker.id)} `,
|
||||||
`(max ${(worker.maxResolution / 1000000).toFixed(1)} Mpx) `,
|
`(max ${(worker.maxResolution / 1000000).toFixed(1)} Mpx) `,
|
||||||
"\n",
|
"\n",
|
||||||
]),
|
]),
|
||||||
|
@ -49,7 +49,7 @@ export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
|
||||||
|
|
||||||
async function handleFutureUpdates() {
|
async function handleFutureUpdates() {
|
||||||
for (let idx = 0; idx < 30; idx++) {
|
for (let idx = 0; idx < 30; idx++) {
|
||||||
await ctx.api.sendChatAction(ctx.chat.id, "typing");
|
await ctx.api.sendChatAction(ctx.chat.id, "typing", { maxAttempts: 1 } as never);
|
||||||
await new Promise((resolve) => setTimeout(resolve, 4000));
|
await new Promise((resolve) => setTimeout(resolve, 4000));
|
||||||
const nextFormattedMessage = await getMessageText();
|
const nextFormattedMessage = await getMessageText();
|
||||||
if (nextFormattedMessage.text !== formattedMessage.text) {
|
if (nextFormattedMessage.text !== formattedMessage.text) {
|
||||||
|
|
|
@ -20,9 +20,9 @@ export interface GlobalData {
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface WorkerData {
|
export interface WorkerData {
|
||||||
name: string;
|
id: string;
|
||||||
|
name?: string;
|
||||||
api: SdApi;
|
api: SdApi;
|
||||||
auth?: string;
|
|
||||||
maxResolution: number;
|
maxResolution: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ const getDefaultGlobalData = (): GlobalData => ({
|
||||||
},
|
},
|
||||||
workers: [
|
workers: [
|
||||||
{
|
{
|
||||||
name: "local",
|
id: "local",
|
||||||
api: { url: Deno.env.get("SD_API_URL") ?? "http://127.0.0.1:7860/" },
|
api: { url: Deno.env.get("SD_API_URL") ?? "http://127.0.0.1:7860/" },
|
||||||
maxResolution: 1024 * 1024,
|
maxResolution: 1024 * 1024,
|
||||||
},
|
},
|
||||||
|
|
48
sd.ts
48
sd.ts
|
@ -1,13 +1,17 @@
|
||||||
import { Async, AsyncX, PngChunksExtract, PngChunkText } from "./deps.ts";
|
import { Async, AsyncX, PngChunksExtract, PngChunkText } from "./deps.ts";
|
||||||
|
|
||||||
const neverSignal = new AbortController().signal;
|
|
||||||
|
|
||||||
export interface SdApi {
|
export interface SdApi {
|
||||||
url: string;
|
url: string;
|
||||||
auth?: string;
|
auth?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Promise<T> {
|
async function fetchSdApi<T>(
|
||||||
|
api: SdApi,
|
||||||
|
endpoint: string,
|
||||||
|
{ body, timeoutMs }: { body?: unknown; timeoutMs?: number } = {},
|
||||||
|
): Promise<T> {
|
||||||
|
const controller = new AbortController();
|
||||||
|
const timeoutId = timeoutMs ? setTimeout(() => controller.abort(), timeoutMs) : undefined;
|
||||||
let options: RequestInit | undefined;
|
let options: RequestInit | undefined;
|
||||||
if (body != null) {
|
if (body != null) {
|
||||||
options = {
|
options = {
|
||||||
|
@ -17,13 +21,18 @@ async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Prom
|
||||||
...api.auth ? { Authorization: api.auth } : {},
|
...api.auth ? { Authorization: api.auth } : {},
|
||||||
},
|
},
|
||||||
body: JSON.stringify(body),
|
body: JSON.stringify(body),
|
||||||
|
signal: controller.signal,
|
||||||
};
|
};
|
||||||
} else if (api.auth) {
|
} else if (api.auth) {
|
||||||
options = {
|
options = {
|
||||||
headers: { Authorization: api.auth },
|
headers: { Authorization: api.auth },
|
||||||
|
signal: controller.signal,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
const response = await fetch(new URL(endpoint, api.url), options).catch(() => {
|
const response = await fetch(new URL(endpoint, api.url), options).catch(() => {
|
||||||
|
if (controller.signal.aborted) {
|
||||||
|
throw new SdApiError(endpoint, options, -1, "Timed out");
|
||||||
|
}
|
||||||
throw new SdApiError(endpoint, options, 0, "Network error");
|
throw new SdApiError(endpoint, options, 0, "Network error");
|
||||||
});
|
});
|
||||||
const result = await response.json().catch(() => {
|
const result = await response.json().catch(() => {
|
||||||
|
@ -31,6 +40,7 @@ async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Prom
|
||||||
detail: "Invalid JSON",
|
detail: "Invalid JSON",
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
clearTimeout(timeoutId);
|
||||||
if (!response.ok) {
|
if (!response.ok) {
|
||||||
throw new SdApiError(endpoint, options, response.status, response.statusText, result);
|
throw new SdApiError(endpoint, options, response.status, response.statusText, result);
|
||||||
}
|
}
|
||||||
|
@ -78,9 +88,12 @@ export async function sdTxt2Img(
|
||||||
api: SdApi,
|
api: SdApi,
|
||||||
params: Partial<SdTxt2ImgRequest>,
|
params: Partial<SdTxt2ImgRequest>,
|
||||||
onProgress?: (progress: SdProgressResponse) => void,
|
onProgress?: (progress: SdProgressResponse) => void,
|
||||||
signal: AbortSignal = neverSignal,
|
|
||||||
): Promise<SdResponse<SdTxt2ImgRequest>> {
|
): Promise<SdResponse<SdTxt2ImgRequest>> {
|
||||||
const request = fetchSdApi<SdResponse<SdTxt2ImgRequest>>(api, "sdapi/v1/txt2img", params)
|
const request = fetchSdApi<SdResponse<SdTxt2ImgRequest>>(
|
||||||
|
api,
|
||||||
|
"sdapi/v1/txt2img",
|
||||||
|
{ body: params },
|
||||||
|
)
|
||||||
// JSON field "info" is a JSON-serialized string so we need to parse this part second time
|
// JSON field "info" is a JSON-serialized string so we need to parse this part second time
|
||||||
.then((data) => ({
|
.then((data) => ({
|
||||||
...data,
|
...data,
|
||||||
|
@ -89,13 +102,15 @@ export async function sdTxt2Img(
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
await Async.abortable(Promise.race([request, Async.delay(4000)]), signal);
|
await Promise.race([request, Async.delay(3000)]);
|
||||||
if (await AsyncX.promiseState(request) !== "pending") return await request;
|
if (await AsyncX.promiseState(request) !== "pending") return await request;
|
||||||
onProgress?.(await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress"));
|
onProgress?.(
|
||||||
|
await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress", { timeoutMs: 10_000 }),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (await AsyncX.promiseState(request) === "pending") {
|
if (await AsyncX.promiseState(request) === "pending") {
|
||||||
await fetchSdApi(api, "sdapi/v1/interrupt", {});
|
await fetchSdApi(api, "sdapi/v1/interrupt", { timeoutMs: 10_000 });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -118,9 +133,12 @@ export async function sdImg2Img(
|
||||||
api: SdApi,
|
api: SdApi,
|
||||||
params: Partial<SdImg2ImgRequest>,
|
params: Partial<SdImg2ImgRequest>,
|
||||||
onProgress?: (progress: SdProgressResponse) => void,
|
onProgress?: (progress: SdProgressResponse) => void,
|
||||||
signal: AbortSignal = neverSignal,
|
|
||||||
): Promise<SdResponse<SdImg2ImgRequest>> {
|
): Promise<SdResponse<SdImg2ImgRequest>> {
|
||||||
const request = fetchSdApi<SdResponse<SdImg2ImgRequest>>(api, "sdapi/v1/img2img", params)
|
const request = fetchSdApi<SdResponse<SdImg2ImgRequest>>(
|
||||||
|
api,
|
||||||
|
"sdapi/v1/img2img",
|
||||||
|
{ body: params },
|
||||||
|
)
|
||||||
// JSON field "info" is a JSON-serialized string so we need to parse this part second time
|
// JSON field "info" is a JSON-serialized string so we need to parse this part second time
|
||||||
.then((data) => ({
|
.then((data) => ({
|
||||||
...data,
|
...data,
|
||||||
|
@ -129,13 +147,15 @@ export async function sdImg2Img(
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
await Async.abortable(Promise.race([request, Async.delay(4000)]), signal);
|
await Promise.race([request, Async.delay(3000)]);
|
||||||
if (await AsyncX.promiseState(request) !== "pending") return await request;
|
if (await AsyncX.promiseState(request) !== "pending") return await request;
|
||||||
onProgress?.(await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress"));
|
onProgress?.(
|
||||||
|
await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress", { timeoutMs: 10_000 }),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (await AsyncX.promiseState(request) === "pending") {
|
if (await AsyncX.promiseState(request) === "pending") {
|
||||||
await fetchSdApi(api, "sdapi/v1/interrupt", {});
|
await fetchSdApi(api, "sdapi/v1/interrupt", { timeoutMs: 10_000 });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -220,7 +240,7 @@ export interface SdProgressState {
|
||||||
}
|
}
|
||||||
|
|
||||||
export function sdGetConfig(api: SdApi): Promise<SdConfigResponse> {
|
export function sdGetConfig(api: SdApi): Promise<SdConfigResponse> {
|
||||||
return fetchSdApi(api, "config");
|
return fetchSdApi(api, "config", { timeoutMs: 10_000 });
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface SdConfigResponse {
|
export interface SdConfigResponse {
|
||||||
|
|
|
@ -15,13 +15,13 @@ export async function pingWorkers(): Promise<never> {
|
||||||
const config = await getGlobalSession();
|
const config = await getGlobalSession();
|
||||||
for (const worker of config.workers) {
|
for (const worker of config.workers) {
|
||||||
const status = await sdGetConfig(worker.api).catch(() => null);
|
const status = await sdGetConfig(worker.api).catch(() => null);
|
||||||
const wasRunning = runningWorkers.has(worker.name);
|
const wasRunning = runningWorkers.has(worker.id);
|
||||||
if (status) {
|
if (status) {
|
||||||
runningWorkers.add(worker.name);
|
runningWorkers.add(worker.id);
|
||||||
if (!wasRunning) logger().info(`Worker ${worker.name} is online`);
|
if (!wasRunning) logger().info(`Worker ${worker.id} is online`);
|
||||||
} else {
|
} else {
|
||||||
runningWorkers.delete(worker.name);
|
runningWorkers.delete(worker.id);
|
||||||
if (wasRunning) logger().warning(`Worker ${worker.name} went offline`);
|
if (wasRunning) logger().warning(`Worker ${worker.id} went offline`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
await Async.delay(60 * 1000);
|
await Async.delay(60 * 1000);
|
||||||
|
|
|
@ -34,8 +34,8 @@ export async function processJobs(): Promise<never> {
|
||||||
// find a worker to handle the job
|
// find a worker to handle the job
|
||||||
const config = await getGlobalSession();
|
const config = await getGlobalSession();
|
||||||
const worker = config.workers?.find((worker) =>
|
const worker = config.workers?.find((worker) =>
|
||||||
runningWorkers.has(worker.name) &&
|
runningWorkers.has(worker.id) &&
|
||||||
!busyWorkers.has(worker.name)
|
!busyWorkers.has(worker.id)
|
||||||
);
|
);
|
||||||
if (!worker) continue;
|
if (!worker) continue;
|
||||||
|
|
||||||
|
@ -45,16 +45,16 @@ export async function processJobs(): Promise<never> {
|
||||||
status: {
|
status: {
|
||||||
type: "processing",
|
type: "processing",
|
||||||
progress: 0,
|
progress: 0,
|
||||||
worker: worker.name,
|
worker: worker.id,
|
||||||
updatedDate: new Date(),
|
updatedDate: new Date(),
|
||||||
message: job.value.status.type !== "done" ? job.value.status.message : undefined,
|
message: job.value.status.type !== "done" ? job.value.status.message : undefined,
|
||||||
},
|
},
|
||||||
}));
|
}));
|
||||||
busyWorkers.add(worker.name);
|
busyWorkers.add(worker.id);
|
||||||
processJob(job, worker, config)
|
processJob(job, worker, config)
|
||||||
.catch(async (err) => {
|
.catch(async (err) => {
|
||||||
logger().error(
|
logger().error(
|
||||||
`Job failed for ${formatUserChat(job.value)} via ${worker.name}: ${err}`,
|
`Job failed for ${formatUserChat(job.value)} via ${worker.id}: ${err}`,
|
||||||
);
|
);
|
||||||
if (err instanceof Grammy.GrammyError || err instanceof SdApiError) {
|
if (err instanceof Grammy.GrammyError || err instanceof SdApiError) {
|
||||||
await bot.api.sendMessage(
|
await bot.api.sendMessage(
|
||||||
|
@ -72,9 +72,9 @@ export async function processJobs(): Promise<never> {
|
||||||
err.statusCode === 401
|
err.statusCode === 401
|
||||||
)
|
)
|
||||||
) {
|
) {
|
||||||
runningWorkers.delete(worker.name);
|
runningWorkers.delete(worker.id);
|
||||||
logger().warning(
|
logger().warning(
|
||||||
`Worker ${worker.name} was marked as offline because of network error`,
|
`Worker ${worker.id} was marked as offline because of network error`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
await job.delete().catch(() => undefined);
|
await job.delete().catch(() => undefined);
|
||||||
|
@ -82,7 +82,7 @@ export async function processJobs(): Promise<never> {
|
||||||
await jobStore.create(job.value);
|
await jobStore.create(job.value);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.finally(() => busyWorkers.delete(worker.name));
|
.finally(() => busyWorkers.delete(worker.id));
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger().warning(`Processing jobs failed: ${err}`);
|
logger().warning(`Processing jobs failed: ${err}`);
|
||||||
}
|
}
|
||||||
|
@ -91,7 +91,7 @@ export async function processJobs(): Promise<never> {
|
||||||
|
|
||||||
async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config: GlobalData) {
|
async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config: GlobalData) {
|
||||||
logger().debug(
|
logger().debug(
|
||||||
`Job started for ${formatUserChat(job.value)} using ${worker.name}`,
|
`Job started for ${formatUserChat(job.value)} using ${worker.id}`,
|
||||||
);
|
);
|
||||||
const startDate = new Date();
|
const startDate = new Date();
|
||||||
|
|
||||||
|
@ -161,7 +161,9 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
||||||
|
|
||||||
// process the job
|
// process the job
|
||||||
const handleProgress = async (progress: SdProgressResponse) => {
|
const handleProgress = async (progress: SdProgressResponse) => {
|
||||||
|
// Important: don't let any errors escape this function
|
||||||
if (job.value.status.type === "processing" && job.value.status.message) {
|
if (job.value.status.type === "processing" && job.value.status.message) {
|
||||||
|
if (job.value.status.progress === progress.progress) return;
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 }),
|
bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 }),
|
||||||
bot.api.editMessageText(
|
bot.api.editMessageText(
|
||||||
|
@ -177,12 +179,16 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
||||||
status: {
|
status: {
|
||||||
type: "processing",
|
type: "processing",
|
||||||
progress: progress.progress,
|
progress: progress.progress,
|
||||||
worker: worker.name,
|
worker: worker.id,
|
||||||
updatedDate: new Date(),
|
updatedDate: new Date(),
|
||||||
message: value.status.type !== "done" ? value.status.message : undefined,
|
message: value.status.type !== "done" ? value.status.message : undefined,
|
||||||
},
|
},
|
||||||
}), { maxAttempts: 1 }),
|
}), { maxAttempts: 1 }),
|
||||||
]).catch(() => undefined);
|
]).catch((err) =>
|
||||||
|
logger().warning(
|
||||||
|
`Updating job status for ${formatUserChat(job.value)} using ${worker.id} failed: ${err}`,
|
||||||
|
)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let response: SdResponse<unknown>;
|
let response: SdResponse<unknown>;
|
||||||
|
@ -217,6 +223,7 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
||||||
job.value.status.message.chat.id,
|
job.value.status.message.chat.id,
|
||||||
job.value.status.message.message_id,
|
job.value.status.message.message_id,
|
||||||
`Uploading your images...`,
|
`Uploading your images...`,
|
||||||
|
{ maxAttempts: 1 },
|
||||||
).catch(() => undefined);
|
).catch(() => undefined);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +244,7 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
||||||
fmt`${bold("CFG scale:")} ${response.info.cfg_scale}, `,
|
fmt`${bold("CFG scale:")} ${response.info.cfg_scale}, `,
|
||||||
fmt`${bold("Seed:")} ${response.info.seed}, `,
|
fmt`${bold("Seed:")} ${response.info.seed}, `,
|
||||||
fmt`${bold("Size")}: ${response.info.width}x${response.info.height}, `,
|
fmt`${bold("Size")}: ${response.info.width}x${response.info.height}, `,
|
||||||
fmt`${bold("Worker")}: ${worker.name}, `,
|
fmt`${bold("Worker")}: ${worker.id}, `,
|
||||||
fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`,
|
fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`,
|
||||||
]
|
]
|
||||||
: [],
|
: [],
|
||||||
|
@ -275,7 +282,11 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
||||||
});
|
});
|
||||||
break;
|
break;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger().warning(`Sending images (attempt ${sendMediaAttempt}) failed: ${err}`);
|
logger().warning(
|
||||||
|
`Sending images (attempt ${sendMediaAttempt}) for ${
|
||||||
|
formatUserChat(job.value)
|
||||||
|
} using ${worker.id} failed: ${err}`,
|
||||||
|
);
|
||||||
if (sendMediaAttempt >= 6) throw err;
|
if (sendMediaAttempt >= 6) throw err;
|
||||||
// wait 2 * 5 seconds before retrying
|
// wait 2 * 5 seconds before retrying
|
||||||
for (let i = 0; i < 2; i++) {
|
for (let i = 0; i < 2; i++) {
|
||||||
|
@ -313,7 +324,7 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
|
||||||
}));
|
}));
|
||||||
|
|
||||||
logger().debug(
|
logger().debug(
|
||||||
`Job finished for ${formatUserChat(job.value)} using ${worker.name}${
|
`Job finished for ${formatUserChat(job.value)} using ${worker.id}${
|
||||||
sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : ""
|
sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : ""
|
||||||
}`,
|
}`,
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue