1
0
Fork 0
printer_bot/bot_telegram.py

114 lines
3.3 KiB
Python

from telethon import TelegramClient
import logging, asyncio
from telethon.tl.types import MessageMediaDocument, DocumentAttributeSticker, DocumentAttributeAnimated, MessageMediaPhoto
from telethon import events
from telethon.tl.custom import Button
from os.path import isfile, join
from grp import getgrgid
from os import getgroups
from os import system
from time import time
from PIL import Image
from config import *
from os import makedirs
# Emit log records of level INFO and above.
logging.basicConfig(level=logging.INFO)
# Build the Telethon client (session name 'bot') and start it as a bot.
# API_ID, API_HASH and BOT_TOKEN are not defined in this file; presumably
# they come from the star-import of `config`.
client = TelegramClient('bot', API_ID, API_HASH).start(bot_token=BOT_TOKEN)
# NOTE(review): Telethon option; presumably flood-wait errors of up to this
# many seconds are slept through automatically — confirm against Telethon docs.
client.flood_sleep_threshold = 120
# Per-user state keyed by Telegram user id. A key being present means the
# user has unlocked the printer; the value is 0 right after unlocking and the
# time() of the last successful print afterwards (used for the cooldown).
print_log = {}
@client.on(events.NewMessage(pattern='^/id'))
async def debug_id(ev):
    """Answer an ``/id`` message with the user id found on the event's peer.

    The reply tells the sender which value to put into ADMIN_ID.
    """
    user_id = ev.peer_id.user_id
    reply = f"Hello! Your id is `{user_id}` please add it to the ADMIN_ID to give youself privileges :)"
    await ev.respond(reply)
@client.on(events.NewMessage(pattern='^/start'))
async def welcome(ev):
    """Answer ``/start`` with the welcome text.

    When a PASSWORD is configured and the sender has no entry in
    ``print_log`` yet, the unlock prompt is sent as a second message.
    """
    await ev.respond(WELCOME_MSG)

    # Look up the sender first, then the password setting, so the checks run
    # in the same order as a single combined condition would.
    already_unlocked = ev.peer_id.user_id in print_log
    if already_unlocked or not PASSWORD:
        return

    await ev.respond(UNLOCK_MSG)
# This one triggers on a single message with the pin code written
# (the event pattern is PASSWORD and only private chats pass the filter).
@client.on(events.NewMessage(pattern=PASSWORD, func=lambda e: e.is_private))
async def unlock_printer(ev):
    """Unlock the printer for the sender of a matching private message.

    A sender without an entry in ``print_log`` gets one with value 0, which
    the print handler treats as "unlocked, never printed". When a PASSWORD
    is configured, the sender is told that the printer is unlocked.
    """
    # NOTE(review): the indentation of this file was lost; the two `if`
    # statements are reconstructed as siblings. Confirm against the upstream
    # file whether the confirmation should only be sent on the first unlock.
    if ev.peer_id.user_id not in print_log:
        # 0 = no print yet, so the cooldown check passes immediately.
        print_log[ev.peer_id.user_id] = 0
    # With an empty PASSWORD no confirmation is sent.
    if PASSWORD:
        await ev.respond(UNLOCKED_MSG)
@client.on(events.NewMessage(incoming=True, func=lambda e: e.is_private and e.message.media))
async def handler(ev):
    """Print a photo or static sticker received in a private chat.

    Flow:
      1. Refuse users that have no entry in ``print_log`` (not unlocked).
      2. Accept only photos and non-animated stickers.
      3. Enforce the per-user cooldown of BASE_COOLDOWN seconds.
      4. Download the media into CACHE_DIR (skipped when already cached).
      5. Reject images that are too tall, flatten transparency, resize to
         WIDTH_PX x HEIGHT_PX, convert to grayscale and gamma-correct.
      6. Save the result to IMAGE_PATH, forward the message to ADMIN_ID and
         run PRINT_COMMAND; the cooldown only starts on exit status 0.

    Every rejection is reported to the user with the matching *_MSG text.
    """
    msg = ev.message
    user_id = ev.peer_id.user_id

    # Locked users only get the unlock prompt. The early return is required:
    # without it the cooldown lookup below raises KeyError for this user.
    if user_id not in print_log:
        await ev.respond(UNLOCK_MSG)
        return

    # Check if the file is valid
    if msg.photo:
        fn = join(CACHE_DIR, f"{msg.photo.id}.jpg")
    elif msg.sticker:
        # Animated stickers cannot be printed.
        animated = any(
            isinstance(att, DocumentAttributeAnimated)
            for att in msg.sticker.attributes
        )
        fn = None if animated else join(CACHE_DIR, f"{msg.sticker.id}.webp")
    else:
        fn = None

    if not fn:
        await ev.respond(FORMAT_ERR_MSG)
        return

    # Check if the user is still in the cooldown period
    time_left = int((print_log[user_id] + BASE_COOLDOWN) - time())
    if time_left > 0:
        await ev.respond(RATELIMIT_MSG.format(time_left=time_left))
        return

    # Download the file unless it's in the cache!
    if not isfile(fn):
        await client.download_media(msg, file=fn)

    # Try opening the image, at least. Catch Exception rather than using a
    # bare except so task cancellation and KeyboardInterrupt still propagate.
    try:
        img = Image.open(fn)
    except Exception:
        await ev.respond(FORMAT_ERR_MSG)
        return

    # Limit stickers ratio (so people don't print incredibly long stickers)
    if img.size[1] / img.size[0] > MAX_ASPECT_RATIO:
        await ev.respond(RATIO_ERR_MSG)
        return

    # Remove transparency by compositing over a solid background
    if img.mode == 'RGBA':
        bg_img = Image.new(img.mode, img.size, BACKGROUND_COLOR)
        img = Image.alpha_composite(bg_img, img)

    # Resize the image in place, keeping its aspect ratio
    img.thumbnail([WIDTH_PX, HEIGHT_PX], resample=Image.LANCZOS, reducing_gap=None)

    # Convert to grayscale and apply the configured gamma (1 = no correction)
    img = img.convert('L')
    if GAMMA_CORRECTION != 1:
        img = Image.eval(img, lambda x: int(255 * pow((x / 255), (1 / GAMMA_CORRECTION))))

    img.save(IMAGE_PATH, 'PNG')

    # Let the admin see everything that gets sent to the printer
    await client.forward_messages(ADMIN_ID, ev.message)

    # NOTE(review): system() blocks the event loop until the command exits.
    status_code = system(PRINT_COMMAND)
    if status_code == 0:
        # Start the cooldown only after a successful print
        print_log[user_id] = time()
        await ev.respond(PRINT_SUCCESS_MSG)
    else:
        await ev.respond(PRINT_FAIL_MSG)
        await client.send_message(ADMIN_ID, f'Printer is not working. Process returned status code {status_code}')

    # NOTE(review): the indentation of this file was lost. From the statement
    # order this check cannot sit in the success branch, so it is kept at
    # function level and runs whatever the print status was — confirm upstream.
    if PRINT_SUCCESS_COMMAND:
        system(PRINT_SUCCESS_COMMAND)