215 lines
7.9 KiB
Python
215 lines
7.9 KiB
Python
from telethon import events
|
|
from telethon.utils import get_display_name
|
|
from telethon.events import StopPropagation, CallbackQuery
|
|
from telethon.tl.custom import Button
|
|
from telethon.errors import MessageNotModifiedError
|
|
import logging
|
|
import httpx
|
|
import json
|
|
from random import randint
|
|
from sqlite3 import IntegrityError
|
|
from config import *
|
|
from parameters import *
|
|
from hashlib import md5
|
|
|
|
log = logging.getLogger('prompt')
|
|
|
|
def get_prompt(user_id, delete=True):
|
|
prompt = conn.execute('SELECT payload FROM pending_prompt WHERE user_id = ?', (user_id,)).fetchone()
|
|
|
|
if prompt:
|
|
prompt = json.loads(prompt[0])
|
|
else:
|
|
return None
|
|
|
|
if prompt['prompt']: return prompt
|
|
|
|
if delete:
|
|
conn.execute('DELETE FROM pending_prompt WHERE user_id = ?', (user_id,))
|
|
return None
|
|
else:
|
|
return prompt
|
|
|
|
@events.register(events.callbackquery.CallbackQuery(pattern=r'^new_prompt'))
|
|
@events.register(events.NewMessage(pattern='^/new', incoming=True, func=lambda e: e.is_private))
|
|
async def new_prompt(ev):
|
|
|
|
prompt = get_prompt(ev.input_sender.user_id)
|
|
if prompt:
|
|
await ev.respond("You already have a pending prompt!",
|
|
buttons=[[Button.inline(f"Delete", f"delete_prompt"), Button.inline(f"Edit", "edit_prompt")]])
|
|
raise StopPropagation
|
|
|
|
req_msg = await ev.respond("A new prompt! What would you like to generate?\nSend a message with a phrase or a list of tags you would like to generate. For ideas, check @ai621gen or ask in our chat @ai621chat.")
|
|
conn.execute('INSERT INTO pending_prompt(user_id, payload) VALUES (?,?)', (ev.input_sender.user_id, json.dumps(fields_template)))
|
|
conn.execute('UPDATE user SET pending = \'prompt\', pending_msg = ? WHERE id = ?', (req_msg.id, ev.input_sender.user_id))
|
|
print('Has been created')
|
|
raise StopPropagation
|
|
|
|
@events.register(events.NewMessage(pattern='^/delete', incoming=True, func=lambda e: e.is_private))
|
|
@events.register(events.callbackquery.CallbackQuery(pattern=r'^delete_prompt'))
|
|
async def delete_prompt(ev):
|
|
conn.execute('DELETE FROM pending_prompt WHERE user_id = ?', (ev.input_sender.user_id,))
|
|
await ev.respond('Your pending prompt, if any, has been deleted.',
|
|
buttons=[[Button.inline(f"New prompt", f"new_prompt")]])
|
|
raise StopPropagation
|
|
|
|
@events.register(events.callbackquery.CallbackQuery(pattern=r'^msg_but:'))
|
|
@events.register(events.NewMessage(incoming=True, pattern='^([^\/](\n|.)*)?$', func=lambda e: e.is_private))
|
|
async def accept_data(ev):
|
|
pending = conn.execute('SELECT pending FROM user WHERE id = ? AND pending IS NOT NULL', (ev.input_sender.user_id,)).fetchone()
|
|
if not pending:
|
|
await ev.respond("I don't understand. Maybe try to /start again?")
|
|
raise StopPropagation
|
|
else:
|
|
pending = pending[0]
|
|
|
|
print(pending)
|
|
prompt = get_prompt(ev.input_sender.user_id, delete=False)
|
|
if not prompt:
|
|
await ev.respond('You have no pending prompt to edit. Try to /start again?')
|
|
raise StopPropagation
|
|
|
|
if hasattr(ev, 'message'):
|
|
data = ev.message.raw_text.strip()
|
|
else:
|
|
data = ev.data.decode().split(':', 1)[1]
|
|
|
|
await ev.respond(f'✅ You have set {pending} to {data}')
|
|
|
|
if pending == 'prompt':
|
|
prompt['prompt'] = data
|
|
|
|
conn.execute('UPDATE pending_prompt SET payload = ? WHERE user_id = ?', (json.dumps(prompt), ev.input_sender.user_id))
|
|
conn.execute('UPDATE user SET pending = NULL WHERE id = ?', (ev.input_sender.user_id,))
|
|
await edit_prompt(ev)
|
|
raise StopPropagation
|
|
|
|
@events.register(events.callbackquery.CallbackQuery(pattern=r'^(change|toggle):[a-z]+$'))
|
|
async def edit_parameter(ev):
|
|
|
|
prompt = get_prompt(ev.input_sender.user_id)
|
|
if not prompt:
|
|
await ev.delete()
|
|
await ev.respond('You have no pending prompt to edit.', buttons=[[Button.inline(f"New prompt", f"new_prompt")]])
|
|
raise StopPropagation
|
|
|
|
action, parameter = ev.data.decode().split(':', 1)
|
|
|
|
if action == 'toggle':
|
|
if limits[parameter]['type'] == 'boolean':
|
|
prompt[parameter] = not prompt[parameter]
|
|
|
|
conn.execute('UPDATE pending_prompt SET payload = ? WHERE user_id = ?', (json.dumps(prompt), ev.input_sender.user_id))
|
|
await edit_prompt(ev)
|
|
|
|
if action == 'change':
|
|
btns = [Button.inline(str(x), 'msg_but:'+str(x)) for x in field_buttons.get(parameter, [])] or None
|
|
print(btns)
|
|
await ev.respond(prompt_msg[parameter], buttons=btns)
|
|
conn.execute('UPDATE user SET pending = ? WHERE id = ?', (parameter, ev.input_sender.user_id))
|
|
|
|
|
|
@events.register(events.callbackquery.CallbackQuery(pattern=r'^edit_prompt'))
|
|
@events.register(events.NewMessage(pattern='^/edit', incoming=True, func=lambda e: e.is_private))
|
|
async def edit_prompt(ev):
|
|
prompt = get_prompt(ev.input_sender.user_id)
|
|
|
|
if not prompt:
|
|
await ev.delete()
|
|
await ev.respond('You have no pending prompt to edit.', buttons=[[Button.inline(f"New prompt", f"new_prompt")]])
|
|
raise StopPropagation
|
|
|
|
msg_id = conn.execute('SELECT msg_id FROM pending_prompt WHERE user_id = ?', (ev.input_sender.user_id,)).fetchone()[0]
|
|
|
|
message = (f"<strong>Your current prompt.</strong>\n\n"
|
|
f"📣 Prompt: {prompt['prompt']}\n"
|
|
f"🚫 Negative prompt: {prompt['negative_prompt']}\n\n"
|
|
|
|
f"<strong>Generation parameters:</strong>\n"
|
|
f"🎛 Model: {prompt['models']}\n"
|
|
f"🌱 Seed: {prompt['seed']}\n"
|
|
f"♨️ Sampler: {prompt['sampler_name']}\n"
|
|
f"💎 Config scale: {prompt['cfg_scale']}\n"
|
|
f"🌀 Steps: {prompt['steps']}\n\n"
|
|
|
|
f"<strong>Output options:</strong>\n"
|
|
f"🖥 Resolution: {prompt['resolution']}\n"
|
|
f"✨ Upscaling: {prompt['use_upscaling']}\n"
|
|
f"🔢 Number of images: {prompt['n']}\n\n"
|
|
|
|
+ (
|
|
(
|
|
f"<strong>img2img options:</strong>\n"
|
|
f"📷 Base image: {'uploaded image' if prompt['image'] else 'no image'}\n",
|
|
) if prompt['image'] else ''
|
|
))
|
|
|
|
kwargs = {
|
|
'parse_mode': 'html',
|
|
'link_preview': False,
|
|
'buttons': [
|
|
[
|
|
Button.inline(f"👀 Prompt", f"change:prompt"),
|
|
Button.inline(f"⛔ Negative prompt", f"change:negative_prompt"),
|
|
],
|
|
[
|
|
Button.inline(f"🌱 Seed", f"change:seed"),
|
|
Button.inline(f"🔢 Number", f"change:n"),
|
|
Button.inline(f"✨ Upscale", f"toggle:use_upscaling")
|
|
],
|
|
[
|
|
Button.inline(f"💎 Config scale", f"change:cfg_scale"),
|
|
Button.inline(f"🌀 Steps", f"change:steps"),
|
|
],
|
|
[
|
|
# Button.inline(f"📷 Base image", f"change:image"),
|
|
Button.inline(f"🖥 Resolution", f"change:resolution"),
|
|
],
|
|
[
|
|
Button.inline(f"✅ Confirm", f"confirm_prompt"),
|
|
Button.inline(f"🕵️ Analyze", f"analyze_prompt"),
|
|
Button.inline(f"❌ Delete", f"delete_prompt")
|
|
]
|
|
]
|
|
}
|
|
|
|
if msg_id:
|
|
try:
|
|
await client.edit_message(ev.input_sender, msg_id, message, **kwargs)
|
|
except MessageNotModifiedError:
|
|
return
|
|
except Exception as e:
|
|
log.error(str(e))
|
|
else:
|
|
return
|
|
|
|
log.error(f"There was an issue editing the message of pending prompt")
|
|
msg_id = await ev.respond(message, **kwargs)
|
|
conn.execute('UPDATE pending_prompt SET msg_id = ? WHERE user_id = ?', (msg_id.id, ev.input_sender.user_id))
|
|
|
|
@events.register(events.callbackquery.CallbackQuery(pattern=r'^confirm_prompt'))
|
|
@events.register(events.NewMessage(pattern='^/confirm', incoming=True, func=lambda e: e.is_private))
|
|
async def confirm_prompt(ev):
|
|
prompt = get_prompt(ev.input_sender.user_id)
|
|
if not prompt:
|
|
await ev.respond('You have no prompt to confirm!', buttons=[[Button.inline(f"New prompt", f"new_prompt")]])
|
|
raise StopPropagation
|
|
|
|
json_prompt = prompt_template.copy()
|
|
json_prompt['prompt'] = prompt['prompt']
|
|
json_prompt['params']['width'], json_prompt['params']['height'] = prompt['resolution'].split('x')
|
|
if prompt['seed'] == 'random': prompt['seed'] = randint(0, 1000000)
|
|
|
|
if prompt['negative_prompt']:
|
|
json_prompt['prompt'].append('### ' + prompt['negative_prompt'])
|
|
|
|
for n in ['sampler_name', 'cfg_scale', 'seed', 'use_upscaling', 'steps', 'n', 'models']:
|
|
json_prompt['params'][n] = prompt[n]
|
|
|
|
conn.execute('INSERT INTO prompt(user_id, original_payload, payload, hash) VALUES (?,?,?,?)',
|
|
(ev.input_sender.user_id, json.dumps(prompt), json.dumps(json_prompt), md5(json.dumps(json_prompt, sort_keys=True)).hexdigest())
|
|
)
|
|
|
|
handler = [new_prompt, accept_data, delete_prompt, confirm_prompt, edit_prompt, edit_parameter]
|