chore: update to ikv 0.3

This commit is contained in:
pinks 2023-09-18 01:01:09 +02:00
parent 2d2ffb8588
commit 3f27b4470b
8 changed files with 16 additions and 11 deletions

View File

@ -32,7 +32,7 @@ async function img2img(
return;
}
const jobs = await jobStore.getBy("status.type", "waiting");
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
if (jobs.length >= ctx.session.global.maxJobs) {
await ctx.reply(
`The queue is full. Try again later. (Max queue size: ${ctx.session.global.maxJobs})`,

View File

@ -11,9 +11,9 @@ export async function queueCommand(ctx: Grammy.CommandContext<Context>) {
handleFutureUpdates().catch((err) => logger().warning(`Updating queue message failed: ${err}`));
async function getMessageText() {
const processingJobs = await jobStore.getBy("status.type", "processing")
const processingJobs = await jobStore.getBy("status.type", { value: "processing" })
.then((jobs) => jobs.map((job) => ({ ...job.value, place: 0 })));
const waitingJobs = await jobStore.getBy("status.type", "waiting")
const waitingJobs = await jobStore.getBy("status.type", { value: "waiting" })
.then((jobs) => jobs.map((job, index) => ({ ...job.value, place: index + 1 })));
const jobs = [...processingJobs, ...waitingJobs];
const { bold } = GrammyParseMode;

View File

@ -27,7 +27,7 @@ async function txt2img(ctx: Context, match: string, includeRepliedTo: boolean):
return;
}
const jobs = await jobStore.getBy("status.type", "waiting");
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
if (jobs.length >= ctx.session.global.maxJobs) {
await ctx.reply(
`The queue is full. Try again later. (Max queue size: ${ctx.session.global.maxJobs})`,

View File

@ -38,7 +38,12 @@ export interface JobSchema {
};
}
export const jobStore = new IKV.Store(db, "job", {
schema: new IKV.Schema<JobSchema>(),
indices: ["status.type"],
type JobIndices = {
"status.type": JobSchema["status"]["type"];
};
export const jobStore = new IKV.Store<JobSchema, JobIndices>(db, "job", {
indices: {
"status.type": { getValue: (job) => job.status.type },
},
});

View File

@ -5,7 +5,7 @@ export * as Collections from "https://deno.land/std@0.201.0/collections/mod.ts";
export * as Base64 from "https://deno.land/std@0.201.0/encoding/base64.ts";
export * as AsyncX from "https://deno.land/x/async@v2.0.2/mod.ts";
export * as ULID from "https://deno.land/x/ulid@v0.3.0/mod.ts";
export * as IKV from "https://deno.land/x/indexed_kv@v0.2.0/mod.ts";
export * as IKV from "https://deno.land/x/indexed_kv@v0.3.0/mod.ts";
export * as Grammy from "https://deno.land/x/grammy@v1.18.1/mod.ts";
export * as GrammyTypes from "https://deno.land/x/grammy_types@v3.2.0/mod.ts";
export * as GrammyAutoQuote from "https://deno.land/x/grammy_autoquote@v1.1.2/mod.ts";

View File

@ -33,7 +33,7 @@ export async function processJobs(): Promise<never> {
await new Promise((resolve) => setTimeout(resolve, 1000));
try {
const jobs = await jobStore.getBy("status.type", "waiting");
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
// get first waiting job which hasn't errored in last minute
const job = jobs.find((job) =>
job.value.status.type === "waiting" &&

View File

@ -11,7 +11,7 @@ export async function returnHangedJobs(): Promise<never> {
while (true) {
try {
await new Promise((resolve) => setTimeout(resolve, 5000));
const jobs = await jobStore.getBy("status.type", "processing");
const jobs = await jobStore.getBy("status.type", { value: "processing" });
for (const job of jobs) {
if (job.value.status.type !== "processing") continue;
// if job wasn't updated for 2 minutes, return it to the queue

View File

@ -12,7 +12,7 @@ export async function updateJobStatusMsgs(): Promise<never> {
while (true) {
try {
await new Promise((resolve) => setTimeout(resolve, 5000));
const jobs = await jobStore.getBy("status.type", "waiting");
const jobs = await jobStore.getBy("status.type", { value: "waiting" });
for (const [index, job] of jobs.entries()) {
if (job.value.status.type !== "waiting" || !job.value.status.message) continue;
await bot.api.editMessageText(