From ba2afe40ce265892c49b9dfe39023f112e62b4f2 Mon Sep 17 00:00:00 2001 From: pinks Date: Thu, 7 Sep 2023 22:43:40 +0200 Subject: [PATCH] implement indexes in store to keep whole history --- bot.ts | 19 ++-- deps.ts | 1 + main.ts | 15 +++ queue.ts | 66 ++++++++----- sd.ts | 12 +-- session.ts | 4 +- store.test.ts | 96 +++++++++++++++++++ store.ts | 255 +++++++++++++++++++++++++++++++++++++++++--------- 8 files changed, 380 insertions(+), 88 deletions(-) create mode 100644 store.test.ts diff --git a/bot.ts b/bot.ts index 7582a89..625ffe6 100644 --- a/bot.ts +++ b/bot.ts @@ -1,8 +1,10 @@ -import { autoQuote, bold, Bot, Context, hydrateReply, ParseModeFlavor } from "./deps.ts"; +import { autoQuote, bold, Bot, Context, hydrateReply, log, ParseModeFlavor } from "./deps.ts"; import { fmt } from "./intl.ts"; import { getAllJobs, pushJob } from "./queue.ts"; import { mySession, MySessionFlavor } from "./session.ts"; +const logger = () => log.getLogger(); + export type MyContext = ParseModeFlavor & MySessionFlavor; export const bot = new Bot(Deno.env.get("TG_BOT_TOKEN") ?? ""); bot.use(autoQuote); @@ -39,7 +41,8 @@ bot.use(async (ctx, next) => { bot.api.setMyShortDescription("I can generate furry images from text"); bot.api.setMyDescription( - "I can generate furry images from text. Send /txt2img to generate an image.", + "I can generate furry images from text. " + + "Send /txt2img to generate an image.", ); bot.api.setMyCommands([ { command: "txt2img", description: "Generate an image" }, @@ -72,18 +75,16 @@ bot.command("txt2img", async (ctx) => { if (!ctx.match) { return ctx.reply("Please describe what you want to see after the command"); } - pushJob({ + const statusMessage = await ctx.reply("Accepted. You are now in queue."); + await pushJob({ params: { prompt: ctx.match }, user: ctx.from, chat: ctx.chat, requestMessage: ctx.message, + statusMessage, status: { type: "idle" }, }); - console.log( - `Enqueued job ${jobs.length + 1} for ${ctx.from.first_name} in ${ctx.chat.type} chat:`, - ctx.match.replace(/\s+/g, " "), - "\n", - ); + logger().info("Job enqueued", ctx.from.first_name, ctx.chat.type, ctx.match.replace(/\s+/g, " ")); }); bot.command("queue", async (ctx) => { @@ -268,5 +269,5 @@ bot.catch((err) => { msg += ` in ${chat.title}`; if (chat.type === "supergroup" && chat.username) msg += ` (@${chat.username})`; } - console.error(msg, err.error); + logger().error("handling update failed", from?.first_name, chat?.type, err); }); diff --git a/deps.ts b/deps.ts index 749ff5c..03320d9 100644 --- a/deps.ts +++ b/deps.ts @@ -4,3 +4,4 @@ export * from "https://deno.land/x/grammy_parse_mode@1.7.1/mod.ts"; export * from "https://deno.land/x/grammy_storages@v2.3.1/denokv/src/mod.ts"; export * as types from "https://deno.land/x/grammy_types@v3.2.0/mod.ts"; export * from "https://deno.land/x/ulid@v0.3.0/mod.ts"; +export * as log from "https://deno.land/std@0.201.0/log/mod.ts"; diff --git a/main.ts b/main.ts index c3827e7..55df7d3 100644 --- a/main.ts +++ b/main.ts @@ -1,6 +1,21 @@ import "https://deno.land/std@0.201.0/dotenv/load.ts"; import { bot } from "./bot.ts"; import { processQueue, returnHangedJobs } from "./queue.ts"; +import { log } from "./deps.ts"; + +log.setup({ + handlers: { + console: new log.handlers.ConsoleHandler("INFO", { + formatter: (record) => + `[${record.levelName}] ${record.msg} ${ + record.args.map((arg) => JSON.stringify(arg)).join(" ") + } (${record.datetime.toISOString()})`, + }), + }, + loggers: { + default: { level: "INFO", handlers: ["console"] }, + }, +}); await Promise.all([ bot.start(), diff --git a/queue.ts b/queue.ts index bd514d6..04d9c61 100644 --- a/queue.ts +++ b/queue.ts @@ -1,30 +1,39 @@ -import { InputFile, InputMediaBuilder, types } from "./deps.ts"; +import { InputFile, InputMediaBuilder, log, types } from "./deps.ts"; import { bot } from "./bot.ts"; import { getGlobalSession } from "./session.ts"; import { formatOrdinal } from "./intl.ts"; -import { SdRequest, txt2img } from "./sd.ts"; +import { SdTxt2ImgRequest, SdTxt2ImgResponse, txt2img } from "./sd.ts"; import { extFromMimeType, mimeTypeFromBase64 } from "./mimeType.ts"; -import { Model, Store } from "./store.ts"; +import { Model, Schema, Store } from "./store.ts"; + +const logger = () => log.getLogger(); interface Job { - params: Partial; + params: Partial; user: types.User; chat: types.Chat.PrivateChat | types.Chat.GroupChat | types.Chat.SupergroupChat; requestMessage: types.Message & types.Message.TextMessage; statusMessage?: types.Message & types.Message.TextMessage; - status: { type: "idle" } | { type: "processing"; progress: number; updatedDate: Date }; + status: + | { type: "idle" } + | { type: "processing"; progress: number; updatedDate: Date }; } const db = await Deno.openKv("./app.db"); -const jobStore = new Store(db, "job"); +const jobStore = new Store(db, "job", { + schema: new Schema(), + indices: ["status.type", "user.id", "chat.id"], +}); + +jobStore.getBy("user.id", 123).then(() => {}); export async function pushJob(job: Job) { await jobStore.create(job); } async function takeJob(): Promise | null> { - const jobs = await jobStore.list(); + const jobs = await jobStore.getAll(); const job = jobs.find((job) => job.value.status.type === "idle"); if (!job) return null; await job.update({ status: { type: "processing", progress: 0, updatedDate: new Date() } }); @@ -32,18 +41,20 @@ async function takeJob(): Promise | null> { } export async function getAllJobs(): Promise> { - return await jobStore.list().then((jobs) => jobs.map((job) => job.value)); + return await jobStore.getAll().then((jobs) => jobs.map((job) => job.value)); } export async function processQueue() { while (true) { - const job = await takeJob(); + const job = await takeJob().catch((err) => + void logger().warning("failed getting job", err.message) + ); if (!job) { await new Promise((resolve) => setTimeout(resolve, 1000)); continue; } let place = 0; - for (const job of await jobStore.list()) { + for (const job of await jobStore.getAll().catch(() => [])) { if (job.value.status.type === "idle") place += 1; if (place === 0) continue; const statusMessageText = `You are ${formatOrdinal(place)} in queue.`; @@ -51,7 +62,7 @@ export async function processQueue() { await bot.api.sendMessage(job.value.chat.id, statusMessageText, { reply_to_message_id: job.value.requestMessage.message_id, }).catch(() => undefined) - .then((message) => job.update({ statusMessage: message })); + .then((message) => job.update({ statusMessage: message })).catch(() => undefined); } else { await bot.api.editMessageText( job.value.chat.id, @@ -96,9 +107,10 @@ export async function processQueue() { } }, ); - console.log( - `Finished job for ${job.value.user.first_name} in ${job.value.chat.type} chat`, - ); + const jobCount = (await jobStore.getAll()).filter((job) => + job.value.status.type !== "processing" + ).length; + logger().info("Job finished", job.value.user.first_name, job.value.chat.type, { jobCount }); if (job.value.statusMessage) { await bot.api.editMessageText( job.value.chat.id, @@ -126,9 +138,7 @@ export async function processQueue() { }); await job.delete(); } catch (err) { - console.error( - `Failed to generate an image for ${job.value.user.first_name} in ${job.value.chat.type} chat: ${err}`, - ); + logger().error("Job failed", job.value.user.first_name, job.value.chat.type, err); const errorMessage = await bot.api .sendMessage(job.value.chat.id, err.toString(), { reply_to_message_id: job.value.requestMessage.message_id, @@ -138,12 +148,16 @@ export async function processQueue() { if (job.value.statusMessage) { await bot.api .deleteMessage(job.value.chat.id, job.value.statusMessage.message_id) - .catch(() => undefined) - .then(() => job.update({ statusMessage: undefined })); + .then(() => job.update({ statusMessage: undefined })) + .catch(() => void logger().warning("failed deleting status message", err.message)); } - job.update({ status: { type: "idle" } }); + await job.update({ status: { type: "idle" } }).catch((err) => + void logger().warning("failed returning job", err.message) + ); } else { - await job.delete(); + await job.delete().catch((err) => + void logger().warning("failed deleting job", err.message) + ); } } } @@ -152,15 +166,15 @@ export async function processQueue() { export async function returnHangedJobs() { while (true) { await new Promise((resolve) => setTimeout(resolve, 5000)); - const jobs = await jobStore.list(); + const jobs = await jobStore.getAll().catch(() => []); for (const job of jobs) { - if (job.value.status.type === "idle") continue; + if (job.value.status.type !== "processing") continue; // if job wasn't updated for 1 minute, return it to the queue if (job.value.status.updatedDate.getTime() < Date.now() - 60 * 1000) { - console.log( - `Returning hanged job for ${job.value.user.first_name} in ${job.value.chat.type} chat`, + logger().warning("Hanged job returned", job.value.user.first_name, job.value.chat.type); + await job.update({ status: { type: "idle" } }).catch((err) => + void logger().warning("failed returning job", err.message) ); - await job.update({ status: { type: "idle" } }); } } } diff --git a/sd.ts b/sd.ts index 01cf0f5..ff1ce30 100644 --- a/sd.ts +++ b/sd.ts @@ -1,9 +1,9 @@ export async function txt2img( apiUrl: string, - params: Partial, + params: Partial, onProgress?: (progress: SdProgressResponse) => void, signal?: AbortSignal, -): Promise { +): Promise { let response: Response | undefined; let error: unknown; @@ -26,7 +26,7 @@ export async function txt2img( } if (response != null) { if (response.ok) { - const result = (await response.json()) as SdResponse; + const result = (await response.json()) as SdTxt2ImgResponse; return result; } else { throw new Error(`Request failed: ${response.status} ${response.statusText}`); @@ -44,7 +44,7 @@ export async function txt2img( } } -export interface SdRequest { +export interface SdTxt2ImgRequest { denoising_strength: number; prompt: string; seed: number; @@ -60,9 +60,9 @@ export interface SdRequest { save_images: boolean; } -export interface SdResponse { +export interface SdTxt2ImgResponse { images: string[]; - parameters: SdRequest; + parameters: SdTxt2ImgRequest; /** Contains serialized JSON */ info: string; } diff --git a/session.ts b/session.ts index 2223d8f..f34ef2a 100644 --- a/session.ts +++ b/session.ts @@ -1,5 +1,5 @@ import { Context, DenoKVAdapter, session, SessionFlavor } from "./deps.ts"; -import { SdRequest } from "./sd.ts"; +import { SdTxt2ImgRequest } from "./sd.ts"; export type MySessionFlavor = SessionFlavor; @@ -15,7 +15,7 @@ export interface GlobalData { sdApiUrl: string; maxUserJobs: number; maxJobs: number; - defaultParams?: Partial; + defaultParams?: Partial; } export interface ChatData { diff --git a/store.test.ts b/store.test.ts new file mode 100644 index 0000000..64a09f6 --- /dev/null +++ b/store.test.ts @@ -0,0 +1,96 @@ +import { assert } from "https://deno.land/std@0.198.0/assert/assert.ts"; +import { Schema, Store } from "./store.ts"; +import { log } from "./deps.ts"; + +const db = await Deno.openKv(); + +log.setup({ + handlers: { + console: new log.handlers.ConsoleHandler("DEBUG", {}), + }, + loggers: { + kvStore: { level: "DEBUG", handlers: ["console"] }, + }, +}); + +interface PointSchema { + x: number; + y: number; +} + +interface JobSchema { + name: string; + params: { + a: number; + b: number | null; + }; + status: { type: "idle" } | { type: "processing"; progress: number } | { type: "done" }; + lastUpdateDate: Date; +} + +const pointStore = new Store(db, "points", { + schema: new Schema(), + indices: ["x", "y"], +}); +const jobStore = new Store(db, "jobs", { + schema: new Schema(), + indices: ["name", "status.type"], +}); + +Deno.test("create and delete", async () => { + await pointStore.deleteAll(); + const point1 = await pointStore.create({ x: 1, y: 2 }); + const point2 = await pointStore.create({ x: 3, y: 4 }); + assert((await pointStore.getAll()).length === 2); + const point3 = await pointStore.create({ x: 5, y: 6 }); + assert((await pointStore.getAll()).length === 3); + assert((await pointStore.get(point2.id))?.value.y === 4); + await point1.delete(); + assert((await pointStore.getAll()).length === 2); + await point2.delete(); + await point3.delete(); + assert((await pointStore.getAll()).length === 0); +}); + +Deno.test("list by index", async () => { + await jobStore.deleteAll(); + + const test = await jobStore.create({ + name: "test", + params: { a: 1, b: null }, + status: { type: "idle" }, + lastUpdateDate: new Date(), + }); + assert((await jobStore.getBy("name", "test"))[0].value.params.a === 1); + assert((await jobStore.getBy("status.type", "idle"))[0].value.params.a === 1); + + await test.update({ status: { type: "processing", progress: 33 } }); + assert((await jobStore.getBy("status.type", "processing"))[0].value.params.a === 1); + + await test.update({ status: { type: "done" } }); + assert((await jobStore.getBy("status.type", "done"))[0].value.params.a === 1); + assert((await jobStore.getBy("status.type", "processing")).length === 0); + + await test.delete(); + assert((await jobStore.getBy("status.type", "done")).length === 0); + assert((await jobStore.getBy("name", "test")).length === 0); +}); + +Deno.test("fail on concurrent update", async () => { + await jobStore.deleteAll(); + + const test = await jobStore.create({ + name: "test", + params: { a: 1, b: null }, + status: { type: "idle" }, + lastUpdateDate: new Date(), + }); + + const result = await Promise.all([ + test.update({ status: { type: "processing", progress: 33 } }), + test.update({ status: { type: "done" } }), + ]).catch(() => true); + assert(result === true); + + await test.delete(); +}); diff --git a/store.ts b/store.ts index 2ac7765..6faf668 100644 --- a/store.ts +++ b/store.ts @@ -1,76 +1,241 @@ -import { ulid } from "./deps.ts"; +import { log, ulid } from "./deps.ts"; -export class Store { - constructor( - private readonly db: Deno.Kv, - private readonly storeKey: Deno.KvKeyPart, - ) { +const logger = () => log.getLogger("kvStore"); + +export type validIndexKey = { + [K in keyof T]: K extends string ? (T[K] extends Deno.KvKeyPart ? K + : T[K] extends readonly unknown[] ? never + : T[K] extends object ? `${K}.${validIndexKey}` + : never) + : never; +}[keyof T]; + +export type indexValue> = I extends `${infer K}.${infer Rest}` + ? K extends keyof T ? Rest extends validIndexKey ? indexValue + : never + : never + : I extends keyof T ? T[I] + : never; + +export class Schema {} + +interface StoreOptions { + readonly schema: Schema; + readonly indices: readonly I[]; +} + +export class Store> { + readonly #db: Deno.Kv; + readonly #key: Deno.KvKeyPart; + readonly #indices: readonly I[]; + + constructor(db: Deno.Kv, key: Deno.KvKeyPart, options: StoreOptions) { + this.#db = db; + this.#key = key; + this.#indices = options.indices; } async create(value: T): Promise> { const id = ulid(); - await this.db.set([this.storeKey, id], value); - return new Model(this.db, this.storeKey, id, value); + await this.#db.set([this.#key, "id", id], value); + logger().debug(["created", this.#key, "id", id].join(" ")); + for (const index of this.#indices) { + const indexValue: Deno.KvKeyPart = index + .split(".") + .reduce((value, key) => value[key], value as any); + await this.#db.set([this.#key, index, indexValue, id], value); + logger().debug(["created", this.#key, index, indexValue, id].join(" ")); + } + return new Model(this.#db, this.#key, this.#indices, id, value); } async get(id: Deno.KvKeyPart): Promise | null> { - const entry = await this.db.get([this.storeKey, id]); + const entry = await this.#db.get([this.#key, "id", id]); if (entry.versionstamp == null) return null; - return new Model(this.db, this.storeKey, id, entry.value); + return new Model(this.#db, this.#key, this.#indices, id, entry.value); } - async list(): Promise>> { - const models: Array> = []; - for await (const entry of this.db.list({ prefix: [this.storeKey] })) { - models.push(new Model(this.db, this.storeKey, entry.key[1], entry.value)); + async getBy( + index: J, + value: indexValue, + options?: Deno.KvListOptions, + ): Promise>> { + const models: Model[] = []; + for await ( + const entry of this.#db.list( + { prefix: [this.#key, index, value as Deno.KvKeyPart] }, + options, + ) + ) { + models.push(new Model(this.#db, this.#key, this.#indices, entry.key[3], entry.value)); } return models; } + + async getAll( + opts?: { limit?: number; reverse?: boolean }, + ): Promise>> { + const { limit, reverse } = opts ?? {}; + const models: Array> = []; + for await ( + const entry of this.#db.list({ + prefix: [this.#key, "id"], + }, { limit, reverse }) + ) { + models.push(new Model(this.#db, this.#key, this.#indices, entry.key[2], entry.value)); + } + return models; + } + + async deleteAll(): Promise { + for await (const entry of this.#db.list({ prefix: [this.#key] })) { + await this.#db.delete(entry.key); + logger().debug(["deleted", ...entry.key].join(" ")); + } + } } -export class Model { - #value: T; +export class Model { + readonly #db: Deno.Kv; + readonly #key: Deno.KvKeyPart; + readonly #indices: readonly string[]; + readonly #id: Deno.KvKeyPart; + value: T; constructor( - private readonly db: Deno.Kv, - private readonly storeKey: Deno.KvKeyPart, - private readonly entryKey: Deno.KvKeyPart, + db: Deno.Kv, + key: Deno.KvKeyPart, + indices: readonly string[], + id: Deno.KvKeyPart, value: T, ) { - this.#value = value; + this.#db = db; + this.#key = key; + this.#indices = indices; + this.#id = id; + this.value = value; } - get value(): T { - return this.#value; + get id(): Deno.KvKeyPart { + return this.#id; } - async get(): Promise { - const entry = await this.db.get([this.storeKey, this.entryKey]); - if (entry.versionstamp == null) return null; - this.#value = entry.value; - return entry.value; - } + async update(updater: Partial | ((value: T) => T)): Promise { + // get current main entry + const oldEntry = await this.#db.get([this.#key, "id", this.#id]); - async set(value: T): Promise { - await this.db.set([this.storeKey, this.entryKey], value); - this.#value = value; - return value; - } - - async update(value: Partial | ((value: T) => T)): Promise { - const entry = await this.db.get([this.storeKey, this.entryKey]); - if (entry.versionstamp == null) return null; - if (typeof value === "function") { - entry.value = value(entry.value); - } else { - entry.value = { ...entry.value, ...value }; + // get all current index entries + const oldIndexEntries: Record> = {}; + for (const index of this.#indices) { + const indexKey: Deno.KvKeyPart = index + .split(".") + .reduce((value, key) => value[key], oldEntry.value as any); + oldIndexEntries[index] = await this.#db.get([this.#key, index, indexKey, this.#id]); } - await this.db.set([this.storeKey, this.entryKey], entry.value); - this.#value = entry.value; - return entry.value; + + // compute new value + if (typeof updater === "function") { + this.value = updater(this.value); + } else { + this.value = { ...this.value, ...updater }; + } + + // begin transaction + const transaction = this.#db.atomic(); + + // set the main entry + transaction + .check(oldEntry) + .set([this.#key, "id", this.#id], this.value); + logger().debug(["updated", this.#key, "id", this.#id].join(" ")); + + // delete and create all changed index entries + for (const index of this.#indices) { + const oldIndexKey: Deno.KvKeyPart = index + .split(".") + .reduce((value, key) => value[key], oldIndexEntries[index].value as any); + const newIndexKey: Deno.KvKeyPart = index + .split(".") + .reduce((value, key) => value[key], this.value as any); + if (newIndexKey !== oldIndexKey) { + transaction + .check(oldIndexEntries[index]) + .delete([this.#key, index, oldIndexKey, this.#id]) + .set([this.#key, index, newIndexKey, this.#id], this.value); + logger().debug(["deleted", this.#key, index, oldIndexKey, this.#id].join(" ")); + logger().debug(["created", this.#key, index, newIndexKey, this.#id].join(" ")); + } + } + + // commit + const result = await transaction.commit(); + if (!result.ok) throw new Error(`Failed to update ${this.#key} ${this.#id}`); + return this.value; } async delete(): Promise { - await this.db.delete([this.storeKey, this.entryKey]); + // get current main entry + const entry = await this.#db.get([this.#key, "id", this.#id]); + + // begin transaction + const transaction = this.#db.atomic(); + + // delete main entry + transaction + .check(entry) + .delete([this.#key, "id", this.#id]); + logger().debug(["deleted", this.#key, "id", this.#id].join(" ")); + + // delete all index entries + for (const index of this.#indices) { + const indexKey: Deno.KvKeyPart = index + .split(".") + .reduce((value, key) => value[key], entry.value as any); + transaction + .delete([this.#key, index, indexKey, this.#id]); + logger().debug(["deleted", this.#key, index, indexKey, this.#id].join(" ")); + } + + // commit + const result = await transaction.commit(); + if (!result.ok) throw new Error(`Failed to delete ${this.#key} ${this.#id}`); } } + +export async function retry( + fn: () => Promise, + options: { maxAttempts?: number; delayMs?: number } = {}, +): Promise { + const { maxAttempts = 3, delayMs = 1000 } = options; + let error: unknown; + for (let attempt = 0; attempt < maxAttempts; attempt++) { + try { + return await fn(); + } catch (err) { + error = err; + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + throw error; +} + +export async function collectIterator( + iterator: AsyncIterableIterator, + options: { maxItems?: number; timeoutMs?: number } = {}, +): Promise { + const { maxItems = 1000, timeoutMs = 2000 } = options; + const result: T[] = []; + const timeout = setTimeout(() => iterator.return?.(), timeoutMs); + try { + for await (const item of iterator) { + result.push(item); + if (result.length >= maxItems) { + iterator.return?.(); + break; + } + } + } finally { + clearTimeout(timeout); + } + return result; +}