Compare commits

...

2 Commits

Author SHA1 Message Date
pinks 0ab1f89579 fix: better handling of timeouts 2023-09-13 02:38:09 +02:00
pinks b818fc9207 show worker long names 2023-09-13 00:53:50 +02:00
6 changed files with 91 additions and 39 deletions

View File

@ -26,6 +26,27 @@ bot.use(session);
bot.api.config.use(GrammyFiles.hydrateFiles(bot.token)); bot.api.config.use(GrammyFiles.hydrateFiles(bot.token));
// Automatically cancel requests after 30 seconds
bot.api.config.use(async (prev, method, payload, signal) => {
const controller = new AbortController();
let timedOut = false;
const timeout = setTimeout(() => {
timedOut = true;
controller.abort();
}, 30 * 1000);
signal?.addEventListener("abort", () => {
controller.abort();
});
try {
return await prev(method, payload, controller.signal);
} finally {
clearTimeout(timeout);
if (timedOut) {
logger().warning(`${method} timed out`);
}
}
});
// Automatically retry bot requests if we get a "too many requests" or telegram internal error // Automatically retry bot requests if we get a "too many requests" or telegram internal error
bot.api.config.use(async (prev, method, payload, signal) => { bot.api.config.use(async (prev, method, payload, signal) => {
const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3; const maxAttempts = payload && ("maxAttempts" in payload) ? payload.maxAttempts ?? 3 : 3;

View File

@ -39,8 +39,8 @@ export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
: ["Queue is empty.\n"], : ["Queue is empty.\n"],
"\nActive workers:\n", "\nActive workers:\n",
...ctx.session.global.workers.flatMap((worker) => [ ...ctx.session.global.workers.flatMap((worker) => [
runningWorkers.has(worker.name) ? "✅ " : "☠️ ", runningWorkers.has(worker.id) ? "✅ " : "☠️ ",
fmt`${bold(worker.name)} `, fmt`${bold(worker.name || worker.id)} `,
`(max ${(worker.maxResolution / 1000000).toFixed(1)} Mpx) `, `(max ${(worker.maxResolution / 1000000).toFixed(1)} Mpx) `,
"\n", "\n",
]), ]),
@ -49,7 +49,7 @@ export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
async function handleFutureUpdates() { async function handleFutureUpdates() {
for (let idx = 0; idx < 30; idx++) { for (let idx = 0; idx < 30; idx++) {
await ctx.api.sendChatAction(ctx.chat.id, "typing"); await ctx.api.sendChatAction(ctx.chat.id, "typing", { maxAttempts: 1 } as never);
await new Promise((resolve) => setTimeout(resolve, 4000)); await new Promise((resolve) => setTimeout(resolve, 4000));
const nextFormattedMessage = await getMessageText(); const nextFormattedMessage = await getMessageText();
if (nextFormattedMessage.text !== formattedMessage.text) { if (nextFormattedMessage.text !== formattedMessage.text) {

View File

@ -20,9 +20,9 @@ export interface GlobalData {
} }
export interface WorkerData { export interface WorkerData {
name: string; id: string;
name?: string;
api: SdApi; api: SdApi;
auth?: string;
maxResolution: number; maxResolution: number;
} }
@ -52,7 +52,7 @@ const getDefaultGlobalData = (): GlobalData => ({
}, },
workers: [ workers: [
{ {
name: "local", id: "local",
api: { url: Deno.env.get("SD_API_URL") ?? "http://127.0.0.1:7860/" }, api: { url: Deno.env.get("SD_API_URL") ?? "http://127.0.0.1:7860/" },
maxResolution: 1024 * 1024, maxResolution: 1024 * 1024,
}, },

48
sd.ts
View File

@ -1,13 +1,17 @@
import { Async, AsyncX, PngChunksExtract, PngChunkText } from "./deps.ts"; import { Async, AsyncX, PngChunksExtract, PngChunkText } from "./deps.ts";
const neverSignal = new AbortController().signal;
export interface SdApi { export interface SdApi {
url: string; url: string;
auth?: string; auth?: string;
} }
async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Promise<T> { async function fetchSdApi<T>(
api: SdApi,
endpoint: string,
{ body, timeoutMs }: { body?: unknown; timeoutMs?: number } = {},
): Promise<T> {
const controller = new AbortController();
const timeoutId = timeoutMs ? setTimeout(() => controller.abort(), timeoutMs) : undefined;
let options: RequestInit | undefined; let options: RequestInit | undefined;
if (body != null) { if (body != null) {
options = { options = {
@ -17,13 +21,18 @@ async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Prom
...api.auth ? { Authorization: api.auth } : {}, ...api.auth ? { Authorization: api.auth } : {},
}, },
body: JSON.stringify(body), body: JSON.stringify(body),
signal: controller.signal,
}; };
} else if (api.auth) { } else if (api.auth) {
options = { options = {
headers: { Authorization: api.auth }, headers: { Authorization: api.auth },
signal: controller.signal,
}; };
} }
const response = await fetch(new URL(endpoint, api.url), options).catch(() => { const response = await fetch(new URL(endpoint, api.url), options).catch(() => {
if (controller.signal.aborted) {
throw new SdApiError(endpoint, options, -1, "Timed out");
}
throw new SdApiError(endpoint, options, 0, "Network error"); throw new SdApiError(endpoint, options, 0, "Network error");
}); });
const result = await response.json().catch(() => { const result = await response.json().catch(() => {
@ -31,6 +40,7 @@ async function fetchSdApi<T>(api: SdApi, endpoint: string, body?: unknown): Prom
detail: "Invalid JSON", detail: "Invalid JSON",
}); });
}); });
clearTimeout(timeoutId);
if (!response.ok) { if (!response.ok) {
throw new SdApiError(endpoint, options, response.status, response.statusText, result); throw new SdApiError(endpoint, options, response.status, response.statusText, result);
} }
@ -78,9 +88,12 @@ export async function sdTxt2Img(
api: SdApi, api: SdApi,
params: Partial<SdTxt2ImgRequest>, params: Partial<SdTxt2ImgRequest>,
onProgress?: (progress: SdProgressResponse) => void, onProgress?: (progress: SdProgressResponse) => void,
signal: AbortSignal = neverSignal,
): Promise<SdResponse<SdTxt2ImgRequest>> { ): Promise<SdResponse<SdTxt2ImgRequest>> {
const request = fetchSdApi<SdResponse<SdTxt2ImgRequest>>(api, "sdapi/v1/txt2img", params) const request = fetchSdApi<SdResponse<SdTxt2ImgRequest>>(
api,
"sdapi/v1/txt2img",
{ body: params },
)
// JSON field "info" is a JSON-serialized string so we need to parse this part second time // JSON field "info" is a JSON-serialized string so we need to parse this part second time
.then((data) => ({ .then((data) => ({
...data, ...data,
@ -89,13 +102,15 @@ export async function sdTxt2Img(
try { try {
while (true) { while (true) {
await Async.abortable(Promise.race([request, Async.delay(4000)]), signal); await Promise.race([request, Async.delay(3000)]);
if (await AsyncX.promiseState(request) !== "pending") return await request; if (await AsyncX.promiseState(request) !== "pending") return await request;
onProgress?.(await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress")); onProgress?.(
await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress", { timeoutMs: 10_000 }),
);
} }
} finally { } finally {
if (await AsyncX.promiseState(request) === "pending") { if (await AsyncX.promiseState(request) === "pending") {
await fetchSdApi(api, "sdapi/v1/interrupt", {}); await fetchSdApi(api, "sdapi/v1/interrupt", { timeoutMs: 10_000 });
} }
} }
} }
@ -118,9 +133,12 @@ export async function sdImg2Img(
api: SdApi, api: SdApi,
params: Partial<SdImg2ImgRequest>, params: Partial<SdImg2ImgRequest>,
onProgress?: (progress: SdProgressResponse) => void, onProgress?: (progress: SdProgressResponse) => void,
signal: AbortSignal = neverSignal,
): Promise<SdResponse<SdImg2ImgRequest>> { ): Promise<SdResponse<SdImg2ImgRequest>> {
const request = fetchSdApi<SdResponse<SdImg2ImgRequest>>(api, "sdapi/v1/img2img", params) const request = fetchSdApi<SdResponse<SdImg2ImgRequest>>(
api,
"sdapi/v1/img2img",
{ body: params },
)
// JSON field "info" is a JSON-serialized string so we need to parse this part second time // JSON field "info" is a JSON-serialized string so we need to parse this part second time
.then((data) => ({ .then((data) => ({
...data, ...data,
@ -129,13 +147,15 @@ export async function sdImg2Img(
try { try {
while (true) { while (true) {
await Async.abortable(Promise.race([request, Async.delay(4000)]), signal); await Promise.race([request, Async.delay(3000)]);
if (await AsyncX.promiseState(request) !== "pending") return await request; if (await AsyncX.promiseState(request) !== "pending") return await request;
onProgress?.(await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress")); onProgress?.(
await fetchSdApi<SdProgressResponse>(api, "sdapi/v1/progress", { timeoutMs: 10_000 }),
);
} }
} finally { } finally {
if (await AsyncX.promiseState(request) === "pending") { if (await AsyncX.promiseState(request) === "pending") {
await fetchSdApi(api, "sdapi/v1/interrupt", {}); await fetchSdApi(api, "sdapi/v1/interrupt", { timeoutMs: 10_000 });
} }
} }
} }
@ -220,7 +240,7 @@ export interface SdProgressState {
} }
export function sdGetConfig(api: SdApi): Promise<SdConfigResponse> { export function sdGetConfig(api: SdApi): Promise<SdConfigResponse> {
return fetchSdApi(api, "config"); return fetchSdApi(api, "config", { timeoutMs: 10_000 });
} }
export interface SdConfigResponse { export interface SdConfigResponse {

View File

@ -15,13 +15,13 @@ export async function pingWorkers(): Promise<never> {
const config = await getGlobalSession(); const config = await getGlobalSession();
for (const worker of config.workers) { for (const worker of config.workers) {
const status = await sdGetConfig(worker.api).catch(() => null); const status = await sdGetConfig(worker.api).catch(() => null);
const wasRunning = runningWorkers.has(worker.name); const wasRunning = runningWorkers.has(worker.id);
if (status) { if (status) {
runningWorkers.add(worker.name); runningWorkers.add(worker.id);
if (!wasRunning) logger().info(`Worker ${worker.name} is online`); if (!wasRunning) logger().info(`Worker ${worker.id} is online`);
} else { } else {
runningWorkers.delete(worker.name); runningWorkers.delete(worker.id);
if (wasRunning) logger().warning(`Worker ${worker.name} went offline`); if (wasRunning) logger().warning(`Worker ${worker.id} went offline`);
} }
} }
await Async.delay(60 * 1000); await Async.delay(60 * 1000);

View File

@ -34,8 +34,8 @@ export async function processJobs(): Promise<never> {
// find a worker to handle the job // find a worker to handle the job
const config = await getGlobalSession(); const config = await getGlobalSession();
const worker = config.workers?.find((worker) => const worker = config.workers?.find((worker) =>
runningWorkers.has(worker.name) && runningWorkers.has(worker.id) &&
!busyWorkers.has(worker.name) !busyWorkers.has(worker.id)
); );
if (!worker) continue; if (!worker) continue;
@ -45,16 +45,16 @@ export async function processJobs(): Promise<never> {
status: { status: {
type: "processing", type: "processing",
progress: 0, progress: 0,
worker: worker.name, worker: worker.id,
updatedDate: new Date(), updatedDate: new Date(),
message: job.value.status.type !== "done" ? job.value.status.message : undefined, message: job.value.status.type !== "done" ? job.value.status.message : undefined,
}, },
})); }));
busyWorkers.add(worker.name); busyWorkers.add(worker.id);
processJob(job, worker, config) processJob(job, worker, config)
.catch(async (err) => { .catch(async (err) => {
logger().error( logger().error(
`Job failed for ${formatUserChat(job.value)} via ${worker.name}: ${err}`, `Job failed for ${formatUserChat(job.value)} via ${worker.id}: ${err}`,
); );
if (err instanceof Grammy.GrammyError || err instanceof SdApiError) { if (err instanceof Grammy.GrammyError || err instanceof SdApiError) {
await bot.api.sendMessage( await bot.api.sendMessage(
@ -72,9 +72,9 @@ export async function processJobs(): Promise<never> {
err.statusCode === 401 err.statusCode === 401
) )
) { ) {
runningWorkers.delete(worker.name); runningWorkers.delete(worker.id);
logger().warning( logger().warning(
`Worker ${worker.name} was marked as offline because of network error`, `Worker ${worker.id} was marked as offline because of network error`,
); );
} }
await job.delete().catch(() => undefined); await job.delete().catch(() => undefined);
@ -82,7 +82,7 @@ export async function processJobs(): Promise<never> {
await jobStore.create(job.value); await jobStore.create(job.value);
} }
}) })
.finally(() => busyWorkers.delete(worker.name)); .finally(() => busyWorkers.delete(worker.id));
} catch (err) { } catch (err) {
logger().warning(`Processing jobs failed: ${err}`); logger().warning(`Processing jobs failed: ${err}`);
} }
@ -91,7 +91,7 @@ export async function processJobs(): Promise<never> {
async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config: GlobalData) { async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config: GlobalData) {
logger().debug( logger().debug(
`Job started for ${formatUserChat(job.value)} using ${worker.name}`, `Job started for ${formatUserChat(job.value)} using ${worker.id}`,
); );
const startDate = new Date(); const startDate = new Date();
@ -161,7 +161,9 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
// process the job // process the job
const handleProgress = async (progress: SdProgressResponse) => { const handleProgress = async (progress: SdProgressResponse) => {
// Important: don't let any errors escape this function
if (job.value.status.type === "processing" && job.value.status.message) { if (job.value.status.type === "processing" && job.value.status.message) {
if (job.value.status.progress === progress.progress) return;
await Promise.all([ await Promise.all([
bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 }), bot.api.sendChatAction(job.value.chat.id, "upload_photo", { maxAttempts: 1 }),
bot.api.editMessageText( bot.api.editMessageText(
@ -177,12 +179,16 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
status: { status: {
type: "processing", type: "processing",
progress: progress.progress, progress: progress.progress,
worker: worker.name, worker: worker.id,
updatedDate: new Date(), updatedDate: new Date(),
message: value.status.type !== "done" ? value.status.message : undefined, message: value.status.type !== "done" ? value.status.message : undefined,
}, },
}), { maxAttempts: 1 }), }), { maxAttempts: 1 }),
]).catch(() => undefined); ]).catch((err) =>
logger().warning(
`Updating job status for ${formatUserChat(job.value)} using ${worker.id} failed: ${err}`,
)
);
} }
}; };
let response: SdResponse<unknown>; let response: SdResponse<unknown>;
@ -217,6 +223,7 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
job.value.status.message.chat.id, job.value.status.message.chat.id,
job.value.status.message.message_id, job.value.status.message.message_id,
`Uploading your images...`, `Uploading your images...`,
{ maxAttempts: 1 },
).catch(() => undefined); ).catch(() => undefined);
} }
@ -237,7 +244,7 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
fmt`${bold("CFG scale:")} ${response.info.cfg_scale}, `, fmt`${bold("CFG scale:")} ${response.info.cfg_scale}, `,
fmt`${bold("Seed:")} ${response.info.seed}, `, fmt`${bold("Seed:")} ${response.info.seed}, `,
fmt`${bold("Size")}: ${response.info.width}x${response.info.height}, `, fmt`${bold("Size")}: ${response.info.width}x${response.info.height}, `,
fmt`${bold("Worker")}: ${worker.name}, `, fmt`${bold("Worker")}: ${worker.id}, `,
fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`, fmt`${bold("Time taken")}: ${FmtDuration.format(jobDurationMs, { ignoreZero: true })}`,
] ]
: [], : [],
@ -275,7 +282,11 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
}); });
break; break;
} catch (err) { } catch (err) {
logger().warning(`Sending images (attempt ${sendMediaAttempt}) failed: ${err}`); logger().warning(
`Sending images (attempt ${sendMediaAttempt}) for ${
formatUserChat(job.value)
} using ${worker.id} failed: ${err}`,
);
if (sendMediaAttempt >= 6) throw err; if (sendMediaAttempt >= 6) throw err;
// wait 2 * 5 seconds before retrying // wait 2 * 5 seconds before retrying
for (let i = 0; i < 2; i++) { for (let i = 0; i < 2; i++) {
@ -313,7 +324,7 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
})); }));
logger().debug( logger().debug(
`Job finished for ${formatUserChat(job.value)} using ${worker.name}${ `Job finished for ${formatUserChat(job.value)} using ${worker.id}${
sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : "" sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : ""
}`, }`,
); );