forked from pinks/eris
1
0
Fork 0

implement indexes in store to keep whole history

This commit is contained in:
pinks 2023-09-07 22:43:40 +02:00
parent 1c55ae70af
commit ba2afe40ce
8 changed files with 380 additions and 88 deletions

19
bot.ts
View File

@ -1,8 +1,10 @@
import { autoQuote, bold, Bot, Context, hydrateReply, ParseModeFlavor } from "./deps.ts";
import { autoQuote, bold, Bot, Context, hydrateReply, log, ParseModeFlavor } from "./deps.ts";
import { fmt } from "./intl.ts";
import { getAllJobs, pushJob } from "./queue.ts";
import { mySession, MySessionFlavor } from "./session.ts";
const logger = () => log.getLogger();
export type MyContext = ParseModeFlavor<Context> & MySessionFlavor;
export const bot = new Bot<MyContext>(Deno.env.get("TG_BOT_TOKEN") ?? "");
bot.use(autoQuote);
@ -39,7 +41,8 @@ bot.use(async (ctx, next) => {
bot.api.setMyShortDescription("I can generate furry images from text");
bot.api.setMyDescription(
"I can generate furry images from text. Send /txt2img to generate an image.",
"I can generate furry images from text. " +
"Send /txt2img to generate an image.",
);
bot.api.setMyCommands([
{ command: "txt2img", description: "Generate an image" },
@ -72,18 +75,16 @@ bot.command("txt2img", async (ctx) => {
if (!ctx.match) {
return ctx.reply("Please describe what you want to see after the command");
}
pushJob({
const statusMessage = await ctx.reply("Accepted. You are now in queue.");
await pushJob({
params: { prompt: ctx.match },
user: ctx.from,
chat: ctx.chat,
requestMessage: ctx.message,
statusMessage,
status: { type: "idle" },
});
console.log(
`Enqueued job ${jobs.length + 1} for ${ctx.from.first_name} in ${ctx.chat.type} chat:`,
ctx.match.replace(/\s+/g, " "),
"\n",
);
logger().info("Job enqueued", ctx.from.first_name, ctx.chat.type, ctx.match.replace(/\s+/g, " "));
});
bot.command("queue", async (ctx) => {
@ -268,5 +269,5 @@ bot.catch((err) => {
msg += ` in ${chat.title}`;
if (chat.type === "supergroup" && chat.username) msg += ` (@${chat.username})`;
}
console.error(msg, err.error);
logger().error("handling update failed", from?.first_name, chat?.type, err);
});

View File

@ -4,3 +4,4 @@ export * from "https://deno.land/x/grammy_parse_mode@1.7.1/mod.ts";
export * from "https://deno.land/x/grammy_storages@v2.3.1/denokv/src/mod.ts";
export * as types from "https://deno.land/x/grammy_types@v3.2.0/mod.ts";
export * from "https://deno.land/x/ulid@v0.3.0/mod.ts";
export * as log from "https://deno.land/std@0.201.0/log/mod.ts";

15
main.ts
View File

@ -1,6 +1,21 @@
import "https://deno.land/std@0.201.0/dotenv/load.ts";
import { bot } from "./bot.ts";
import { processQueue, returnHangedJobs } from "./queue.ts";
import { log } from "./deps.ts";
log.setup({
handlers: {
console: new log.handlers.ConsoleHandler("INFO", {
formatter: (record) =>
`[${record.levelName}] ${record.msg} ${
record.args.map((arg) => JSON.stringify(arg)).join(" ")
} (${record.datetime.toISOString()})`,
}),
},
loggers: {
default: { level: "INFO", handlers: ["console"] },
},
});
await Promise.all([
bot.start(),

View File

@ -1,30 +1,39 @@
import { InputFile, InputMediaBuilder, types } from "./deps.ts";
import { InputFile, InputMediaBuilder, log, types } from "./deps.ts";
import { bot } from "./bot.ts";
import { getGlobalSession } from "./session.ts";
import { formatOrdinal } from "./intl.ts";
import { SdRequest, txt2img } from "./sd.ts";
import { SdTxt2ImgRequest, SdTxt2ImgResponse, txt2img } from "./sd.ts";
import { extFromMimeType, mimeTypeFromBase64 } from "./mimeType.ts";
import { Model, Store } from "./store.ts";
import { Model, Schema, Store } from "./store.ts";
const logger = () => log.getLogger();
interface Job {
params: Partial<SdRequest>;
params: Partial<SdTxt2ImgRequest>;
user: types.User;
chat: types.Chat.PrivateChat | types.Chat.GroupChat | types.Chat.SupergroupChat;
requestMessage: types.Message & types.Message.TextMessage;
statusMessage?: types.Message & types.Message.TextMessage;
status: { type: "idle" } | { type: "processing"; progress: number; updatedDate: Date };
status:
| { type: "idle" }
| { type: "processing"; progress: number; updatedDate: Date };
}
const db = await Deno.openKv("./app.db");
const jobStore = new Store<Job>(db, "job");
const jobStore = new Store(db, "job", {
schema: new Schema<Job>(),
indices: ["status.type", "user.id", "chat.id"],
});
jobStore.getBy("user.id", 123).then(() => {});
export async function pushJob(job: Job) {
await jobStore.create(job);
}
async function takeJob(): Promise<Model<Job> | null> {
const jobs = await jobStore.list();
const jobs = await jobStore.getAll();
const job = jobs.find((job) => job.value.status.type === "idle");
if (!job) return null;
await job.update({ status: { type: "processing", progress: 0, updatedDate: new Date() } });
@ -32,18 +41,20 @@ async function takeJob(): Promise<Model<Job> | null> {
}
export async function getAllJobs(): Promise<Array<Job>> {
return await jobStore.list().then((jobs) => jobs.map((job) => job.value));
return await jobStore.getAll().then((jobs) => jobs.map((job) => job.value));
}
export async function processQueue() {
while (true) {
const job = await takeJob();
const job = await takeJob().catch((err) =>
void logger().warning("failed getting job", err.message)
);
if (!job) {
await new Promise((resolve) => setTimeout(resolve, 1000));
continue;
}
let place = 0;
for (const job of await jobStore.list()) {
for (const job of await jobStore.getAll().catch(() => [])) {
if (job.value.status.type === "idle") place += 1;
if (place === 0) continue;
const statusMessageText = `You are ${formatOrdinal(place)} in queue.`;
@ -51,7 +62,7 @@ export async function processQueue() {
await bot.api.sendMessage(job.value.chat.id, statusMessageText, {
reply_to_message_id: job.value.requestMessage.message_id,
}).catch(() => undefined)
.then((message) => job.update({ statusMessage: message }));
.then((message) => job.update({ statusMessage: message })).catch(() => undefined);
} else {
await bot.api.editMessageText(
job.value.chat.id,
@ -96,9 +107,10 @@ export async function processQueue() {
}
},
);
console.log(
`Finished job for ${job.value.user.first_name} in ${job.value.chat.type} chat`,
);
const jobCount = (await jobStore.getAll()).filter((job) =>
job.value.status.type !== "processing"
).length;
logger().info("Job finished", job.value.user.first_name, job.value.chat.type, { jobCount });
if (job.value.statusMessage) {
await bot.api.editMessageText(
job.value.chat.id,
@ -126,9 +138,7 @@ export async function processQueue() {
});
await job.delete();
} catch (err) {
console.error(
`Failed to generate an image for ${job.value.user.first_name} in ${job.value.chat.type} chat: ${err}`,
);
logger().error("Job failed", job.value.user.first_name, job.value.chat.type, err);
const errorMessage = await bot.api
.sendMessage(job.value.chat.id, err.toString(), {
reply_to_message_id: job.value.requestMessage.message_id,
@ -138,12 +148,16 @@ export async function processQueue() {
if (job.value.statusMessage) {
await bot.api
.deleteMessage(job.value.chat.id, job.value.statusMessage.message_id)
.catch(() => undefined)
.then(() => job.update({ statusMessage: undefined }));
.then(() => job.update({ statusMessage: undefined }))
.catch(() => void logger().warning("failed deleting status message", err.message));
}
job.update({ status: { type: "idle" } });
await job.update({ status: { type: "idle" } }).catch((err) =>
void logger().warning("failed returning job", err.message)
);
} else {
await job.delete();
await job.delete().catch((err) =>
void logger().warning("failed deleting job", err.message)
);
}
}
}
@ -152,15 +166,15 @@ export async function processQueue() {
export async function returnHangedJobs() {
while (true) {
await new Promise((resolve) => setTimeout(resolve, 5000));
const jobs = await jobStore.list();
const jobs = await jobStore.getAll().catch(() => []);
for (const job of jobs) {
if (job.value.status.type === "idle") continue;
if (job.value.status.type !== "processing") continue;
// if job wasn't updated for 1 minute, return it to the queue
if (job.value.status.updatedDate.getTime() < Date.now() - 60 * 1000) {
console.log(
`Returning hanged job for ${job.value.user.first_name} in ${job.value.chat.type} chat`,
logger().warning("Hanged job returned", job.value.user.first_name, job.value.chat.type);
await job.update({ status: { type: "idle" } }).catch((err) =>
void logger().warning("failed returning job", err.message)
);
await job.update({ status: { type: "idle" } });
}
}
}

12
sd.ts
View File

@ -1,9 +1,9 @@
export async function txt2img(
apiUrl: string,
params: Partial<SdRequest>,
params: Partial<SdTxt2ImgRequest>,
onProgress?: (progress: SdProgressResponse) => void,
signal?: AbortSignal,
): Promise<SdResponse> {
): Promise<SdTxt2ImgResponse> {
let response: Response | undefined;
let error: unknown;
@ -26,7 +26,7 @@ export async function txt2img(
}
if (response != null) {
if (response.ok) {
const result = (await response.json()) as SdResponse;
const result = (await response.json()) as SdTxt2ImgResponse;
return result;
} else {
throw new Error(`Request failed: ${response.status} ${response.statusText}`);
@ -44,7 +44,7 @@ export async function txt2img(
}
}
export interface SdRequest {
export interface SdTxt2ImgRequest {
denoising_strength: number;
prompt: string;
seed: number;
@ -60,9 +60,9 @@ export interface SdRequest {
save_images: boolean;
}
export interface SdResponse {
export interface SdTxt2ImgResponse {
images: string[];
parameters: SdRequest;
parameters: SdTxt2ImgRequest;
/** Contains serialized JSON */
info: string;
}

View File

@ -1,5 +1,5 @@
import { Context, DenoKVAdapter, session, SessionFlavor } from "./deps.ts";
import { SdRequest } from "./sd.ts";
import { SdTxt2ImgRequest } from "./sd.ts";
export type MySessionFlavor = SessionFlavor<SessionData>;
@ -15,7 +15,7 @@ export interface GlobalData {
sdApiUrl: string;
maxUserJobs: number;
maxJobs: number;
defaultParams?: Partial<SdRequest>;
defaultParams?: Partial<SdTxt2ImgRequest>;
}
export interface ChatData {

96
store.test.ts Normal file
View File

@ -0,0 +1,96 @@
import { assert } from "https://deno.land/std@0.198.0/assert/assert.ts";
import { Schema, Store } from "./store.ts";
import { log } from "./deps.ts";
const db = await Deno.openKv();
log.setup({
handlers: {
console: new log.handlers.ConsoleHandler("DEBUG", {}),
},
loggers: {
kvStore: { level: "DEBUG", handlers: ["console"] },
},
});
interface PointSchema {
x: number;
y: number;
}
interface JobSchema {
name: string;
params: {
a: number;
b: number | null;
};
status: { type: "idle" } | { type: "processing"; progress: number } | { type: "done" };
lastUpdateDate: Date;
}
const pointStore = new Store(db, "points", {
schema: new Schema<PointSchema>(),
indices: ["x", "y"],
});
const jobStore = new Store(db, "jobs", {
schema: new Schema<JobSchema>(),
indices: ["name", "status.type"],
});
Deno.test("create and delete", async () => {
await pointStore.deleteAll();
const point1 = await pointStore.create({ x: 1, y: 2 });
const point2 = await pointStore.create({ x: 3, y: 4 });
assert((await pointStore.getAll()).length === 2);
const point3 = await pointStore.create({ x: 5, y: 6 });
assert((await pointStore.getAll()).length === 3);
assert((await pointStore.get(point2.id))?.value.y === 4);
await point1.delete();
assert((await pointStore.getAll()).length === 2);
await point2.delete();
await point3.delete();
assert((await pointStore.getAll()).length === 0);
});
Deno.test("list by index", async () => {
await jobStore.deleteAll();
const test = await jobStore.create({
name: "test",
params: { a: 1, b: null },
status: { type: "idle" },
lastUpdateDate: new Date(),
});
assert((await jobStore.getBy("name", "test"))[0].value.params.a === 1);
assert((await jobStore.getBy("status.type", "idle"))[0].value.params.a === 1);
await test.update({ status: { type: "processing", progress: 33 } });
assert((await jobStore.getBy("status.type", "processing"))[0].value.params.a === 1);
await test.update({ status: { type: "done" } });
assert((await jobStore.getBy("status.type", "done"))[0].value.params.a === 1);
assert((await jobStore.getBy("status.type", "processing")).length === 0);
await test.delete();
assert((await jobStore.getBy("status.type", "done")).length === 0);
assert((await jobStore.getBy("name", "test")).length === 0);
});
Deno.test("fail on concurrent update", async () => {
await jobStore.deleteAll();
const test = await jobStore.create({
name: "test",
params: { a: 1, b: null },
status: { type: "idle" },
lastUpdateDate: new Date(),
});
const result = await Promise.all([
test.update({ status: { type: "processing", progress: 33 } }),
test.update({ status: { type: "done" } }),
]).catch(() => true);
assert(result === true);
await test.delete();
});

251
store.ts
View File

@ -1,76 +1,241 @@
import { ulid } from "./deps.ts";
import { log, ulid } from "./deps.ts";
export class Store<T extends object> {
constructor(
private readonly db: Deno.Kv,
private readonly storeKey: Deno.KvKeyPart,
) {
const logger = () => log.getLogger("kvStore");
export type validIndexKey<T> = {
[K in keyof T]: K extends string ? (T[K] extends Deno.KvKeyPart ? K
: T[K] extends readonly unknown[] ? never
: T[K] extends object ? `${K}.${validIndexKey<T[K]>}`
: never)
: never;
}[keyof T];
export type indexValue<T, I extends validIndexKey<T>> = I extends `${infer K}.${infer Rest}`
? K extends keyof T ? Rest extends validIndexKey<T[K]> ? indexValue<T[K], Rest>
: never
: never
: I extends keyof T ? T[I]
: never;
export class Schema<T> {}
interface StoreOptions<T, I> {
readonly schema: Schema<T>;
readonly indices: readonly I[];
}
export class Store<T, I extends validIndexKey<T>> {
readonly #db: Deno.Kv;
readonly #key: Deno.KvKeyPart;
readonly #indices: readonly I[];
constructor(db: Deno.Kv, key: Deno.KvKeyPart, options: StoreOptions<T, I>) {
this.#db = db;
this.#key = key;
this.#indices = options.indices;
}
async create(value: T): Promise<Model<T>> {
const id = ulid();
await this.db.set([this.storeKey, id], value);
return new Model(this.db, this.storeKey, id, value);
await this.#db.set([this.#key, "id", id], value);
logger().debug(["created", this.#key, "id", id].join(" "));
for (const index of this.#indices) {
const indexValue: Deno.KvKeyPart = index
.split(".")
.reduce((value, key) => value[key], value as any);
await this.#db.set([this.#key, index, indexValue, id], value);
logger().debug(["created", this.#key, index, indexValue, id].join(" "));
}
return new Model(this.#db, this.#key, this.#indices, id, value);
}
async get(id: Deno.KvKeyPart): Promise<Model<T> | null> {
const entry = await this.db.get<T>([this.storeKey, id]);
const entry = await this.#db.get<T>([this.#key, "id", id]);
if (entry.versionstamp == null) return null;
return new Model(this.db, this.storeKey, id, entry.value);
return new Model(this.#db, this.#key, this.#indices, id, entry.value);
}
async list(): Promise<Array<Model<T>>> {
const models: Array<Model<T>> = [];
for await (const entry of this.db.list<T>({ prefix: [this.storeKey] })) {
models.push(new Model(this.db, this.storeKey, entry.key[1], entry.value));
async getBy<J extends I>(
index: J,
value: indexValue<T, J>,
options?: Deno.KvListOptions,
): Promise<Array<Model<T>>> {
const models: Model<T>[] = [];
for await (
const entry of this.#db.list<T>(
{ prefix: [this.#key, index, value as Deno.KvKeyPart] },
options,
)
) {
models.push(new Model(this.#db, this.#key, this.#indices, entry.key[3], entry.value));
}
return models;
}
async getAll(
opts?: { limit?: number; reverse?: boolean },
): Promise<Array<Model<T>>> {
const { limit, reverse } = opts ?? {};
const models: Array<Model<T>> = [];
for await (
const entry of this.#db.list<T>({
prefix: [this.#key, "id"],
}, { limit, reverse })
) {
models.push(new Model(this.#db, this.#key, this.#indices, entry.key[2], entry.value));
}
return models;
}
async deleteAll(): Promise<void> {
for await (const entry of this.#db.list({ prefix: [this.#key] })) {
await this.#db.delete(entry.key);
logger().debug(["deleted", ...entry.key].join(" "));
}
}
}
export class Model<T extends object> {
#value: T;
export class Model<T> {
readonly #db: Deno.Kv;
readonly #key: Deno.KvKeyPart;
readonly #indices: readonly string[];
readonly #id: Deno.KvKeyPart;
value: T;
constructor(
private readonly db: Deno.Kv,
private readonly storeKey: Deno.KvKeyPart,
private readonly entryKey: Deno.KvKeyPart,
db: Deno.Kv,
key: Deno.KvKeyPart,
indices: readonly string[],
id: Deno.KvKeyPart,
value: T,
) {
this.#value = value;
this.#db = db;
this.#key = key;
this.#indices = indices;
this.#id = id;
this.value = value;
}
get value(): T {
return this.#value;
get id(): Deno.KvKeyPart {
return this.#id;
}
async get(): Promise<T | null> {
const entry = await this.db.get<T>([this.storeKey, this.entryKey]);
if (entry.versionstamp == null) return null;
this.#value = entry.value;
return entry.value;
async update(updater: Partial<T> | ((value: T) => T)): Promise<T | null> {
// get current main entry
const oldEntry = await this.#db.get<T>([this.#key, "id", this.#id]);
// get all current index entries
const oldIndexEntries: Record<string, Deno.KvEntryMaybe<T>> = {};
for (const index of this.#indices) {
const indexKey: Deno.KvKeyPart = index
.split(".")
.reduce((value, key) => value[key], oldEntry.value as any);
oldIndexEntries[index] = await this.#db.get<T>([this.#key, index, indexKey, this.#id]);
}
async set(value: T): Promise<T> {
await this.db.set([this.storeKey, this.entryKey], value);
this.#value = value;
return value;
}
async update(value: Partial<T> | ((value: T) => T)): Promise<T | null> {
const entry = await this.db.get<T>([this.storeKey, this.entryKey]);
if (entry.versionstamp == null) return null;
if (typeof value === "function") {
entry.value = value(entry.value);
// compute new value
if (typeof updater === "function") {
this.value = updater(this.value);
} else {
entry.value = { ...entry.value, ...value };
this.value = { ...this.value, ...updater };
}
await this.db.set([this.storeKey, this.entryKey], entry.value);
this.#value = entry.value;
return entry.value;
// begin transaction
const transaction = this.#db.atomic();
// set the main entry
transaction
.check(oldEntry)
.set([this.#key, "id", this.#id], this.value);
logger().debug(["updated", this.#key, "id", this.#id].join(" "));
// delete and create all changed index entries
for (const index of this.#indices) {
const oldIndexKey: Deno.KvKeyPart = index
.split(".")
.reduce((value, key) => value[key], oldIndexEntries[index].value as any);
const newIndexKey: Deno.KvKeyPart = index
.split(".")
.reduce((value, key) => value[key], this.value as any);
if (newIndexKey !== oldIndexKey) {
transaction
.check(oldIndexEntries[index])
.delete([this.#key, index, oldIndexKey, this.#id])
.set([this.#key, index, newIndexKey, this.#id], this.value);
logger().debug(["deleted", this.#key, index, oldIndexKey, this.#id].join(" "));
logger().debug(["created", this.#key, index, newIndexKey, this.#id].join(" "));
}
}
// commit
const result = await transaction.commit();
if (!result.ok) throw new Error(`Failed to update ${this.#key} ${this.#id}`);
return this.value;
}
async delete(): Promise<void> {
await this.db.delete([this.storeKey, this.entryKey]);
// get current main entry
const entry = await this.#db.get<T>([this.#key, "id", this.#id]);
// begin transaction
const transaction = this.#db.atomic();
// delete main entry
transaction
.check(entry)
.delete([this.#key, "id", this.#id]);
logger().debug(["deleted", this.#key, "id", this.#id].join(" "));
// delete all index entries
for (const index of this.#indices) {
const indexKey: Deno.KvKeyPart = index
.split(".")
.reduce((value, key) => value[key], entry.value as any);
transaction
.delete([this.#key, index, indexKey, this.#id]);
logger().debug(["deleted", this.#key, index, indexKey, this.#id].join(" "));
}
// commit
const result = await transaction.commit();
if (!result.ok) throw new Error(`Failed to delete ${this.#key} ${this.#id}`);
}
}
export async function retry<T>(
fn: () => Promise<T>,
options: { maxAttempts?: number; delayMs?: number } = {},
): Promise<T> {
const { maxAttempts = 3, delayMs = 1000 } = options;
let error: unknown;
for (let attempt = 0; attempt < maxAttempts; attempt++) {
try {
return await fn();
} catch (err) {
error = err;
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
throw error;
}
export async function collectIterator<T>(
iterator: AsyncIterableIterator<T>,
options: { maxItems?: number; timeoutMs?: number } = {},
): Promise<T[]> {
const { maxItems = 1000, timeoutMs = 2000 } = options;
const result: T[] = [];
const timeout = setTimeout(() => iterator.return?.(), timeoutMs);
try {
for await (const item of iterator) {
result.push(item);
if (result.length >= maxItems) {
iterator.return?.();
break;
}
}
} finally {
clearTimeout(timeout);
}
return result;
}