112 lines
3.2 KiB
Python
112 lines
3.2 KiB
Python
|
from telethon import TelegramClient
|
||
|
import logging, asyncio
|
||
|
from telethon.tl.types import MessageMediaDocument, DocumentAttributeSticker, DocumentAttributeAnimated, MessageMediaPhoto
|
||
|
from telethon import events
|
||
|
from telethon.tl.custom import Button
|
||
|
from os.path import isfile, join
|
||
|
from os import system
|
||
|
from time import time
|
||
|
from PIL import Image
|
||
|
from config import *
|
||
|
from os import makedirs
|
||
|
|
||
|
# Emit INFO-level (and above) log records from this process.
logging.basicConfig(level=logging.INFO)

# Build the client for the 'bot' session, then log in with the bot token.
# start() is kept as the value bound to `client`, exactly as in the chained form.
client = TelegramClient('bot', API_ID, API_HASH)
client = client.start(bot_token=BOT_TOKEN)

# NOTE(review): per the Telethon docs this is the flood-wait duration (seconds)
# below which the library sleeps by itself instead of raising -- confirm.
client.flood_sleep_threshold = 120

# user id -> time() of that user's last successful print.
# A value of 0 means "unlocked, but has not printed yet"; a missing key means
# the user has not unlocked the printer.
print_log = {}
|
||
|
|
||
|
@client.on(events.NewMessage(pattern='^/id'))
async def debug_id(ev):
    """Answer a message starting with ``/id`` with the chat peer's user id.

    The id is read from ``ev.peer_id.user_id`` and is meant to be copied into
    the ``ADMIN_ID`` setting.
    """
    user_id = ev.peer_id.user_id
    reply = f"Hello! Your id is `{user_id}` please add it to the ADMIN_ID to give youself privileges :)"
    await ev.respond(reply)
|
||
|
|
||
|
@client.on(events.NewMessage(pattern='^/start'))
async def welcome(ev):
    """Greet the user on ``/start`` and, if needed, ask them to unlock.

    WELCOME_MSG is always sent.  UNLOCK_MSG follows only when a PASSWORD is
    configured and the user has no entry in ``print_log`` yet.
    """
    await ev.respond(WELCOME_MSG)

    # Nothing more to say to users that are already unlocked, or when no
    # password is configured at all.
    already_unlocked = ev.peer_id.user_id in print_log
    if already_unlocked or not PASSWORD:
        return

    await ev.respond(UNLOCK_MSG)
|
||
|
|
||
|
# Fires on a private message whose text matches the PASSWORD pattern.
@client.on(events.NewMessage(pattern=PASSWORD, func=lambda e: e.is_private))
async def unlock_printer(ev):
    """Register the sender in ``print_log`` the first time they send the password.

    A new user is stored with a last-print time of 0.  UNLOCKED_MSG is sent
    only when a PASSWORD is actually configured.  Users that already have an
    entry are left untouched.
    """
    user_id = ev.peer_id.user_id
    if user_id in print_log:
        return

    print_log[user_id] = 0
    if PASSWORD:
        await ev.respond(UNLOCKED_MSG)
|
||
|
|
||
|
@client.on(events.NewMessage(incoming=True, func=lambda e: e.is_private and e.message.media))
async def handler(ev):
    """Print a photo or static sticker received in a private chat.

    Steps, in order:

    1. Reject users that have no entry in ``print_log`` (not unlocked).
    2. Accept only photos and non-animated stickers; anything else gets
       FORMAT_ERR_MSG.
    3. Enforce the per-user cooldown of BASE_COOLDOWN seconds.
    4. Download the media into CACHE_DIR unless the file is already there.
    5. Validate the aspect ratio, flatten transparency, resize, convert to
       grayscale, apply gamma correction and save to IMAGE_PATH.
    6. Run PRINT_COMMAND and report the outcome to the user (and to ADMIN_ID
       on failure).

    Args:
        ev: the Telethon NewMessage event that triggered the handler.
    """
    msg = ev.message
    user_id = ev.peer_id.user_id

    # Locked users only get the unlock hint.  Returning here is required:
    # the cooldown check below indexes print_log[user_id] and would raise
    # KeyError for a user that has never unlocked.
    if user_id not in print_log:
        await ev.respond(UNLOCK_MSG)
        return

    # Check if the file is valid: photos and static stickers only.
    if msg.photo:
        fn = join(CACHE_DIR, f"{msg.photo.id}.jpg")
    elif msg.sticker:
        fn = join(CACHE_DIR, f"{msg.sticker.id}.webp")
        # Stickers carrying the "animated" attribute are rejected.
        if any(isinstance(att, DocumentAttributeAnimated)
               for att in msg.sticker.attributes):
            fn = None
    else:
        fn = None

    if not fn:
        await ev.respond(FORMAT_ERR_MSG)
        return

    # Check if the user is still in the cooldown period.
    time_left = int((print_log[user_id] + BASE_COOLDOWN) - time())
    if time_left > 0:
        await ev.respond(RATELIMIT_MSG.format(time_left=time_left))
        return

    # Download the file unless it's in the cache!
    if not isfile(fn):
        await client.download_media(msg, file=fn)

    # Try opening the image, at least.  `except Exception` (instead of a bare
    # `except`) keeps task cancellation and KeyboardInterrupt propagating.
    try:
        img = Image.open(fn)
    except Exception:
        logging.warning("Could not open %s as an image", fn, exc_info=True)
        await ev.respond(FORMAT_ERR_MSG)
        return

    # Limit stickers ratio (so people don't print incredibly long stickers).
    # The reply must be awaited: returning the bare coroutine would never
    # send the message.
    if img.size[1] / img.size[0] > MAX_ASPECT_RATIO:
        await ev.respond(RATIO_ERR_MSG)
        return

    # Remove transparency by compositing onto a solid background.
    if img.mode == 'RGBA':
        bg_img = Image.new(img.mode, img.size, BACKGROUND_COLOR)
        img = Image.alpha_composite(bg_img, img)

    # Resize the image in place so it fits within WIDTH_PX x HEIGHT_PX.
    img.thumbnail([WIDTH_PX, HEIGHT_PX], resample=Image.LANCZOS, reducing_gap=None)

    # Convert to grayscale and apply the configured gamma correction.
    img = img.convert('L')

    if GAMMA_CORRECTION != 1:
        img = Image.eval(img, lambda x: int(255*pow((x/255),(1/GAMMA_CORRECTION))))

    img.save(IMAGE_PATH, 'PNG')

    # os.system blocks until the print command exits and returns its status.
    status_code = system(PRINT_COMMAND)
    if status_code == 0:
        # Only a successful print restarts the user's cooldown.
        print_log[user_id] = time()
        await ev.respond(PRINT_SUCCESS_MSG)
    else:
        await ev.respond(PRINT_FAIL_MSG)
        await client.send_message(ADMIN_ID, f'Printer is not working. Process returned status code {status_code}')

    # NOTE(review): this runs after both outcomes above; given its name it may
    # have been intended for the success branch only -- confirm before moving.
    if PRINT_SUCCESS_COMMAND:
        system(PRINT_SUCCESS_COMMAND)
|
||
|
|
||
|
if __name__ == '__main__':
    # Make sure the media cache directory exists (no error if it already does).
    makedirs(name=CACHE_DIR, exist_ok=True)

    # Hand control to the client until it disconnects.
    client.run_until_disconnected()
|