from telethon import events from telethon.utils import get_display_name from telethon.events import StopPropagation, CallbackQuery from telethon.tl.custom import Button from telethon.errors import MessageNotModifiedError import logging import httpx import json from random import randint from sqlite3 import IntegrityError from config import * from parameters import * from hashlib import md5 log = logging.getLogger('prompt') def get_prompt(user_id, delete=True): prompt = conn.execute('SELECT payload FROM pending_prompt WHERE user_id = ?', (user_id,)).fetchone() if prompt: prompt = json.loads(prompt[0]) else: return None if prompt['prompt']: return prompt if delete: conn.execute('DELETE FROM pending_prompt WHERE user_id = ?', (user_id,)) return None else: return prompt @events.register(events.callbackquery.CallbackQuery(pattern=r'^new_prompt')) @events.register(events.NewMessage(pattern='^/new', incoming=True, func=lambda e: e.is_private)) async def new_prompt(ev): prompt = get_prompt(ev.input_sender.user_id) if prompt: await ev.respond("You already have a pending prompt!", buttons=[[Button.inline(f"Delete", f"delete_prompt"), Button.inline(f"Edit", "edit_prompt")]]) raise StopPropagation req_msg = await ev.respond("A new prompt! What would you like to generate?\nSend a message with a phrase or a list of tags you would like to generate. For ideas, check @ai621gen or ask in our chat @ai621chat.") conn.execute('INSERT INTO pending_prompt(user_id, payload) VALUES (?,?)', (ev.input_sender.user_id, json.dumps(fields_template))) conn.execute('UPDATE user SET pending = \'prompt\', pending_msg = ? WHERE id = ?', (req_msg.id, ev.input_sender.user_id)) print('Has been created') raise StopPropagation @events.register(events.NewMessage(pattern='^/delete', incoming=True, func=lambda e: e.is_private)) @events.register(events.callbackquery.CallbackQuery(pattern=r'^delete_prompt')) async def delete_prompt(ev): conn.execute('DELETE FROM pending_prompt WHERE user_id = ?', (ev.input_sender.user_id,)) await ev.respond('Your pending prompt, if any, has been deleted.', buttons=[[Button.inline(f"New prompt", f"new_prompt")]]) raise StopPropagation @events.register(events.callbackquery.CallbackQuery(pattern=r'^msg_but:')) @events.register(events.NewMessage(incoming=True, pattern='^([^\/](\n|.)*)?$', func=lambda e: e.is_private)) async def accept_data(ev): pending = conn.execute('SELECT pending FROM user WHERE id = ? AND pending IS NOT NULL', (ev.input_sender.user_id,)).fetchone() if not pending: await ev.respond("I don't understand. Maybe try to /start again?") raise StopPropagation else: pending = pending[0] print(pending) prompt = get_prompt(ev.input_sender.user_id, delete=False) if not prompt: await ev.respond('You have no pending prompt to edit. Try to /start again?') raise StopPropagation if hasattr(ev, 'message'): data = ev.message.raw_text.strip() else: data = ev.data.decode().split(':', 1)[1] await ev.respond(f'✅ You have set {pending} to {data}') if pending == 'prompt': prompt['prompt'] = data conn.execute('UPDATE pending_prompt SET payload = ? WHERE user_id = ?', (json.dumps(prompt), ev.input_sender.user_id)) conn.execute('UPDATE user SET pending = NULL WHERE id = ?', (ev.input_sender.user_id,)) await edit_prompt(ev) raise StopPropagation @events.register(events.callbackquery.CallbackQuery(pattern=r'^(change|toggle):[a-z]+$')) async def edit_parameter(ev): prompt = get_prompt(ev.input_sender.user_id) if not prompt: await ev.delete() await ev.respond('You have no pending prompt to edit.', buttons=[[Button.inline(f"New prompt", f"new_prompt")]]) raise StopPropagation action, parameter = ev.data.decode().split(':', 1) if action == 'toggle': if limits[parameter]['type'] == 'boolean': prompt[parameter] = not prompt[parameter] conn.execute('UPDATE pending_prompt SET payload = ? WHERE user_id = ?', (json.dumps(prompt), ev.input_sender.user_id)) await edit_prompt(ev) if action == 'change': btns = [Button.inline(str(x), 'msg_but:'+str(x)) for x in field_buttons.get(parameter, [])] or None print(btns) await ev.respond(prompt_msg[parameter], buttons=btns) conn.execute('UPDATE user SET pending = ? WHERE id = ?', (parameter, ev.input_sender.user_id)) @events.register(events.callbackquery.CallbackQuery(pattern=r'^edit_prompt')) @events.register(events.NewMessage(pattern='^/edit', incoming=True, func=lambda e: e.is_private)) async def edit_prompt(ev): prompt = get_prompt(ev.input_sender.user_id) if not prompt: await ev.delete() await ev.respond('You have no pending prompt to edit.', buttons=[[Button.inline(f"New prompt", f"new_prompt")]]) raise StopPropagation msg_id = conn.execute('SELECT msg_id FROM pending_prompt WHERE user_id = ?', (ev.input_sender.user_id,)).fetchone()[0] message = (f"Your current prompt.\n\n" f"📣 Prompt: {prompt['prompt']}\n" f"🚫 Negative prompt: {prompt['negative_prompt']}\n\n" f"Generation parameters:\n" f"🎛 Model: {prompt['models']}\n" f"🌱 Seed: {prompt['seed']}\n" f"♨️ Sampler: {prompt['sampler_name']}\n" f"💎 Config scale: {prompt['cfg_scale']}\n" f"🌀 Steps: {prompt['steps']}\n\n" f"Output options:\n" f"🖥 Resolution: {prompt['resolution']}\n" f"✨ Upscaling: {prompt['use_upscaling']}\n" f"🔢 Number of images: {prompt['n']}\n\n" + ( ( f"img2img options:\n" f"📷 Base image: {'uploaded image' if prompt['image'] else 'no image'}\n", ) if prompt['image'] else '' )) kwargs = { 'parse_mode': 'html', 'link_preview': False, 'buttons': [ [ Button.inline(f"👀 Prompt", f"change:prompt"), Button.inline(f"⛔ Negative prompt", f"change:negative_prompt"), ], [ Button.inline(f"🌱 Seed", f"change:seed"), Button.inline(f"🔢 Number", f"change:n"), Button.inline(f"✨ Upscale", f"toggle:use_upscaling") ], [ Button.inline(f"💎 Config scale", f"change:cfg_scale"), Button.inline(f"🌀 Steps", f"change:steps"), ], [ # Button.inline(f"📷 Base image", f"change:image"), Button.inline(f"🖥 Resolution", f"change:resolution"), ], [ Button.inline(f"✅ Confirm", f"confirm_prompt"), Button.inline(f"🕵️ Analyze", f"analyze_prompt"), Button.inline(f"❌ Delete", f"delete_prompt") ] ] } if msg_id: try: await client.edit_message(ev.input_sender, msg_id, message, **kwargs) except MessageNotModifiedError: return except Exception as e: log.error(str(e)) else: return log.error(f"There was an issue editing the message of pending prompt") msg_id = await ev.respond(message, **kwargs) conn.execute('UPDATE pending_prompt SET msg_id = ? WHERE user_id = ?', (msg_id.id, ev.input_sender.user_id)) @events.register(events.callbackquery.CallbackQuery(pattern=r'^confirm_prompt')) @events.register(events.NewMessage(pattern='^/confirm', incoming=True, func=lambda e: e.is_private)) async def confirm_prompt(ev): prompt = get_prompt(ev.input_sender.user_id) if not prompt: await ev.respond('You have no prompt to confirm!', buttons=[[Button.inline(f"New prompt", f"new_prompt")]]) raise StopPropagation json_prompt = prompt_template.copy() json_prompt['prompt'] = prompt['prompt'] json_prompt['params']['width'], json_prompt['params']['height'] = prompt['resolution'].split('x') if prompt['seed'] == 'random': prompt['seed'] = randint(0, 1000000) if prompt['negative_prompt']: json_prompt['prompt'].append('### ' + prompt['negative_prompt']) for n in ['sampler_name', 'cfg_scale', 'seed', 'use_upscaling', 'steps', 'n', 'models']: json_prompt['params'][n] = prompt[n] conn.execute('INSERT INTO prompt(user_id, original_payload, payload, hash) VALUES (?,?,?,?)', (ev.input_sender.user_id, json.dumps(prompt), json.dumps(json_prompt), md5(json.dumps(json_prompt, sort_keys=True)).hexdigest()) ) handler = [new_prompt, accept_data, delete_prompt, confirm_prompt, edit_prompt, edit_parameter]