forked from pinks/eris
1
0
Fork 0

fix: reuse msgs instead of sending and deleting

This commit is contained in:
pinks 2023-09-12 22:37:41 +02:00
parent 047608e92a
commit c3e5a4d908
5 changed files with 116 additions and 54 deletions

View File

@ -115,8 +115,7 @@ async function img2img(
from: ctx.message.from, from: ctx.message.from,
chat: ctx.message.chat, chat: ctx.message.chat,
requestMessageId: ctx.message.message_id, requestMessageId: ctx.message.message_id,
replyMessageId: replyMessage.message_id, status: { type: "waiting", message: replyMessage },
status: { type: "waiting" },
}); });
logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`); logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`);

View File

@ -96,8 +96,7 @@ async function txt2img(ctx: Context, match: string, includeRepliedTo: boolean):
from: ctx.message.from, from: ctx.message.from,
chat: ctx.message.chat, chat: ctx.message.chat,
requestMessageId: ctx.message.message_id, requestMessageId: ctx.message.message_id,
replyMessageId: replyMessage.message_id, status: { type: "waiting", message: replyMessage },
status: { type: "waiting" },
}); });
logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`); logger().debug(`Job enqueued for ${formatUserChat(ctx.message)}`);

View File

@ -4,16 +4,36 @@ import { db } from "./db.ts";
export interface JobSchema { export interface JobSchema {
task: task:
| { type: "txt2img"; params: Partial<PngInfo> } | {
| { type: "img2img"; params: Partial<PngInfo>; fileId: string }; type: "txt2img";
params: Partial<PngInfo>;
}
| {
type: "img2img";
params: Partial<PngInfo>;
fileId: string;
};
from: GrammyTypes.User; from: GrammyTypes.User;
chat: GrammyTypes.Chat; chat: GrammyTypes.Chat;
requestMessageId: number; requestMessageId: number;
replyMessageId?: number;
status: status:
| { type: "waiting" } | {
| { type: "processing"; progress: number; worker: string; updatedDate: Date } type: "waiting";
| { type: "done"; info?: SdTxt2ImgInfo; startDate?: Date; endDate?: Date }; message?: GrammyTypes.Message.TextMessage;
}
| {
type: "processing";
progress: number;
worker: string;
updatedDate: Date;
message?: GrammyTypes.Message.TextMessage;
}
| {
type: "done";
info?: SdTxt2ImgInfo;
startDate?: Date;
endDate?: Date;
};
} }
export const jobStore = new IKV.Store(db, "job", { export const jobStore = new IKV.Store(db, "job", {

View File

@ -40,10 +40,16 @@ export async function processJobs(): Promise<never> {
if (!worker) continue; if (!worker) continue;
// process the job // process the job
await job.update({ await job.update((value) => ({
status: { type: "processing", progress: 0, worker: worker.name, updatedDate: new Date() }, ...value,
}); status: {
type: "processing",
progress: 0,
worker: worker.name,
updatedDate: new Date(),
message: job.value.status.type !== "done" ? job.value.status.message : undefined,
},
}));
busyWorkers.add(worker.name); busyWorkers.add(worker.name);
processJob(job, worker, config) processJob(job, worker, config)
.catch(async (err) => { .catch(async (err) => {
@ -89,31 +95,61 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
); );
const startDate = new Date(); const startDate = new Date();
// if there is already a status message delete it // if there is already a status message and its older than 10 seconds
if (job.value.replyMessageId) { if (
await bot.api.deleteMessage(job.value.chat.id, job.value.replyMessageId) job.value.status.type === "processing" && job.value.status.message &&
.catch(() => undefined); (Date.now() - job.value.status.message.date * 1000) > 10 * 1000
) {
// delete it
await bot.api.deleteMessage(
job.value.status.message.chat.id,
job.value.status.message.message_id,
).catch(() => undefined);
await job.update((value) => ({
...value,
status: { ...value.status, message: undefined },
}));
} }
// we have to check if job is still processing at every step because TypeScript
if (job.value.status.type === "processing") {
// if now there is no status message
if (!job.value.status.message) {
// send a new status message // send a new status message
const newStatusMessage = await bot.api.sendMessage( const statusMessage = await bot.api.sendMessage(
job.value.chat.id, job.value.chat.id,
`Generating your prompt now... 0% using ${worker.name}`, `Generating your prompt now... 0% using ${worker.name}`,
{ reply_to_message_id: job.value.requestMessageId }, { reply_to_message_id: job.value.requestMessageId },
).catch((err) => { ).catch((err) => {
// don't error if the request message was deleted // if the request message (the message we are replying to) was deleted
if (err instanceof Grammy.GrammyError && err.message.match(/repl(y|ied)/)) return null; if (err instanceof Grammy.GrammyError && err.message.match(/repl(y|ied)/)) {
else throw err; // jest set the status message to undefined
return undefined;
}
throw err;
}); });
// if the request message was deleted, cancel the job await job.update((value) => ({
if (!newStatusMessage) { ...value,
status: { ...value.status, message: statusMessage },
}));
} else {
// edit the existing status message
await bot.api.editMessageText(
job.value.status.message.chat.id,
job.value.status.message.message_id,
`Generating your prompt now... 0% using ${worker.name}`,
{ maxAttempts: 1 },
).catch(() => undefined);
}
}
// if we don't have a status message (it failed sending because request was deleted)
if (job.value.status.type === "processing" && !job.value.status.message) {
// cancel the job
await job.delete(); await job.delete();
logger().info( logger().info(`Job cancelled for ${formatUserChat(job.value)}`);
`Job cancelled for ${formatUserChat(job.value)}`,
);
return; return;
} }
await job.update({ replyMessageId: newStatusMessage.message_id });
// reduce size if worker can't handle the resolution // reduce size if worker can't handle the resolution
const size = limitSize( const size = limitSize(
@ -123,25 +159,26 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
// process the job // process the job
const handleProgress = async (progress: SdProgressResponse) => { const handleProgress = async (progress: SdProgressResponse) => {
// important: don't let any errors escape this callback if (job.value.status.type === "processing" && job.value.status.message) {
if (job.value.replyMessageId) {
await bot.api.editMessageText( await bot.api.editMessageText(
job.value.chat.id, job.value.status.message.chat.id,
job.value.replyMessageId, job.value.status.message.message_id,
`Generating your prompt now... ${ `Generating your prompt now... ${
(progress.progress * 100).toFixed(0) (progress.progress * 100).toFixed(0)
}% using ${worker.name}`, }% using ${worker.name}`,
{ maxAttempts: 1 }, { maxAttempts: 1 },
).catch(() => undefined); ).catch(() => undefined);
} }
await job.update({ await job.update((value) => ({
...value,
status: { status: {
type: "processing", type: "processing",
progress: progress.progress, progress: progress.progress,
worker: worker.name, worker: worker.name,
updatedDate: new Date(), updatedDate: new Date(),
message: value.status.type !== "done" ? value.status.message : undefined,
}, },
}, { maxAttempts: 1 }).catch(() => undefined); }), { maxAttempts: 1 }).catch(() => undefined);
}; };
let response: SdResponse<unknown>; let response: SdResponse<unknown>;
const taskType = job.value.task.type; // don't narrow this to never pls typescript const taskType = job.value.task.type; // don't narrow this to never pls typescript
@ -169,11 +206,11 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
throw new Error(`Unknown task type: ${taskType}`); throw new Error(`Unknown task type: ${taskType}`);
} }
// upload the result // change status message to uploading images
if (job.value.replyMessageId) { if (job.value.status.type === "processing" && job.value.status.message) {
await bot.api.editMessageText( await bot.api.editMessageText(
job.value.chat.id, job.value.status.message.chat.id,
job.value.replyMessageId, job.value.status.message.message_id,
`Uploading your images...`, `Uploading your images...`,
).catch(() => undefined); ).catch(() => undefined);
} }
@ -201,6 +238,7 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
: [], : [],
]); ]);
// sending images loop because telegram is unreliable and it would be a shame to lose the images
let sendMediaAttempt = 0; let sendMediaAttempt = 0;
let resultMessages: GrammyTypes.Message.MediaMessage[] | undefined; let resultMessages: GrammyTypes.Message.MediaMessage[] | undefined;
while (true) { while (true) {
@ -245,17 +283,23 @@ async function processJob(job: IKV.Model<JobSchema>, worker: WorkerData, config:
} }
// delete the status message // delete the status message
if (job.value.replyMessageId) { if (job.value.status.type === "processing" && job.value.status.message) {
await bot.api.deleteMessage(job.value.chat.id, job.value.replyMessageId) await bot.api.deleteMessage(
.catch(() => undefined) job.value.status.message.chat.id,
.then(() => job.update({ replyMessageId: undefined })) job.value.status.message.message_id,
.catch(() => undefined); ).catch(() => undefined);
await job.update((value) => ({
...value,
status: { ...value.status, message: undefined },
}));
} }
// update job to status done // update job to status done
await job.update({ await job.update((value) => ({
...value,
status: { type: "done", info: response.info, startDate, endDate: new Date() }, status: { type: "done", info: response.info, startDate, endDate: new Date() },
}); }));
logger().debug( logger().debug(
`Job finished for ${formatUserChat(job.value)} using ${worker.name}${ `Job finished for ${formatUserChat(job.value)} using ${worker.name}${
sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : "" sendMediaAttempt > 1 ? ` after ${sendMediaAttempt} attempts` : ""

View File

@ -14,10 +14,10 @@ export async function updateJobStatusMsgs(): Promise<never> {
await new Promise((resolve) => setTimeout(resolve, 5000)); await new Promise((resolve) => setTimeout(resolve, 5000));
const jobs = await jobStore.getBy("status.type", "waiting"); const jobs = await jobStore.getBy("status.type", "waiting");
for (const [index, job] of jobs.entries()) { for (const [index, job] of jobs.entries()) {
if (!job.value.replyMessageId) continue; if (job.value.status.type !== "waiting" || !job.value.status.message) continue;
await bot.api.editMessageText( await bot.api.editMessageText(
job.value.chat.id, job.value.status.message.chat.id,
job.value.replyMessageId, job.value.status.message.message_id,
`You are ${formatOrdinal(index + 1)} in queue.`, `You are ${formatOrdinal(index + 1)} in queue.`,
{ maxAttempts: 1 }, { maxAttempts: 1 },
).catch(() => undefined); ).catch(() => undefined);