diff --git a/README.md b/README.md index e5253cb..a650efa 100644 --- a/README.md +++ b/README.md @@ -13,10 +13,7 @@ You can put these in `.env` file or pass them as environment variables. - `TG_BOT_TOKEN` - Telegram bot token. Get yours from [@BotFather](https://t.me/BotFather). Required. -- `SD_API_URL` - URL to Stable Diffusion API. Only used on first run. Default: - `http://127.0.0.1:7860/` - `TG_ADMIN_USERNAMES` - Comma separated list of usernames of users that can use admin commands. - Only used on first run. Optional. ## Running diff --git a/api/paramsRoute.ts b/api/paramsRoute.ts index 69d2561..77e3284 100644 --- a/api/paramsRoute.ts +++ b/api/paramsRoute.ts @@ -30,18 +30,15 @@ export const paramsRoute = createMethodFilter({ return { status: 401, body: { type: "text/plain", data: "Must be logged in" } }; } const chat = await bot.api.getChat(session.userId); - if (chat.type !== "private") { - throw new Error("Chat is not private"); - } - const userName = chat.username; - if (!userName) { + if (chat.type !== "private") throw new Error("Chat is not private"); + if (!chat.username) { return { status: 403, body: { type: "text/plain", data: "Must have a username" } }; } const config = await getConfig(); - if (!config?.adminUsernames?.includes(userName)) { + if (!config?.adminUsernames?.includes(chat.username)) { return { status: 403, body: { type: "text/plain", data: "Must be an admin" } }; } - logger().info(`User ${userName} updated default params: ${JSON.stringify(body.data)}`); + logger().info(`User ${chat.username} updated default params: ${JSON.stringify(body.data)}`); const defaultParams = deepMerge(config.defaultParams ?? {}, body.data); await setConfig({ defaultParams }); return { status: 200, body: { type: "application/json", data: config.defaultParams } }; diff --git a/api/statsRoute.ts b/api/statsRoute.ts index c2bfbf9..83c622e 100644 --- a/api/statsRoute.ts +++ b/api/statsRoute.ts @@ -1,25 +1,54 @@ -// deno-lint-ignore-file require-await import { createEndpoint, createMethodFilter, createPathFilter } from "t_rest/server"; -import { liveGlobalStats } from "../app/globalStatsStore.ts"; +import { globalStats } from "../app/globalStats.ts"; import { getDailyStats } from "../app/dailyStatsStore.ts"; import { getUserStats } from "../app/userStatsStore.ts"; import { getUserDailyStats } from "../app/userDailyStatsStore.ts"; +import { generationStore } from "../app/generationStore.ts"; +import { subMinutes } from "date-fns"; + +const STATS_INTERVAL_MIN = 3; export const statsRoute = createPathFilter({ "": createMethodFilter({ GET: createEndpoint( { query: null, body: null }, async () => { - const stats = liveGlobalStats; + const after = subMinutes(new Date(), STATS_INTERVAL_MIN); + const generations = await generationStore.getAll({ after }); + + const imagesPerMinute = generations.length / STATS_INTERVAL_MIN; + + const stepsPerMinute = generations + .map((generation) => generation.value.info?.steps ?? 0) + .reduce((sum, steps) => sum + steps, 0) / STATS_INTERVAL_MIN; + + const pixelsPerMinute = generations + .map((generation) => + (generation.value.info?.width ?? 0) * (generation.value.info?.height ?? 0) + ) + .reduce((sum, pixels) => sum + pixels, 0) / STATS_INTERVAL_MIN; + + const pixelStepsPerMinute = generations + .map((generation) => + (generation.value.info?.width ?? 0) * (generation.value.info?.height ?? 0) * + (generation.value.info?.steps ?? 0) + ) + .reduce((sum, pixelSteps) => sum + pixelSteps, 0) / STATS_INTERVAL_MIN; + return { status: 200, body: { type: "application/json", data: { - imageCount: stats.imageCount, - pixelCount: stats.pixelCount, - userCount: stats.userIds.length, - timestamp: stats.timestamp, + imageCount: globalStats.imageCount, + stepCount: globalStats.stepCount, + pixelCount: globalStats.pixelCount, + pixelStepCount: globalStats.pixelStepCount, + userCount: globalStats.userIds.length, + imagesPerMinute, + stepsPerMinute, + pixelsPerMinute, + pixelStepsPerMinute, }, }, }; diff --git a/api/usersRoute.ts b/api/usersRoute.ts index a17db99..b3d0041 100644 --- a/api/usersRoute.ts +++ b/api/usersRoute.ts @@ -3,26 +3,6 @@ import { getConfig } from "../app/config.ts"; import { bot } from "../bot/mod.ts"; export const usersRoute = createPathFilter({ - "{userId}": createMethodFilter({ - GET: createEndpoint( - { query: null, body: null }, - async ({ params }) => { - const chat = await bot.api.getChat(params.userId); - if (chat.type !== "private") { - throw new Error("Chat is not private"); - } - const config = await getConfig(); - const isAdmin = chat.username && config?.adminUsernames?.includes(chat.username); - return { - status: 200, - body: { - type: "application/json", - data: { ...chat, isAdmin }, - }, - }; - }, - ), - }), "{userId}/photo": createMethodFilter({ GET: createEndpoint( { query: null, body: null }, @@ -51,4 +31,25 @@ export const usersRoute = createPathFilter({ }, ), }), + + "{userId}": createMethodFilter({ + GET: createEndpoint( + { query: null, body: null }, + async ({ params }) => { + const chat = await bot.api.getChat(params.userId); + if (chat.type !== "private") { + throw new Error("Chat is not private"); + } + const config = await getConfig(); + const isAdmin = chat.username && config?.adminUsernames?.includes(chat.username); + return { + status: 200, + body: { + type: "application/json", + data: { ...chat, isAdmin }, + }, + }; + }, + ), + }), }); diff --git a/api/workersRoute.ts b/api/workersRoute.ts index a1227cc..f6e4106 100644 --- a/api/workersRoute.ts +++ b/api/workersRoute.ts @@ -3,96 +3,314 @@ import { activeGenerationWorkers } from "../app/generationQueue.ts"; import { getConfig } from "../app/config.ts"; import * as SdApi from "../app/sdApi.ts"; import createOpenApiFetch from "openapi_fetch"; +import { sessions } from "./sessionsRoute.ts"; +import { bot } from "../bot/mod.ts"; +import { getLogger } from "std/log/mod.ts"; +import { WorkerInstance, workerInstanceStore } from "../app/workerInstanceStore.ts"; +import { getAuthHeader } from "../utils/getAuthHeader.ts"; +import { Model } from "indexed_kv"; +import { generationStore } from "../app/generationStore.ts"; +import { subMinutes } from "date-fns"; + +const logger = () => getLogger(); + +export type WorkerData = Omit & { + id: string; + isActive: boolean; + imagesPerMinute: number; + stepsPerMinute: number; + pixelsPerMinute: number; + pixelStepsPerMinute: number; +}; + +const STATS_INTERVAL_MIN = 10; + +async function getWorkerData(workerInstance: Model): Promise { + const after = subMinutes(new Date(), STATS_INTERVAL_MIN); + + const generations = await generationStore.getBy("workerInstanceKey", { + value: workerInstance.value.key, + after: after, + }); + + const imagesPerMinute = generations.length / STATS_INTERVAL_MIN; + + const stepsPerMinute = generations + .map((generation) => generation.value.info?.steps ?? 0) + .reduce((sum, steps) => sum + steps, 0) / STATS_INTERVAL_MIN; + + const pixelsPerMinute = generations + .map((generation) => (generation.value.info?.width ?? 0) * (generation.value.info?.height ?? 0)) + .reduce((sum, pixels) => sum + pixels, 0) / STATS_INTERVAL_MIN; + + const pixelStepsPerMinute = generations + .map((generation) => + (generation.value.info?.width ?? 0) * (generation.value.info?.height ?? 0) * + (generation.value.info?.steps ?? 0) + ) + .reduce((sum, pixelSteps) => sum + pixelSteps, 0) / STATS_INTERVAL_MIN; + + return { + id: workerInstance.id, + key: workerInstance.value.key, + name: workerInstance.value.name, + lastError: workerInstance.value.lastError, + lastOnlineTime: workerInstance.value.lastOnlineTime, + isActive: activeGenerationWorkers.get(workerInstance.id)?.isProcessing ?? false, + imagesPerMinute, + stepsPerMinute, + pixelsPerMinute, + pixelStepsPerMinute, + }; +} export const workersRoute = createPathFilter({ "": createMethodFilter({ - "GET": createEndpoint( + GET: createEndpoint( { query: null, body: null }, async () => { - const activeWorkers = activeGenerationWorkers; - const { sdInstances } = await getConfig(); - - const workers = Object.entries(sdInstances).map(([sdInstanceId, sdInstance]) => ({ - id: sdInstanceId, - name: sdInstance.name ?? sdInstanceId, - maxResolution: sdInstance.maxResolution, - active: activeWorkers.has(sdInstanceId), - lastOnline: null, - imagesPerMinute: null, - pixelsPerSecond: null, - pixelStepsPerSecond: null, - })); + const workerInstances = await workerInstanceStore.getAll(); + const workers = await Promise.all(workerInstances.map(getWorkerData)); return { status: 200, - body: { type: "application/json", data: workers }, + body: { type: "application/json", data: workers satisfies WorkerData[] }, + }; + }, + ), + POST: createEndpoint( + { + query: { + sessionId: { type: "string" }, + }, + body: { + type: "application/json", + schema: { + type: "object", + properties: { + key: { type: "string" }, + name: { type: ["string", "null"] }, + sdUrl: { type: "string" }, + sdAuth: { + type: ["object", "null"], + properties: { + user: { type: "string" }, + password: { type: "string" }, + }, + required: ["user", "password"], + }, + }, + required: ["key", "name", "sdUrl", "sdAuth"], + }, + }, + }, + async ({ query, body }) => { + const session = sessions.get(query.sessionId); + if (!session?.userId) { + return { status: 401, body: { type: "text/plain", data: "Must be logged in" } }; + } + const chat = await bot.api.getChat(session.userId); + if (chat.type !== "private") throw new Error("Chat is not private"); + if (!chat.username) { + return { status: 403, body: { type: "text/plain", data: "Must have a username" } }; + } + const config = await getConfig(); + if (!config?.adminUsernames?.includes(chat.username)) { + return { status: 403, body: { type: "text/plain", data: "Must be an admin" } }; + } + const workerInstance = await workerInstanceStore.create({ + key: body.data.key, + name: body.data.name, + sdUrl: body.data.sdUrl, + sdAuth: body.data.sdAuth, + }); + logger().info(`User ${chat.username} created worker ${workerInstance.id}`); + const worker = await getWorkerData(workerInstance); + return { + status: 200, + body: { type: "application/json", data: worker satisfies WorkerData }, }; }, ), }), - "{workerId}/loras": createMethodFilter({ - GET: createEndpoint( - { query: null, body: null }, - async ({ params }) => { - const { sdInstances } = await getConfig(); - const sdInstance = sdInstances[params.workerId]; - if (!sdInstance) { - return { status: 404, body: { type: "text/plain", data: `Worker not found` } }; - } - const sdClient = createOpenApiFetch({ baseUrl: sdInstance.api.url }); - const lorasResponse = await sdClient.GET("/sdapi/v1/loras", { - headers: sdInstance.api.auth ? { Authorization: sdInstance.api.auth } : undefined, - }); - if (lorasResponse.error) { + "{workerId}": createPathFilter({ + "": createMethodFilter({ + GET: createEndpoint( + { query: null, body: null }, + async ({ params }) => { + const workerInstance = await workerInstanceStore.getById(params.workerId); + if (!workerInstance) { + return { status: 404, body: { type: "text/plain", data: `Worker not found` } }; + } + const worker: WorkerData = await getWorkerData(workerInstance); return { - status: 500, - body: { type: "text/plain", data: `Loras request failed: ${lorasResponse["error"]}` }, + status: 200, + body: { type: "application/json", data: worker satisfies WorkerData }, }; - } - const loras = (lorasResponse.data as Lora[]).map((lora) => ({ - name: lora.name, - alias: lora.alias ?? null, - })); - return { - status: 200, - body: { type: "application/json", data: loras }, - }; - }, - ), - }), + }, + ), + PATCH: createEndpoint( + { + query: { + sessionId: { type: "string" }, + }, + body: { + type: "application/json", + schema: { + type: "object", + properties: { + key: { type: "string" }, + name: { type: ["string", "null"] }, + sdUrl: { type: "string" }, + auth: { + type: ["object", "null"], + properties: { + user: { type: "string" }, + password: { type: "string" }, + }, + required: ["user", "password"], + }, + }, + }, + }, + }, + async ({ params, query, body }) => { + const workerInstance = await workerInstanceStore.getById(params.workerId); + if (!workerInstance) { + return { status: 404, body: { type: "text/plain", data: `Worker not found` } }; + } + const session = sessions.get(query.sessionId); + if (!session?.userId) { + return { status: 401, body: { type: "text/plain", data: "Must be logged in" } }; + } + const chat = await bot.api.getChat(session.userId); + if (chat.type !== "private") throw new Error("Chat is not private"); + if (!chat.username) { + return { status: 403, body: { type: "text/plain", data: "Must have a username" } }; + } + const config = await getConfig(); + if (!config?.adminUsernames?.includes(chat.username)) { + return { status: 403, body: { type: "text/plain", data: "Must be an admin" } }; + } + if (body.data.name !== undefined) { + workerInstance.value.name = body.data.name; + } + if (body.data.sdUrl !== undefined) { + workerInstance.value.sdUrl = body.data.sdUrl; + } + if (body.data.auth !== undefined) { + workerInstance.value.sdAuth = body.data.auth; + } + logger().info( + `User ${chat.username} updated worker ${params.workerId}: ${JSON.stringify(body.data)}`, + ); + await workerInstance.update(); + const worker = await getWorkerData(workerInstance); + return { + status: 200, + body: { type: "application/json", data: worker satisfies WorkerData }, + }; + }, + ), + DELETE: createEndpoint( + { + query: { + sessionId: { type: "string" }, + }, + body: null, + }, + async ({ params, query }) => { + const workerInstance = await workerInstanceStore.getById(params.workerId); + if (!workerInstance) { + return { status: 404, body: { type: "text/plain", data: `Worker not found` } }; + } + const session = sessions.get(query.sessionId); + if (!session?.userId) { + return { status: 401, body: { type: "text/plain", data: "Must be logged in" } }; + } + const chat = await bot.api.getChat(session.userId); + if (chat.type !== "private") throw new Error("Chat is not private"); + if (!chat.username) { + return { status: 403, body: { type: "text/plain", data: "Must have a username" } }; + } + const config = await getConfig(); + if (!config?.adminUsernames?.includes(chat.username)) { + return { status: 403, body: { type: "text/plain", data: "Must be an admin" } }; + } + logger().info(`User ${chat.username} deleted worker ${params.workerId}`); + await workerInstance.delete(); + return { status: 200, body: { type: "application/json", data: null } }; + }, + ), + }), - "{workerId}/models": createMethodFilter({ - GET: createEndpoint( - { query: null, body: null }, - async ({ params }) => { - const { sdInstances } = await getConfig(); - const sdInstance = sdInstances[params.workerId]; - if (!sdInstance) { - return { status: 404, body: { type: "text/plain", data: `Worker not found` } }; - } - const sdClient = createOpenApiFetch({ baseUrl: sdInstance.api.url }); - const modelsResponse = await sdClient.GET("/sdapi/v1/sd-models", { - headers: sdInstance.api.auth ? { Authorization: sdInstance.api.auth } : undefined, - }); - if (modelsResponse.error) { + "loras": createMethodFilter({ + GET: createEndpoint( + { query: null, body: null }, + async ({ params }) => { + const workerInstance = await workerInstanceStore.getById(params.workerId); + if (!workerInstance) { + return { status: 404, body: { type: "text/plain", data: `Worker not found` } }; + } + const sdClient = createOpenApiFetch({ + baseUrl: workerInstance.value.sdUrl, + headers: getAuthHeader(workerInstance.value.sdAuth), + }); + const lorasResponse = await sdClient.GET("/sdapi/v1/loras", {}); + if (lorasResponse.error) { + return { + status: 500, + body: { type: "text/plain", data: `Loras request failed: ${lorasResponse["error"]}` }, + }; + } + const loras = (lorasResponse.data as Lora[]).map((lora) => ({ + name: lora.name, + alias: lora.alias ?? null, + })); return { - status: 500, - body: { type: "text/plain", data: `Models request failed: ${modelsResponse["error"]}` }, + status: 200, + body: { type: "application/json", data: loras }, }; - } - const models = modelsResponse.data.map((model) => ({ - title: model.title, - modelName: model.model_name, - hash: model.hash, - sha256: model.sha256, - })); - return { - status: 200, - body: { type: "application/json", data: models }, - }; - }, - ), + }, + ), + }), + + "models": createMethodFilter({ + GET: createEndpoint( + { query: null, body: null }, + async ({ params }) => { + const workerInstance = await workerInstanceStore.getById(params.workerId); + if (!workerInstance) { + return { status: 404, body: { type: "text/plain", data: `Worker not found` } }; + } + const sdClient = createOpenApiFetch({ + baseUrl: workerInstance.value.sdUrl, + headers: getAuthHeader(workerInstance.value.sdAuth), + }); + const modelsResponse = await sdClient.GET("/sdapi/v1/sd-models", {}); + if (modelsResponse.error) { + return { + status: 500, + body: { + type: "text/plain", + data: `Models request failed: ${modelsResponse["error"]}`, + }, + }; + } + const models = modelsResponse.data.map((model) => ({ + title: model.title, + modelName: model.model_name, + hash: model.hash, + sha256: model.sha256, + })); + return { + status: 200, + body: { type: "application/json", data: models }, + }; + }, + ), + }), }), }); diff --git a/app/config.ts b/app/config.ts index fdf9a15..0b67c7d 100644 --- a/app/config.ts +++ b/app/config.ts @@ -21,27 +21,8 @@ export const configSchema = { negative_prompt: { type: "string" }, }, }, - sdInstances: { - type: "object", - additionalProperties: { - type: "object", - properties: { - name: { type: "string" }, - api: { - type: "object", - properties: { - url: { type: "string" }, - auth: { type: "string" }, - }, - required: ["url"], - }, - maxResolution: { type: "number" }, - }, - required: ["api", "maxResolution"], - }, - }, }, - required: ["adminUsernames", "maxUserJobs", "maxJobs", "defaultParams", "sdInstances"], + required: ["adminUsernames", "maxUserJobs", "maxJobs", "defaultParams"], } as const satisfies JsonSchema; export type Config = jsonType; @@ -55,13 +36,6 @@ export async function getConfig(): Promise { maxUserJobs: config?.maxUserJobs ?? Infinity, maxJobs: config?.maxJobs ?? Infinity, defaultParams: config?.defaultParams ?? {}, - sdInstances: config?.sdInstances ?? - { - "local": { - api: { url: Deno.env.get("SD_API_URL") ?? "http://127.0.0.1:7860/" }, - maxResolution: 1024 * 1024, - }, - }, }; } @@ -73,7 +47,6 @@ export async function setConfig(newConfig: Partial): Promise { maxUserJobs: newConfig.maxUserJobs ?? oldConfig.maxUserJobs, maxJobs: newConfig.maxJobs ?? oldConfig.maxJobs, defaultParams: newConfig.defaultParams ?? oldConfig.defaultParams, - sdInstances: newConfig.sdInstances ?? oldConfig.sdInstances, }; await db.set(["config"], config); } diff --git a/app/dailyStatsStore.ts b/app/dailyStatsStore.ts index 8dc4353..dca9313 100644 --- a/app/dailyStatsStore.ts +++ b/app/dailyStatsStore.ts @@ -13,10 +13,12 @@ export const dailyStatsSchema = { properties: { userIds: { type: "array", items: { type: "number" } }, imageCount: { type: "number" }, + stepCount: { type: "number" }, pixelCount: { type: "number" }, + pixelStepCount: { type: "number" }, timestamp: { type: "number" }, }, - required: ["userIds", "imageCount", "pixelCount", "timestamp"], + required: ["userIds", "imageCount", "stepCount", "pixelCount", "pixelStepCount", "timestamp"], } as const satisfies JsonSchema; export type DailyStats = jsonType; @@ -27,7 +29,9 @@ export const getDailyStats = kvMemoize( async (year: number, month: number, day: number): Promise => { const userIdSet = new Set(); let imageCount = 0; + let stepCount = 0; let pixelCount = 0; + let pixelStepCount = 0; const after = new Date(Date.UTC(year, month - 1, day)); const before = new Date(Date.UTC(year, month - 1, day + 1)); @@ -39,13 +43,19 @@ export const getDailyStats = kvMemoize( ) { userIdSet.add(generation.value.from.id); imageCount++; + stepCount += generation.value.info?.steps ?? 0; pixelCount += (generation.value.info?.width ?? 0) * (generation.value.info?.height ?? 0); + pixelStepCount += (generation.value.info?.width ?? 0) * + (generation.value.info?.height ?? 0) * + (generation.value.info?.steps ?? 0); } return { userIds: [...userIdSet], imageCount, + stepCount, pixelCount, + pixelStepCount, timestamp: Date.now(), }; }, diff --git a/app/generationQueue.ts b/app/generationQueue.ts index 32ea287..bed9ea9 100644 --- a/app/generationQueue.ts +++ b/app/generationQueue.ts @@ -10,12 +10,14 @@ import { bot } from "../bot/mod.ts"; import { PngInfo } from "../bot/parsePngInfo.ts"; import { formatOrdinal } from "../utils/formatOrdinal.ts"; import { formatUserChat } from "../utils/formatUserChat.ts"; +import { getAuthHeader } from "../utils/getAuthHeader.ts"; import { SdError } from "./SdError.ts"; import { getConfig } from "./config.ts"; import { db, fs } from "./db.ts"; import { SdGenerationInfo } from "./generationStore.ts"; import * as SdApi from "./sdApi.ts"; import { uploadQueue } from "./uploadQueue.ts"; +import { workerInstanceStore } from "./workerInstanceStore.ts"; const logger = () => getLogger(); @@ -34,7 +36,7 @@ interface GenerationJob { chat: Chat; requestMessage: Message; replyMessage: Message; - sdInstanceId?: string; + workerInstanceKey?: string; progress?: number; } @@ -47,18 +49,17 @@ export const activeGenerationWorkers = new Map>(); */ export async function processGenerationQueue() { while (true) { - const config = await getConfig(); - - for (const [sdInstanceId, sdInstance] of Object.entries(config?.sdInstances ?? {})) { - const activeWorker = activeGenerationWorkers.get(sdInstanceId); + for await (const workerInstance of workerInstanceStore.listAll()) { + const activeWorker = activeGenerationWorkers.get(workerInstance.id); if (activeWorker?.isProcessing) { continue; } const workerSdClient = createOpenApiClient({ - baseUrl: sdInstance.api.url, - headers: { "Authorization": sdInstance.api.auth }, + baseUrl: workerInstance.value.sdUrl, + headers: getAuthHeader(workerInstance.value.sdAuth), }); + // check if worker is up const activeWorkerStatus = await workerSdClient.GET("/sdapi/v1/memory", { signal: AbortSignal.timeout(10_000), @@ -70,16 +71,20 @@ export async function processGenerationQueue() { return response; }) .catch((error) => { - logger().debug(`Worker ${sdInstanceId} is down: ${error}`); + workerInstance.update({ lastError: { message: error.message, time: Date.now() } }) + .catch(() => undefined); + logger().debug(`Worker ${workerInstance.value.key} is down: ${error}`); }); + if (!activeWorkerStatus?.data) { continue; } // create worker const newWorker = generationQueue.createWorker(async ({ state }, updateJob) => { - await processGenerationJob(state, updateJob, sdInstanceId); + await processGenerationJob(state, updateJob, workerInstance.id); }); + newWorker.addEventListener("error", (e) => { logger().error( `Generation failed for ${formatUserChat(e.detail.job.state)}: ${e.detail.error}`, @@ -96,11 +101,19 @@ export async function processGenerationQueue() { }, ).catch(() => undefined); newWorker.stopProcessing(); - logger().info(`Stopped worker ${sdInstanceId}`); + workerInstance.update({ lastError: { message: e.detail.error.message, time: Date.now() } }) + .catch(() => undefined); + logger().info(`Stopped worker ${workerInstance.value.key}`); }); + + newWorker.addEventListener("complete", () => { + workerInstance.update({ lastOnlineTime: Date.now() }).catch(() => undefined); + }); + + await workerInstance.update({ lastOnlineTime: Date.now() }); newWorker.processJobs(); - activeGenerationWorkers.set(sdInstanceId, newWorker); - logger().info(`Started worker ${sdInstanceId}`); + activeGenerationWorkers.set(workerInstance.id, newWorker); + logger().info(`Started worker ${workerInstance.value.key}`); } await delay(60_000); } @@ -112,19 +125,19 @@ export async function processGenerationQueue() { async function processGenerationJob( state: GenerationJob, updateJob: (job: Partial>) => Promise, - sdInstanceId: string, + workerInstanceId: string, ) { const startDate = new Date(); const config = await getConfig(); - const sdInstance = config?.sdInstances?.[sdInstanceId]; - if (!sdInstance) { - throw new Error(`Unknown sdInstanceId: ${sdInstanceId}`); + const workerInstance = await workerInstanceStore.getById(workerInstanceId); + if (!workerInstance) { + throw new Error(`Unknown workerInstanceId: ${workerInstanceId}`); } const workerSdClient = createOpenApiClient({ - baseUrl: sdInstance.api.url, - headers: { "Authorization": sdInstance.api.auth }, + baseUrl: workerInstance.value.sdUrl, + headers: getAuthHeader(workerInstance.value.sdAuth), }); - state.sdInstanceId = sdInstanceId; + state.workerInstanceKey = workerInstance.value.key; state.progress = 0; logger().debug(`Generation started for ${formatUserChat(state)}`); await updateJob({ state: state }); @@ -142,14 +155,16 @@ async function processGenerationJob( await bot.api.editMessageText( state.replyMessage.chat.id, state.replyMessage.message_id, - `Generating your prompt now... 0% using ${sdInstance.name || sdInstanceId}`, + `Generating your prompt now... 0% using ${ + workerInstance.value.name || workerInstance.value.key + }`, { maxAttempts: 1 }, ).catch(() => undefined); // reduce size if worker can't handle the resolution const size = limitSize( { ...config.defaultParams, ...state.task.params }, - sdInstance.maxResolution, + 1024 * 1024, ); function limitSize( { width, height }: { width?: number; height?: number }, @@ -233,7 +248,7 @@ async function processGenerationJob( state.replyMessage.message_id, `Generating your prompt now... ${ (progressResponse.data.progress * 100).toFixed(0) - }% using ${sdInstance.name || sdInstanceId}`, + }% using ${workerInstance.value.name || workerInstance.value.key}`, { maxAttempts: 1 }, ).catch(() => undefined); } @@ -268,7 +283,7 @@ async function processGenerationJob( from: state.from, requestMessage: state.requestMessage, replyMessage: state.replyMessage, - sdInstanceId: sdInstanceId, + workerInstanceKey: workerInstance.value.key, startDate, endDate: new Date(), imageKeys, diff --git a/app/generationStore.ts b/app/generationStore.ts index d5d79df..a1bd5a2 100644 --- a/app/generationStore.ts +++ b/app/generationStore.ts @@ -5,7 +5,7 @@ import { db } from "./db.ts"; export interface GenerationSchema { from: User; chat: Chat; - sdInstanceId?: string; + sdInstanceId?: string; // TODO: change to workerInstanceKey info?: SdGenerationInfo; startDate?: Date; endDate?: Date; @@ -48,6 +48,7 @@ export interface SdGenerationInfo { type GenerationIndices = { fromId: number; chatId: number; + workerInstanceKey: string; }; export const generationStore = new Store( @@ -57,6 +58,7 @@ export const generationStore = new Store( indices: { fromId: { getValue: (item) => item.from.id }, chatId: { getValue: (item) => item.chat.id }, + workerInstanceKey: { getValue: (item) => item.sdInstanceId ?? "" }, }, }, ); diff --git a/app/globalStatsStore.ts b/app/globalStats.ts similarity index 76% rename from app/globalStatsStore.ts rename to app/globalStats.ts index a71136d..e03ef59 100644 --- a/app/globalStatsStore.ts +++ b/app/globalStats.ts @@ -9,17 +9,19 @@ export const globalStatsSchema = { properties: { userIds: { type: "array", items: { type: "number" } }, imageCount: { type: "number" }, + stepCount: { type: "number" }, pixelCount: { type: "number" }, + pixelStepCount: { type: "number" }, timestamp: { type: "number" }, }, - required: ["userIds", "imageCount", "pixelCount", "timestamp"], + required: ["userIds", "imageCount", "stepCount", "pixelCount", "pixelStepCount", "timestamp"], } as const satisfies JsonSchema; export type GlobalStats = jsonType; -export const liveGlobalStats: GlobalStats = await getGlobalStats(); +export const globalStats: GlobalStats = await getGlobalStats(); -export async function getGlobalStats(): Promise { +async function getGlobalStats(): Promise { // find the year/month/day of the first generation const startDate = await generationStore.getAll({}, { limit: 1 }) .then((generations) => generations[0]?.id) @@ -28,7 +30,9 @@ export async function getGlobalStats(): Promise { // iterate to today and sum up stats const userIdSet = new Set(); let imageCount = 0; + let stepCount = 0; let pixelCount = 0; + let pixelStepCount = 0; const tomorrow = addDays(new Date(), 1); @@ -44,13 +48,17 @@ export async function getGlobalStats(): Promise { ); for (const userId of dailyStats.userIds) userIdSet.add(userId); imageCount += dailyStats.imageCount; + stepCount += dailyStats.stepCount; pixelCount += dailyStats.pixelCount; + pixelStepCount += dailyStats.pixelStepCount; } return { userIds: [...userIdSet], imageCount, + stepCount, pixelCount, + pixelStepCount, timestamp: Date.now(), }; } diff --git a/app/uploadQueue.ts b/app/uploadQueue.ts index 5fd747e..1cd9647 100644 --- a/app/uploadQueue.ts +++ b/app/uploadQueue.ts @@ -9,7 +9,7 @@ import { bot } from "../bot/mod.ts"; import { formatUserChat } from "../utils/formatUserChat.ts"; import { db, fs } from "./db.ts"; import { generationStore, SdGenerationInfo } from "./generationStore.ts"; -import { liveGlobalStats } from "./globalStatsStore.ts"; +import { globalStats } from "./globalStats.ts"; const logger = () => getLogger(); @@ -18,7 +18,7 @@ interface UploadJob { chat: Chat; requestMessage: Message; replyMessage: Message; - sdInstanceId: string; + workerInstanceKey?: string; startDate: Date; endDate: Date; imageKeys: Deno.KvKey[]; @@ -56,7 +56,7 @@ export async function processUploadQueue() { fmt`${bold("CFG scale:")} ${state.info.cfg_scale}, `, fmt`${bold("Seed:")} ${state.info.seed}, `, fmt`${bold("Size")}: ${state.info.width}x${state.info.height}, `, - fmt`${bold("Worker")}: ${state.sdInstanceId}, `, + state.workerInstanceKey ? fmt`${bold("Worker")}: ${state.workerInstanceKey}, ` : "", fmt`${bold("Time taken")}: ${format(jobDurationMs, { ignoreZero: true })}`, ] : [], @@ -104,7 +104,7 @@ export async function processUploadQueue() { await generationStore.create({ from: state.from, chat: state.chat, - sdInstanceId: state.sdInstanceId, + sdInstanceId: state.workerInstanceKey, startDate: state.startDate, endDate: new Date(), info: state.info, @@ -112,11 +112,13 @@ export async function processUploadQueue() { // update live stats { - liveGlobalStats.imageCount++; - liveGlobalStats.pixelCount += state.info.width * state.info.height; - const userIdSet = new Set(liveGlobalStats.userIds); + globalStats.imageCount++; + globalStats.stepCount += state.info.steps; + globalStats.pixelCount += state.info.width * state.info.height; + globalStats.pixelStepCount += state.info.width * state.info.height * state.info.steps; + const userIdSet = new Set(globalStats.userIds); userIdSet.add(state.from.id); - liveGlobalStats.userIds = [...userIdSet]; + globalStats.userIds = [...userIdSet]; } // delete the status message diff --git a/app/userStatsStore.ts b/app/userStatsStore.ts index a72823c..e2f16f8 100644 --- a/app/userStatsStore.ts +++ b/app/userStatsStore.ts @@ -13,14 +13,24 @@ export const userStatsSchema = { properties: { userId: { type: "number" }, imageCount: { type: "number" }, + stepCount: { type: "number" }, pixelCount: { type: "number" }, + pixelStepCount: { type: "number" }, tagCountMap: { type: "object", additionalProperties: { type: "number" }, }, timestamp: { type: "number" }, }, - required: ["userId", "imageCount", "pixelCount", "tagCountMap", "timestamp"], + required: [ + "userId", + "imageCount", + "stepCount", + "pixelCount", + "pixelStepCount", + "tagCountMap", + "timestamp", + ], } as const satisfies JsonSchema; export type UserStats = jsonType; @@ -48,7 +58,9 @@ export const getUserStats = kvMemoize( ["userStats"], async (userId: number): Promise => { let imageCount = 0; + let stepCount = 0; let pixelCount = 0; + let pixelStepCount = 0; const tagCountMap: Record = {}; logger().info(`Calculating user stats for ${userId}`); @@ -57,7 +69,11 @@ export const getUserStats = kvMemoize( const generation of generationStore.listBy("fromId", { value: userId }) ) { imageCount++; + stepCount += generation.value.info?.steps ?? 0; pixelCount += (generation.value.info?.width ?? 0) * (generation.value.info?.height ?? 0); + pixelStepCount += (generation.value.info?.width ?? 0) * + (generation.value.info?.height ?? 0) * + (generation.value.info?.steps ?? 0); const tags = generation.value.info?.prompt // split on punctuation and newlines @@ -85,7 +101,9 @@ export const getUserStats = kvMemoize( return { userId, imageCount, + stepCount, pixelCount, + pixelStepCount, tagCountMap, timestamp: Date.now(), }; diff --git a/app/workerInstanceStore.ts b/app/workerInstanceStore.ts new file mode 100644 index 0000000..06325de --- /dev/null +++ b/app/workerInstanceStore.ts @@ -0,0 +1,38 @@ +import { Store } from "indexed_kv"; +import { JsonSchema, jsonType } from "t_rest/server"; +import { db } from "./db.ts"; + +export const workerInstanceSchema = { + type: "object", + properties: { + // used for counting stats + key: { type: "string" }, + // used for display + name: { type: ["string", "null"] }, + sdUrl: { type: "string" }, + sdAuth: { + type: ["object", "null"], + properties: { + user: { type: "string" }, + password: { type: "string" }, + }, + required: ["user", "password"], + }, + lastOnlineTime: { type: "number" }, + lastError: { + type: "object", + properties: { + message: { type: "string" }, + time: { type: "number" }, + }, + required: ["message", "time"], + }, + }, + required: ["key", "name", "sdUrl", "sdAuth"], +} as const satisfies JsonSchema; + +export type WorkerInstance = jsonType; + +export const workerInstanceStore = new Store(db, "workerInstances", { + indices: {}, +}); diff --git a/bot/queueCommand.ts b/bot/queueCommand.ts index ff0d0a8..2ecb415 100644 --- a/bot/queueCommand.ts +++ b/bot/queueCommand.ts @@ -1,9 +1,9 @@ import { CommandContext } from "grammy"; import { bold, fmt } from "grammy_parse_mode"; -import { getConfig } from "../app/config.ts"; import { activeGenerationWorkers, generationQueue } from "../app/generationQueue.ts"; import { getFlagEmoji } from "../utils/getFlagEmoji.ts"; import { ErisContext } from "./mod.ts"; +import { workerInstanceStore } from "../app/workerInstanceStore.ts"; export async function queueCommand(ctx: CommandContext) { let formattedMessage = await getMessageText(); @@ -14,8 +14,8 @@ export async function queueCommand(ctx: CommandContext) { handleFutureUpdates().catch(() => undefined); async function getMessageText() { - const config = await getConfig(); const allJobs = await generationQueue.getAllJobs(); + const workerInstances = await workerInstanceStore.getAll(); const processingJobs = allJobs .filter((job) => job.lockUntil > new Date()).map((job) => ({ ...job, index: 0 })); const waitingJobs = allJobs @@ -30,24 +30,17 @@ export async function queueCommand(ctx: CommandContext) { `${job.index}. `, fmt`${bold(job.state.from.first_name)} `, job.state.from.last_name ? fmt`${bold(job.state.from.last_name)} ` : "", - job.state.from.username ? `(@${job.state.from.username}) ` : "", getFlagEmoji(job.state.from.language_code) ?? "", - job.state.chat.type === "private" ? " in private chat " : ` in ${job.state.chat.title} `, - job.state.chat.type !== "private" && job.state.chat.type !== "group" && - job.state.chat.username - ? `(@${job.state.chat.username}) ` - : "", - job.index === 0 && job.state.progress && job.state.sdInstanceId - ? `(${(job.state.progress * 100).toFixed(0)}% using ${job.state.sdInstanceId}) ` + job.index === 0 && job.state.progress && job.state.workerInstanceKey + ? `(${(job.state.progress * 100).toFixed(0)}% using ${job.state.workerInstanceKey}) ` : "", "\n", ]) : ["Queue is empty.\n"], "\nActive workers:\n", - ...Object.entries(config.sdInstances).flatMap(([sdInstanceId, sdInstance]) => [ - activeGenerationWorkers.get(sdInstanceId)?.isProcessing ? "✅ " : "☠️ ", - fmt`${bold(sdInstance.name || sdInstanceId)} `, - `(max ${(sdInstance.maxResolution / 1000000).toFixed(1)} Mpx) `, + ...workerInstances.flatMap((workerInstace) => [ + activeGenerationWorkers.get(workerInstace.id)?.isProcessing ? "✅ " : "☠️ ", + fmt`${bold(workerInstace.value.name || workerInstace.value.key)} `, "\n", ]), ]); diff --git a/deno.json b/deno.json index 24b7ec7..92f715d 100644 --- a/deno.json +++ b/deno.json @@ -42,6 +42,7 @@ "react": "https://esm.sh/react@18.2.0?dev", "react-dom/client": "https://esm.sh/react-dom@18.2.0/client?dev", "react-router-dom": "https://esm.sh/react-router-dom@6.16.0?dev", + "react-intl": "https://esm.sh/react-intl@6.4.7?external=react&alias=@types/react:react&dev", "swr": "https://esm.sh/swr@2.2.4?dev", "swr/mutation": "https://esm.sh/swr@2.2.4/mutation?dev", "use-local-storage": "https://esm.sh/use-local-storage@3.0.0?dev", diff --git a/ui/App.tsx b/ui/App.tsx index e221fd0..a1d1466 100644 --- a/ui/App.tsx +++ b/ui/App.tsx @@ -4,8 +4,9 @@ import useLocalStorage from "use-local-storage"; import { AppHeader } from "./AppHeader.tsx"; import { QueuePage } from "./QueuePage.tsx"; import { SettingsPage } from "./SettingsPage.tsx"; +import { StatsPage } from "./StatsPage.tsx"; +import { WorkersPage } from "./WorkersPage.tsx"; import { fetchApi, handleResponse } from "./apiClient.tsx"; -import { HomePage } from "./HomePage.tsx"; export function App() { // store session ID in the local storage @@ -30,7 +31,8 @@ export function App() { />
- } /> + } /> + } /> } /> } /> diff --git a/ui/AppHeader.tsx b/ui/AppHeader.tsx index a45171b..486de7b 100644 --- a/ui/AppHeader.tsx +++ b/ui/AppHeader.tsx @@ -57,6 +57,9 @@ export function AppHeader( Stats + + Workers + Queue diff --git a/ui/Counter.tsx b/ui/Counter.tsx index a3ce098..ea6757b 100644 --- a/ui/Counter.tsx +++ b/ui/Counter.tsx @@ -2,18 +2,18 @@ import { cx } from "@twind/core"; import React from "react"; function CounterDigit(props: { value: number; transitionDurationMs?: number }) { - const { value, transitionDurationMs = 1000 } = props; + const { value, transitionDurationMs = 1500 } = props; const rads = -(Math.floor(value) % 1_000_000) * 2 * Math.PI * 0.1; return ( - + {Array.from({ length: 10 }).map((_, i) => ( @@ -24,31 +24,35 @@ function CounterDigit(props: { value: number; transitionDurationMs?: number }) { ); } +const Spacer = () => +; + +const CounterText = (props: { children: React.ReactNode }) => ( + + {props.children} + +); + export function Counter(props: { value: number; digits: number; + fractionDigits?: number; transitionDurationMs?: number; className?: string; + postfix?: string; }) { - const { value, digits, transitionDurationMs, className } = props; + const { value, digits, fractionDigits = 0, transitionDurationMs, className, postfix } = props; return ( {Array.from({ length: digits }) .flatMap((_, i) => [ - i > 0 && i % 3 === 0 - ? ( - - ) - : null, + i > 0 && i % 3 === 0 ? : null, , ]) .reverse()} + + {fractionDigits > 0 && ( + <> + + . + + {Array.from({ length: fractionDigits }) + .flatMap((_, i) => [ + i > 0 && i % 3 === 0 ? : null, + , + ])} + + )} + + {postfix && ( + <> + + {postfix} + + )} ); } diff --git a/ui/HomePage.tsx b/ui/HomePage.tsx deleted file mode 100644 index 8597fe9..0000000 --- a/ui/HomePage.tsx +++ /dev/null @@ -1,48 +0,0 @@ -import React from "react"; -import { fetchApi, handleResponse } from "./apiClient.tsx"; -import useSWR from "swr"; -import { eachDayOfInterval, endOfMonth, startOfMonth, subMonths } from "date-fns"; -import { UTCDateMini } from "@date-fns/utc"; -import { Counter } from "./Counter.tsx"; - -export function HomePage() { - const globalStats = useSWR( - ["stats", "GET", {}] as const, - (args) => fetchApi(...args).then(handleResponse), - { refreshInterval: 2_000 }, - ); - - return ( -
-

- Pixels painted - -

-
-

- Images generated - -

-

- Unique users - -

-
-
- ); -} diff --git a/ui/QueuePage.tsx b/ui/QueuePage.tsx index c2fe3fc..1cb1cf8 100644 --- a/ui/QueuePage.tsx +++ b/ui/QueuePage.tsx @@ -41,7 +41,7 @@ export function QueuePage() { }
- {job.state.sdInstanceId} + {job.state.workerInstanceKey} ))} diff --git a/ui/StatsPage.tsx b/ui/StatsPage.tsx new file mode 100644 index 0000000..838b8f4 --- /dev/null +++ b/ui/StatsPage.tsx @@ -0,0 +1,94 @@ +import React from "react"; +import { fetchApi, handleResponse } from "./apiClient.tsx"; +import useSWR from "swr"; +import { Counter } from "./Counter.tsx"; + +export function StatsPage() { + const globalStats = useSWR( + ["stats", "GET", {}] as const, + (args) => fetchApi(...args).then(handleResponse), + { refreshInterval: 2_000 }, + ); + + return ( +
+

+ Pixelsteps diffused + + +

+

+ Pixels painted + + +

+
+

+ Steps processed + + +

+

+ Images generated + + +

+
+

+ Unique users + +

+
+ ); +} diff --git a/ui/WorkersPage.tsx b/ui/WorkersPage.tsx new file mode 100644 index 0000000..3390720 --- /dev/null +++ b/ui/WorkersPage.tsx @@ -0,0 +1,330 @@ +import React, { useRef } from "react"; +import { FormattedRelativeTime } from "react-intl"; +import useSWR, { useSWRConfig } from "swr"; +import { WorkerData } from "../api/workersRoute.ts"; +import { Counter } from "./Counter.tsx"; +import { fetchApi, handleResponse } from "./apiClient.tsx"; + +export function WorkersPage(props: { sessionId?: string }) { + const { sessionId } = props; + + const createWorkerModalRef = useRef(null); + + const session = useSWR( + sessionId ? ["sessions/{sessionId}", "GET", { params: { sessionId } }] as const : null, + (args) => fetchApi(...args).then(handleResponse), + ); + const user = useSWR( + session.data?.userId + ? ["users/{userId}", "GET", { params: { userId: String(session.data.userId) } }] as const + : null, + (args) => fetchApi(...args).then(handleResponse), + ); + + const workers = useSWR( + ["workers", "GET", {}] as const, + (args) => fetchApi(...args).then(handleResponse), + { refreshInterval: 5000 }, + ); + + return ( + <> +
    + {workers.data?.map((worker) => ( + + ))} +
+ {user.data?.isAdmin && ( + + )} + +
{ + e.preventDefault(); + const data = new FormData(e.target as HTMLFormElement); + const key = data.get("key") as string; + const name = data.get("name") as string; + const sdUrl = data.get("url") as string; + const user = data.get("user") as string; + const password = data.get("password") as string; + console.log(key, name, user, password); + workers.mutate(async () => { + const worker = await fetchApi("workers", "POST", { + query: { sessionId: sessionId! }, + body: { + type: "application/json", + data: { + key, + name: name || null, + sdUrl, + sdAuth: user && password ? { user, password } : null, + }, + }, + }).then(handleResponse); + return [...(workers.data ?? []), worker]; + }); + + createWorkerModalRef.current?.close(); + }} + > +
+ + +
+ +
+ + +
+ +
+ + +
+
+
+ + ); +} + +function WorkerListItem(props: { worker: WorkerData; sessionId?: string }) { + const { worker, sessionId } = props; + const editWorkerModalRef = useRef(null); + const deleteWorkerModalRef = useRef(null); + + const session = useSWR( + sessionId ? ["sessions/{sessionId}", "GET", { params: { sessionId } }] as const : null, + (args) => fetchApi(...args).then(handleResponse), + ); + const user = useSWR( + session.data?.userId + ? ["users/{userId}", "GET", { params: { userId: String(session.data.userId) } }] as const + : null, + (args) => fetchApi(...args).then(handleResponse), + ); + + const { mutate } = useSWRConfig(); + + return ( +
  • +

    + {worker.name ?? worker.key} +

    + {worker.isActive ?

    ✅ Active

    : ( + <> +

    + Last seen {worker.lastOnlineTime + ? ( + + ) + : "never"} +

    + {worker.lastError && ( +

    + {worker.lastError.message} ( + ) +

    + )} + + )} +

    + images per minute, + {" "} + steps per minute +

    +

    + {user.data?.isAdmin && ( + <> + + + + )} +

    + +
    { + e.preventDefault(); + const data = new FormData(e.target as HTMLFormElement); + const user = data.get("user") as string; + const password = data.get("password") as string; + console.log(user, password); + fetchApi("workers/{workerId}", "PATCH", { + params: { workerId: worker.id }, + query: { sessionId: sessionId! }, + body: { + type: "application/json", + data: { + auth: user && password ? { user, password } : null, + }, + }, + }); + editWorkerModalRef.current?.close(); + }} + > +
    + + +
    +
    + + +
    +
    +
    + +
    { + e.preventDefault(); + fetchApi("workers/{workerId}", "DELETE", { + params: { workerId: worker.id }, + query: { sessionId: sessionId! }, + }).then(handleResponse).then(() => mutate(["workers", "GET", {}])); + deleteWorkerModalRef.current?.close(); + }} + > +

    + Are you sure you want to delete this worker? +

    +
    + + +
    +
    +
    +
  • + ); +} diff --git a/ui/main.tsx b/ui/main.tsx index 68044a6..83dc01f 100644 --- a/ui/main.tsx +++ b/ui/main.tsx @@ -4,9 +4,12 @@ import React from "react"; import { App } from "./App.tsx"; import "./twind.ts"; import { BrowserRouter } from "react-router-dom"; +import { IntlProvider } from "react-intl"; createRoot(document.body).render( - + + + , ); diff --git a/utils/formatUserChat.ts b/utils/formatUserChat.ts index 41d8219..b8106a6 100644 --- a/utils/formatUserChat.ts +++ b/utils/formatUserChat.ts @@ -1,7 +1,7 @@ import { Chat, User } from "grammy_types"; export function formatUserChat( - ctx: { from?: User; chat?: Chat; sdInstanceId?: string }, + ctx: { from?: User; chat?: Chat; workerInstanceKey?: string }, ) { const msg: string[] = []; if (ctx.from) { @@ -26,8 +26,8 @@ export function formatUserChat( } } } - if (ctx.sdInstanceId) { - msg.push(`using ${ctx.sdInstanceId}`); + if (ctx.workerInstanceKey) { + msg.push(`using ${ctx.workerInstanceKey}`); } return msg.join(" "); } diff --git a/utils/getAuthHeader.ts b/utils/getAuthHeader.ts new file mode 100644 index 0000000..b84eb96 --- /dev/null +++ b/utils/getAuthHeader.ts @@ -0,0 +1,4 @@ +export function getAuthHeader(auth: { user: string; password: string } | null) { + if (!auth) return {}; + return { "Authorization": `Basic ${btoa(`${auth.user}:${auth.password}`)}` }; +}