From fd0ecde7e55eb2dd378c1257ffe593e501a0623a Mon Sep 17 00:00:00 2001 From: Eloy Date: Sun, 19 May 2024 21:43:49 +0200 Subject: [PATCH] Various improvements * Add IRC support * Move Telegram code to specific file * Update requirements * Disable power saving mode of the printer * Add group check * Add argument parsing --- bot.py | 147 ++++++++++++++++------------------------------- bot_irc.py | 87 ++++++++++++++++++++++++++++ bot_telegram.py | 113 ++++++++++++++++++++++++++++++++++++ requirements.txt | 3 + 4 files changed, 252 insertions(+), 98 deletions(-) create mode 100644 bot_irc.py create mode 100644 bot_telegram.py diff --git a/bot.py b/bot.py index f28ec01..f864ccf 100644 --- a/bot.py +++ b/bot.py @@ -4,111 +4,62 @@ from telethon.tl.types import MessageMediaDocument, DocumentAttributeSticker, Do from telethon import events from telethon.tl.custom import Button from os.path import isfile, join +from grp import getgrgid +from os import getgroups from os import system +from os import path from time import time from PIL import Image from config import * from os import makedirs +import argparse -logging.basicConfig(level=logging.INFO) -client = TelegramClient('bot', API_ID, API_HASH).start(bot_token=BOT_TOKEN) -client.flood_sleep_threshold = 120 - -print_log = {} - -@client.on(events.NewMessage(pattern='^/id')) -async def debug_id(ev): - await ev.respond(f"Hello! Your id is `{ev.peer_id.user_id}` please add it to the ADMIN_ID to give youself privileges :)") - -@client.on(events.NewMessage(pattern='^/start')) -async def welcome(ev): - await ev.respond(WELCOME_MSG) - if (ev.peer_id.user_id not in print_log) and PASSWORD: - await ev.respond(UNLOCK_MSG) - -# This one triggers on a single message with the pin code written -@client.on(events.NewMessage(pattern=PASSWORD, func=lambda e: e.is_private)) -async def unlock_printer(ev): - if ev.peer_id.user_id not in print_log: - print_log[ev.peer_id.user_id] = 0 - if PASSWORD: - await ev.respond(UNLOCKED_MSG) - -@client.on(events.NewMessage(incoming=True, func=lambda e: e.is_private and e.message.media)) -async def handler(ev): - - msg = ev.message - if ev.peer_id.user_id not in print_log: - await ev.respond(UNLOCK_MSG) - - # Check if the file is valid - if msg.photo: - fn = join(CACHE_DIR, f"{msg.photo.id}.jpg") - elif msg.sticker: - fn = join(CACHE_DIR, f"{msg.sticker.id}.webp") - for att in msg.sticker.attributes: - if isinstance(att, DocumentAttributeAnimated): - fn = None - break - else: - fn = None - - if not fn: - await ev.respond(FORMAT_ERR_MSG) - return - - # Check if the user is still in the cooldown period - time_left = int((print_log[ev.peer_id.user_id] + BASE_COOLDOWN) - time()) - if time_left > 0: - await ev.respond(RATELIMIT_MSG.format(time_left=time_left)) - return - - # Download the file unless it's in the cache! - if not isfile(fn): - await client.download_media(msg, file=fn) - - # Try opening the image, at least - try: - img = Image.open(fn) - except: - await ev.respond(FORMAT_ERR_MSG) - return - - # Limit stickers ratio (so people don't print incredibly long stickers) - if img.size[1]/img.size[0] > MAX_ASPECT_RATIO: - await ev.respond(RATIO_ERR_MSG) - return - - # Remove transparency - if img.mode == 'RGBA': - bg_img = Image.new(img.mode, img.size, BACKGROUND_COLOR) - img = Image.alpha_composite(bg_img, img) - - # Resize the image - img.thumbnail([WIDTH_PX, HEIGHT_PX], resample=Image.LANCZOS, reducing_gap=None) - - # Convert to grayscale and apply a gamma of 1.8 - img = img.convert('L') - - if GAMMA_CORRECTION != 1: - img = Image.eval(img, lambda x: int(255*pow((x/255),(1/GAMMA_CORRECTION)))) - - img.save(IMAGE_PATH, 'PNG') - - await client.forward_messages(ADMIN_ID, ev.message) - - status_code = system(PRINT_COMMAND) - if status_code == 0: - print_log[ev.peer_id.user_id] = time() - await ev.respond(PRINT_SUCCESS_MSG) - else: - await ev.respond(PRINT_FAIL_MSG) - await client.send_message(ADMIN_ID, f'Printer is not working. Process returned status code {status_code}') - - if PRINT_SUCCESS_COMMAND: - system(PRINT_SUCCESS_COMMAND) if __name__ == '__main__': + parser = argparse.ArgumentParser( + prog='./bot.py', + description='Recieve images through IRC or Telegram and print the content with brother_ql' + ) + + parser.add_argument('-t', '--telegram', help='Enable Telegram (requires configuration)', + action='store_true') # on/off flag + parser.add_argument('-i', '--irc', help='Enable IRC (generate random channel and username by default, set values in the configuration if you want a static name)', + action='store_true') # on/off flag + + parser.add_argument('-p', '--printer', help='Printer devicefile, default is /dev/usb/lp0', + action='store_true') # on/off flag + + args = parser.parse_args() + + # os.getgroups() seems to return the *effective* groups of the current user, not + # what is in /etc/group https://docs.python.org/3/library/os.html#os.getgrouplist + # This is good thing, otherwise the error would disappear even if the user has not + # logged in again. + if "lp" not in [getgrgid(g).gr_name for g in getgroups()]: + print("""Error: User is not in the lp group. Run the following and sign in again: + sudo usermod -a -G lp $USER""") + + # Assumes this hardcoded path, prevent + # continuing if there is not even a printer + if not path.exists("/dev/usb/lp0"): + exit("There seems to be no printer connected.") + + # By default the printer turns off after a certain time to save power. + # For the use case of a public usable printer at events this is + # annoying. Currently, there is no function in the brother_ql project to + # disable power saving: https://github.com/pklaus/brother_ql/issues/50 + # The workaround shell command is rewritten to Python code. Please + # remove this code if brother_ql implements a working fix themselves + with open("/dev/usb/lp0", "wb") as p: + p.write(b'\x1b\x69\x55\x41\x00\x00') + + if args.irc: + import bot_irc + + if args.telegram: + import bot_telegram + + makedirs(CACHE_DIR, exist_ok=True) - client.run_until_disconnected() + client.run_until_disconnected() \ No newline at end of file diff --git a/bot_irc.py b/bot_irc.py new file mode 100644 index 0000000..c0352fb --- /dev/null +++ b/bot_irc.py @@ -0,0 +1,87 @@ +from config import * + +# On IRC, we have to use exclamation marks for commands, because otherwise they can overrule + +import irc.bot +import irc.strings +from irc.client import ip_numstr_to_quad, ip_quad_to_numstr +import os +import requests + +print(IRC_SERVER) + + +class TestBot(irc.bot.SingleServerIRCBot): + def __init__(self, channel, nickname, server, port=6667): + irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname) + self.channel = channel + + def on_nicknameinuse(self, c, e): + c.nick(c.get_nickname() + "_") + + def on_welcome(self, c, e): + c.join(self.channel) + + def on_privmsg(self, c, e): + self.do_command(e, e.arguments[0]) + + def on_pubmsg(self, c, e): + a = e.arguments[0].split(":", 1) + if len(a) > 1 and irc.strings.lower(a[0]) == irc.strings.lower( + self.connection.get_nickname() + ): + self.do_command(e, a[1].strip()) + return + + def on_dccmsg(self, c, e): + # non-chat DCC messages are raw bytes; decode as text + text = e.arguments[0].decode('utf-8') + c.privmsg("You said: " + text) + + def on_dccchat(self, c, e): + if len(e.arguments) != 2: + return + args = e.arguments[1].split() + if len(args) == 4: + try: + address = ip_numstr_to_quad(args[2]) + port = int(args[3]) + except ValueError: + return + self.dcc_connect(address, port) + + def do_command(self, e, cmd): + nick = e.source.nick + c = self.connection + """ + if cmd == "disconnect": + self.disconnect() + elif cmd == "die": + self.die() + elif cmd == "stats": + for chname, chobj in self.channels.items(): + c.notice(nick, "--- Channel statistics ---") + c.notice(nick, "Channel: " + chname) + users = sorted(chobj.users()) + c.notice(nick, "Users: " + ", ".join(users)) + opers = sorted(chobj.opers()) + c.notice(nick, "Opers: " + ", ".join(opers)) + voiced = sorted(chobj.voiced()) + c.notice(nick, "Voiced: " + ", ".join(voiced)) + elif cmd == "dcc": + dcc = self.dcc_listen() + c.ctcp( + "DCC", + nick, + f"CHAT chat {ip_quad_to_numstr(dcc.localaddress)} {dcc.localport}", + ) + """ + if cmd.startswith("print"): + r = requests.get(cmd.split(" ")[1:][0], allow_redirects=True) + open(IMAGE_PATH, 'wb').write(r.content) + os.system(PRINT_COMMAND) + else: + c.notice(nick, "Not understood: " + cmd) + +bot = TestBot("printer", "printerbottest", IRC_SERVER, IRC_PORT) +bot.start() \ No newline at end of file diff --git a/bot_telegram.py b/bot_telegram.py new file mode 100644 index 0000000..322568e --- /dev/null +++ b/bot_telegram.py @@ -0,0 +1,113 @@ +from telethon import TelegramClient +import logging, asyncio +from telethon.tl.types import MessageMediaDocument, DocumentAttributeSticker, DocumentAttributeAnimated, MessageMediaPhoto +from telethon import events +from telethon.tl.custom import Button +from os.path import isfile, join +from grp import getgrgid +from os import getgroups +from os import system +from time import time +from PIL import Image +from config import * +from os import makedirs + +logging.basicConfig(level=logging.INFO) + +client = TelegramClient('bot', API_ID, API_HASH).start(bot_token=BOT_TOKEN) +client.flood_sleep_threshold = 120 + +print_log = {} + + +@client.on(events.NewMessage(pattern='^/id')) +async def debug_id(ev): + await ev.respond(f"Hello! Your id is `{ev.peer_id.user_id}` please add it to the ADMIN_ID to give youself privileges :)") + +@client.on(events.NewMessage(pattern='^/start')) +async def welcome(ev): + await ev.respond(WELCOME_MSG) + if (ev.peer_id.user_id not in print_log) and PASSWORD: + await ev.respond(UNLOCK_MSG) + +# This one triggers on a single message with the pin code written +@client.on(events.NewMessage(pattern=PASSWORD, func=lambda e: e.is_private)) +async def unlock_printer(ev): + if ev.peer_id.user_id not in print_log: + print_log[ev.peer_id.user_id] = 0 + if PASSWORD: + await ev.respond(UNLOCKED_MSG) + +@client.on(events.NewMessage(incoming=True, func=lambda e: e.is_private and e.message.media)) +async def handler(ev): + + msg = ev.message + if ev.peer_id.user_id not in print_log: + await ev.respond(UNLOCK_MSG) + + # Check if the file is valid + if msg.photo: + fn = join(CACHE_DIR, f"{msg.photo.id}.jpg") + elif msg.sticker: + fn = join(CACHE_DIR, f"{msg.sticker.id}.webp") + for att in msg.sticker.attributes: + if isinstance(att, DocumentAttributeAnimated): + fn = None + break + else: + fn = None + + if not fn: + await ev.respond(FORMAT_ERR_MSG) + return + + # Check if the user is still in the cooldown period + time_left = int((print_log[ev.peer_id.user_id] + BASE_COOLDOWN) - time()) + if time_left > 0: + await ev.respond(RATELIMIT_MSG.format(time_left=time_left)) + return + + # Download the file unless it's in the cache! + if not isfile(fn): + await client.download_media(msg, file=fn) + + # Try opening the image, at least + try: + img = Image.open(fn) + except: + await ev.respond(FORMAT_ERR_MSG) + return + + # Limit stickers ratio (so people don't print incredibly long stickers) + if img.size[1]/img.size[0] > MAX_ASPECT_RATIO: + await ev.respond(RATIO_ERR_MSG) + return + + # Remove transparency + if img.mode == 'RGBA': + bg_img = Image.new(img.mode, img.size, BACKGROUND_COLOR) + img = Image.alpha_composite(bg_img, img) + + # Resize the image + img.thumbnail([WIDTH_PX, HEIGHT_PX], resample=Image.LANCZOS, reducing_gap=None) + + # Convert to grayscale and apply a gamma of 1.8 + img = img.convert('L') + + if GAMMA_CORRECTION != 1: + img = Image.eval(img, lambda x: int(255*pow((x/255),(1/GAMMA_CORRECTION)))) + + img.save(IMAGE_PATH, 'PNG') + + await client.forward_messages(ADMIN_ID, ev.message) + + status_code = system(PRINT_COMMAND) + if status_code == 0: + print_log[ev.peer_id.user_id] = time() + await ev.respond(PRINT_SUCCESS_MSG) + else: + await ev.respond(PRINT_FAIL_MSG) + await client.send_message(ADMIN_ID, f'Printer is not working. Process returned status code {status_code}') + + if PRINT_SUCCESS_COMMAND: + system(PRINT_SUCCESS_COMMAND) diff --git a/requirements.txt b/requirements.txt index 05ddc68..f854edf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ telethon Pillow brother_ql +irc +requests +argparse \ No newline at end of file