forked from foxo/printer_bot
Various improvements
* Add IRC support * Move Telegram code to specific file * Update requirements * Disable power saving mode of the printer * Add printer existence+permission check (fixes #2) * Add argument parsing
This commit is contained in:
parent
a6c4de8e2c
commit
d911efb69d
|
@ -1,6 +1,6 @@
|
|||
# Telegram Sticker Printer Bot
|
||||
# Sticker Printer Bot
|
||||
|
||||
This Python script implements a Telegram bot that can print images and stickers sent by users. The bot supports resizing images, converting them to grayscale, and applying gamma correction before printing to ensure maximum quality.
|
||||
This Python script implements a Telegram and IRC bot that can print images and stickers sent by users. The bot supports resizing images, converting them to grayscale, and applying gamma correction before printing to ensure maximum quality.
|
||||
|
||||
Currently, you can set any command to print your sticker (by default brother_ql is used to print). You can use any external program you want to print to other brands and models of printers.
|
||||
|
||||
|
|
147
bot.py
147
bot.py
|
@ -4,111 +4,62 @@ from telethon.tl.types import MessageMediaDocument, DocumentAttributeSticker, Do
|
|||
from telethon import events
|
||||
from telethon.tl.custom import Button
|
||||
from os.path import isfile, join
|
||||
from grp import getgrgid
|
||||
from os import getgroups
|
||||
from os import system
|
||||
from os import path
|
||||
from time import time
|
||||
from PIL import Image
|
||||
from config import *
|
||||
from os import makedirs
|
||||
import argparse
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
client = TelegramClient('bot', API_ID, API_HASH).start(bot_token=BOT_TOKEN)
|
||||
client.flood_sleep_threshold = 120
|
||||
|
||||
print_log = {}
|
||||
|
||||
@client.on(events.NewMessage(pattern='^/id'))
|
||||
async def debug_id(ev):
|
||||
await ev.respond(f"Hello! Your id is `{ev.peer_id.user_id}` please add it to the ADMIN_ID to give youself privileges :)")
|
||||
|
||||
@client.on(events.NewMessage(pattern='^/start'))
|
||||
async def welcome(ev):
|
||||
await ev.respond(WELCOME_MSG)
|
||||
if (ev.peer_id.user_id not in print_log) and PASSWORD:
|
||||
await ev.respond(UNLOCK_MSG)
|
||||
|
||||
# This one triggers on a single message with the pin code written
|
||||
@client.on(events.NewMessage(pattern=PASSWORD, func=lambda e: e.is_private))
|
||||
async def unlock_printer(ev):
|
||||
if ev.peer_id.user_id not in print_log:
|
||||
print_log[ev.peer_id.user_id] = 0
|
||||
if PASSWORD:
|
||||
await ev.respond(UNLOCKED_MSG)
|
||||
|
||||
@client.on(events.NewMessage(incoming=True, func=lambda e: e.is_private and e.message.media))
|
||||
async def handler(ev):
|
||||
|
||||
msg = ev.message
|
||||
if ev.peer_id.user_id not in print_log:
|
||||
await ev.respond(UNLOCK_MSG)
|
||||
|
||||
# Check if the file is valid
|
||||
if msg.photo:
|
||||
fn = join(CACHE_DIR, f"{msg.photo.id}.jpg")
|
||||
elif msg.sticker:
|
||||
fn = join(CACHE_DIR, f"{msg.sticker.id}.webp")
|
||||
for att in msg.sticker.attributes:
|
||||
if isinstance(att, DocumentAttributeAnimated):
|
||||
fn = None
|
||||
break
|
||||
else:
|
||||
fn = None
|
||||
|
||||
if not fn:
|
||||
await ev.respond(FORMAT_ERR_MSG)
|
||||
return
|
||||
|
||||
# Check if the user is still in the cooldown period
|
||||
time_left = int((print_log[ev.peer_id.user_id] + BASE_COOLDOWN) - time())
|
||||
if time_left > 0:
|
||||
await ev.respond(RATELIMIT_MSG.format(time_left=time_left))
|
||||
return
|
||||
|
||||
# Download the file unless it's in the cache!
|
||||
if not isfile(fn):
|
||||
await client.download_media(msg, file=fn)
|
||||
|
||||
# Try opening the image, at least
|
||||
try:
|
||||
img = Image.open(fn)
|
||||
except:
|
||||
await ev.respond(FORMAT_ERR_MSG)
|
||||
return
|
||||
|
||||
# Limit stickers ratio (so people don't print incredibly long stickers)
|
||||
if img.size[1]/img.size[0] > MAX_ASPECT_RATIO:
|
||||
await ev.respond(RATIO_ERR_MSG)
|
||||
return
|
||||
|
||||
# Remove transparency
|
||||
if img.mode == 'RGBA':
|
||||
bg_img = Image.new(img.mode, img.size, BACKGROUND_COLOR)
|
||||
img = Image.alpha_composite(bg_img, img)
|
||||
|
||||
# Resize the image
|
||||
img.thumbnail([WIDTH_PX, HEIGHT_PX], resample=Image.LANCZOS, reducing_gap=None)
|
||||
|
||||
# Convert to grayscale and apply a gamma of 1.8
|
||||
img = img.convert('L')
|
||||
|
||||
if GAMMA_CORRECTION != 1:
|
||||
img = Image.eval(img, lambda x: int(255*pow((x/255),(1/GAMMA_CORRECTION))))
|
||||
|
||||
img.save(IMAGE_PATH, 'PNG')
|
||||
|
||||
await client.forward_messages(ADMIN_ID, ev.message)
|
||||
|
||||
status_code = system(PRINT_COMMAND)
|
||||
if status_code == 0:
|
||||
print_log[ev.peer_id.user_id] = time()
|
||||
await ev.respond(PRINT_SUCCESS_MSG)
|
||||
else:
|
||||
await ev.respond(PRINT_FAIL_MSG)
|
||||
await client.send_message(ADMIN_ID, f'Printer is not working. Process returned status code {status_code}')
|
||||
|
||||
if PRINT_SUCCESS_COMMAND:
|
||||
system(PRINT_SUCCESS_COMMAND)
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='./bot.py',
|
||||
description='Recieve images through IRC or Telegram and print the content with brother_ql'
|
||||
)
|
||||
|
||||
parser.add_argument('-t', '--telegram', help='Enable Telegram (requires configuration)',
|
||||
action='store_true') # on/off flag
|
||||
parser.add_argument('-i', '--irc', help='Enable IRC (generate random channel and username by default, set values in the configuration if you want a static name)',
|
||||
action='store_true') # on/off flag
|
||||
|
||||
parser.add_argument('-p', '--printer', help='Printer devicefile, default is /dev/usb/lp0',
|
||||
action='store_true') # on/off flag
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# os.getgroups() seems to return the *effective* groups of the current user, not
|
||||
# what is in /etc/group https://docs.python.org/3/library/os.html#os.getgrouplist
|
||||
# This is good thing, otherwise the error would disappear even if the user has not
|
||||
# logged in again.
|
||||
if "lp" not in [getgrgid(g).gr_name for g in getgroups()]:
|
||||
print("""Error: User is not in the lp group. Run the following and sign in again:
|
||||
sudo usermod -a -G lp $USER""")
|
||||
|
||||
# Assumes this hardcoded path, prevent
|
||||
# continuing if there is not even a printer
|
||||
if not path.exists("/dev/usb/lp0"):
|
||||
exit("There seems to be no printer connected.")
|
||||
|
||||
# By default the printer turns off after a certain time to save power.
|
||||
# For the use case of a public usable printer at events this is
|
||||
# annoying. Currently, there is no function in the brother_ql project to
|
||||
# disable power saving: https://github.com/pklaus/brother_ql/issues/50
|
||||
# The workaround shell command is rewritten to Python code. Please
|
||||
# remove this code if brother_ql implements a working fix themselves
|
||||
with open("/dev/usb/lp0", "wb") as p:
|
||||
p.write(b'\x1b\x69\x55\x41\x00\x00')
|
||||
|
||||
if args.irc:
|
||||
import bot_irc
|
||||
|
||||
if args.telegram:
|
||||
import bot_telegram
|
||||
|
||||
|
||||
makedirs(CACHE_DIR, exist_ok=True)
|
||||
client.run_until_disconnected()
|
||||
client.run_until_disconnected()
|
|
@ -0,0 +1,87 @@
|
|||
from config import *
|
||||
|
||||
# On IRC, we have to use exclamation marks for commands, because otherwise they can overrule
|
||||
|
||||
import irc.bot
|
||||
import irc.strings
|
||||
from irc.client import ip_numstr_to_quad, ip_quad_to_numstr
|
||||
import os
|
||||
import requests
|
||||
|
||||
print(IRC_SERVER)
|
||||
|
||||
|
||||
class TestBot(irc.bot.SingleServerIRCBot):
|
||||
def __init__(self, channel, nickname, server, port=6667):
|
||||
irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
|
||||
self.channel = channel
|
||||
|
||||
def on_nicknameinuse(self, c, e):
|
||||
c.nick(c.get_nickname() + "_")
|
||||
|
||||
def on_welcome(self, c, e):
|
||||
c.join(self.channel)
|
||||
|
||||
def on_privmsg(self, c, e):
|
||||
self.do_command(e, e.arguments[0])
|
||||
|
||||
def on_pubmsg(self, c, e):
|
||||
a = e.arguments[0].split(":", 1)
|
||||
if len(a) > 1 and irc.strings.lower(a[0]) == irc.strings.lower(
|
||||
self.connection.get_nickname()
|
||||
):
|
||||
self.do_command(e, a[1].strip())
|
||||
return
|
||||
|
||||
def on_dccmsg(self, c, e):
|
||||
# non-chat DCC messages are raw bytes; decode as text
|
||||
text = e.arguments[0].decode('utf-8')
|
||||
c.privmsg("You said: " + text)
|
||||
|
||||
def on_dccchat(self, c, e):
|
||||
if len(e.arguments) != 2:
|
||||
return
|
||||
args = e.arguments[1].split()
|
||||
if len(args) == 4:
|
||||
try:
|
||||
address = ip_numstr_to_quad(args[2])
|
||||
port = int(args[3])
|
||||
except ValueError:
|
||||
return
|
||||
self.dcc_connect(address, port)
|
||||
|
||||
def do_command(self, e, cmd):
|
||||
nick = e.source.nick
|
||||
c = self.connection
|
||||
"""
|
||||
if cmd == "disconnect":
|
||||
self.disconnect()
|
||||
elif cmd == "die":
|
||||
self.die()
|
||||
elif cmd == "stats":
|
||||
for chname, chobj in self.channels.items():
|
||||
c.notice(nick, "--- Channel statistics ---")
|
||||
c.notice(nick, "Channel: " + chname)
|
||||
users = sorted(chobj.users())
|
||||
c.notice(nick, "Users: " + ", ".join(users))
|
||||
opers = sorted(chobj.opers())
|
||||
c.notice(nick, "Opers: " + ", ".join(opers))
|
||||
voiced = sorted(chobj.voiced())
|
||||
c.notice(nick, "Voiced: " + ", ".join(voiced))
|
||||
elif cmd == "dcc":
|
||||
dcc = self.dcc_listen()
|
||||
c.ctcp(
|
||||
"DCC",
|
||||
nick,
|
||||
f"CHAT chat {ip_quad_to_numstr(dcc.localaddress)} {dcc.localport}",
|
||||
)
|
||||
"""
|
||||
if cmd.startswith("print"):
|
||||
r = requests.get(cmd.split(" ")[1:][0], allow_redirects=True)
|
||||
open(IMAGE_PATH, 'wb').write(r.content)
|
||||
os.system(PRINT_COMMAND)
|
||||
else:
|
||||
c.notice(nick, "Not understood: " + cmd)
|
||||
|
||||
bot = TestBot("printer", "printerbottest", IRC_SERVER, IRC_PORT)
|
||||
bot.start()
|
|
@ -0,0 +1,113 @@
|
|||
from telethon import TelegramClient
|
||||
import logging, asyncio
|
||||
from telethon.tl.types import MessageMediaDocument, DocumentAttributeSticker, DocumentAttributeAnimated, MessageMediaPhoto
|
||||
from telethon import events
|
||||
from telethon.tl.custom import Button
|
||||
from os.path import isfile, join
|
||||
from grp import getgrgid
|
||||
from os import getgroups
|
||||
from os import system
|
||||
from time import time
|
||||
from PIL import Image
|
||||
from config import *
|
||||
from os import makedirs
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
client = TelegramClient('bot', API_ID, API_HASH).start(bot_token=BOT_TOKEN)
|
||||
client.flood_sleep_threshold = 120
|
||||
|
||||
print_log = {}
|
||||
|
||||
|
||||
@client.on(events.NewMessage(pattern='^/id'))
|
||||
async def debug_id(ev):
|
||||
await ev.respond(f"Hello! Your id is `{ev.peer_id.user_id}` please add it to the ADMIN_ID to give youself privileges :)")
|
||||
|
||||
@client.on(events.NewMessage(pattern='^/start'))
|
||||
async def welcome(ev):
|
||||
await ev.respond(WELCOME_MSG)
|
||||
if (ev.peer_id.user_id not in print_log) and PASSWORD:
|
||||
await ev.respond(UNLOCK_MSG)
|
||||
|
||||
# This one triggers on a single message with the pin code written
|
||||
@client.on(events.NewMessage(pattern=PASSWORD, func=lambda e: e.is_private))
|
||||
async def unlock_printer(ev):
|
||||
if ev.peer_id.user_id not in print_log:
|
||||
print_log[ev.peer_id.user_id] = 0
|
||||
if PASSWORD:
|
||||
await ev.respond(UNLOCKED_MSG)
|
||||
|
||||
@client.on(events.NewMessage(incoming=True, func=lambda e: e.is_private and e.message.media))
|
||||
async def handler(ev):
|
||||
|
||||
msg = ev.message
|
||||
if ev.peer_id.user_id not in print_log:
|
||||
await ev.respond(UNLOCK_MSG)
|
||||
|
||||
# Check if the file is valid
|
||||
if msg.photo:
|
||||
fn = join(CACHE_DIR, f"{msg.photo.id}.jpg")
|
||||
elif msg.sticker:
|
||||
fn = join(CACHE_DIR, f"{msg.sticker.id}.webp")
|
||||
for att in msg.sticker.attributes:
|
||||
if isinstance(att, DocumentAttributeAnimated):
|
||||
fn = None
|
||||
break
|
||||
else:
|
||||
fn = None
|
||||
|
||||
if not fn:
|
||||
await ev.respond(FORMAT_ERR_MSG)
|
||||
return
|
||||
|
||||
# Check if the user is still in the cooldown period
|
||||
time_left = int((print_log[ev.peer_id.user_id] + BASE_COOLDOWN) - time())
|
||||
if time_left > 0:
|
||||
await ev.respond(RATELIMIT_MSG.format(time_left=time_left))
|
||||
return
|
||||
|
||||
# Download the file unless it's in the cache!
|
||||
if not isfile(fn):
|
||||
await client.download_media(msg, file=fn)
|
||||
|
||||
# Try opening the image, at least
|
||||
try:
|
||||
img = Image.open(fn)
|
||||
except:
|
||||
await ev.respond(FORMAT_ERR_MSG)
|
||||
return
|
||||
|
||||
# Limit stickers ratio (so people don't print incredibly long stickers)
|
||||
if img.size[1]/img.size[0] > MAX_ASPECT_RATIO:
|
||||
await ev.respond(RATIO_ERR_MSG)
|
||||
return
|
||||
|
||||
# Remove transparency
|
||||
if img.mode == 'RGBA':
|
||||
bg_img = Image.new(img.mode, img.size, BACKGROUND_COLOR)
|
||||
img = Image.alpha_composite(bg_img, img)
|
||||
|
||||
# Resize the image
|
||||
img.thumbnail([WIDTH_PX, HEIGHT_PX], resample=Image.LANCZOS, reducing_gap=None)
|
||||
|
||||
# Convert to grayscale and apply a gamma of 1.8
|
||||
img = img.convert('L')
|
||||
|
||||
if GAMMA_CORRECTION != 1:
|
||||
img = Image.eval(img, lambda x: int(255*pow((x/255),(1/GAMMA_CORRECTION))))
|
||||
|
||||
img.save(IMAGE_PATH, 'PNG')
|
||||
|
||||
await client.forward_messages(ADMIN_ID, ev.message)
|
||||
|
||||
status_code = system(PRINT_COMMAND)
|
||||
if status_code == 0:
|
||||
print_log[ev.peer_id.user_id] = time()
|
||||
await ev.respond(PRINT_SUCCESS_MSG)
|
||||
else:
|
||||
await ev.respond(PRINT_FAIL_MSG)
|
||||
await client.send_message(ADMIN_ID, f'Printer is not working. Process returned status code {status_code}')
|
||||
|
||||
if PRINT_SUCCESS_COMMAND:
|
||||
system(PRINT_SUCCESS_COMMAND)
|
|
@ -1,3 +1,6 @@
|
|||
telethon
|
||||
Pillow
|
||||
brother_ql
|
||||
irc
|
||||
requests
|
||||
argparse
|
Loading…
Reference in New Issue