Compare commits

..

1 Commits
main ... main

Author SHA1 Message Date
Eloy d911efb69d Various improvements
* Add IRC support
* Move Telegram code to specific file
* Update requirements
* Disable power saving mode of the printer
* Add printer existence+permission check (fixes #2)
* Add argument parsing
2024-05-20 00:12:31 +02:00
5 changed files with 254 additions and 100 deletions

View File

@ -1,6 +1,6 @@
# Telegram Sticker Printer Bot # Sticker Printer Bot
This Python script implements a Telegram bot that can print images and stickers sent by users. The bot supports resizing images, converting them to grayscale, and applying gamma correction before printing to ensure maximum quality. This Python script implements a Telegram and IRC bot that can print images and stickers sent by users. The bot supports resizing images, converting them to grayscale, and applying gamma correction before printing to ensure maximum quality.
Currently, you can set any command to print your sticker (by default brother_ql is used to print). You can use any external program you want to print to other brands and models of printers. Currently, you can set any command to print your sticker (by default brother_ql is used to print). You can use any external program you want to print to other brands and models of printers.

147
bot.py
View File

@ -4,111 +4,62 @@ from telethon.tl.types import MessageMediaDocument, DocumentAttributeSticker, Do
from telethon import events from telethon import events
from telethon.tl.custom import Button from telethon.tl.custom import Button
from os.path import isfile, join from os.path import isfile, join
from grp import getgrgid
from os import getgroups
from os import system from os import system
from os import path
from time import time from time import time
from PIL import Image from PIL import Image
from config import * from config import *
from os import makedirs from os import makedirs
import argparse
logging.basicConfig(level=logging.INFO)
client = TelegramClient('bot', API_ID, API_HASH).start(bot_token=BOT_TOKEN)
client.flood_sleep_threshold = 120
print_log = {}
@client.on(events.NewMessage(pattern='^/id'))
async def debug_id(ev):
await ev.respond(f"Hello! Your id is `{ev.peer_id.user_id}` please add it to the ADMIN_ID to give youself privileges :)")
@client.on(events.NewMessage(pattern='^/start'))
async def welcome(ev):
await ev.respond(WELCOME_MSG)
if (ev.peer_id.user_id not in print_log) and PASSWORD:
await ev.respond(UNLOCK_MSG)
# This one triggers on a single message with the pin code written
@client.on(events.NewMessage(pattern=PASSWORD, func=lambda e: e.is_private))
async def unlock_printer(ev):
if ev.peer_id.user_id not in print_log:
print_log[ev.peer_id.user_id] = 0
if PASSWORD:
await ev.respond(UNLOCKED_MSG)
@client.on(events.NewMessage(incoming=True, func=lambda e: e.is_private and e.message.media))
async def handler(ev):
msg = ev.message
if ev.peer_id.user_id not in print_log:
await ev.respond(UNLOCK_MSG)
# Check if the file is valid
if msg.photo:
fn = join(CACHE_DIR, f"{msg.photo.id}.jpg")
elif msg.sticker:
fn = join(CACHE_DIR, f"{msg.sticker.id}.webp")
for att in msg.sticker.attributes:
if isinstance(att, DocumentAttributeAnimated):
fn = None
break
else:
fn = None
if not fn:
await ev.respond(FORMAT_ERR_MSG)
return
# Check if the user is still in the cooldown period
time_left = int((print_log[ev.peer_id.user_id] + BASE_COOLDOWN) - time())
if time_left > 0:
await ev.respond(RATELIMIT_MSG.format(time_left=time_left))
return
# Download the file unless it's in the cache!
if not isfile(fn):
await client.download_media(msg, file=fn)
# Try opening the image, at least
try:
img = Image.open(fn)
except:
await ev.respond(FORMAT_ERR_MSG)
return
# Limit stickers ratio (so people don't print incredibly long stickers)
if img.size[1]/img.size[0] > MAX_ASPECT_RATIO:
await ev.respond(RATIO_ERR_MSG)
return
# Remove transparency
if img.mode == 'RGBA':
bg_img = Image.new(img.mode, img.size, BACKGROUND_COLOR)
img = Image.alpha_composite(bg_img, img)
# Resize the image
img.thumbnail([WIDTH_PX, HEIGHT_PX], resample=Image.LANCZOS, reducing_gap=None)
# Convert to grayscale and apply a gamma of 1.8
img = img.convert('L')
if GAMMA_CORRECTION != 1:
img = Image.eval(img, lambda x: int(255*pow((x/255),(1/GAMMA_CORRECTION))))
img.save(IMAGE_PATH, 'PNG')
await client.forward_messages(ADMIN_ID, ev.message)
status_code = system(PRINT_COMMAND)
if status_code == 0:
print_log[ev.peer_id.user_id] = time()
await ev.respond(PRINT_SUCCESS_MSG)
else:
await ev.respond(PRINT_FAIL_MSG)
await client.send_message(ADMIN_ID, f'Printer is not working. Process returned status code {status_code}')
if PRINT_SUCCESS_COMMAND:
system(PRINT_SUCCESS_COMMAND)
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog='./bot.py',
description='Recieve images through IRC or Telegram and print the content with brother_ql'
)
parser.add_argument('-t', '--telegram', help='Enable Telegram (requires configuration)',
action='store_true') # on/off flag
parser.add_argument('-i', '--irc', help='Enable IRC (generate random channel and username by default, set values in the configuration if you want a static name)',
action='store_true') # on/off flag
parser.add_argument('-p', '--printer', help='Printer devicefile, default is /dev/usb/lp0',
action='store_true') # on/off flag
args = parser.parse_args()
# os.getgroups() seems to return the *effective* groups of the current user, not
# what is in /etc/group https://docs.python.org/3/library/os.html#os.getgrouplist
# This is good thing, otherwise the error would disappear even if the user has not
# logged in again.
if "lp" not in [getgrgid(g).gr_name for g in getgroups()]:
print("""Error: User is not in the lp group. Run the following and sign in again:
sudo usermod -a -G lp $USER""")
# Assumes this hardcoded path, prevent
# continuing if there is not even a printer
if not path.exists("/dev/usb/lp0"):
exit("There seems to be no printer connected.")
# By default the printer turns off after a certain time to save power.
# For the use case of a public usable printer at events this is
# annoying. Currently, there is no function in the brother_ql project to
# disable power saving: https://github.com/pklaus/brother_ql/issues/50
# The workaround shell command is rewritten to Python code. Please
# remove this code if brother_ql implements a working fix themselves
with open("/dev/usb/lp0", "wb") as p:
p.write(b'\x1b\x69\x55\x41\x00\x00')
if args.irc:
import bot_irc
if args.telegram:
import bot_telegram
makedirs(CACHE_DIR, exist_ok=True) makedirs(CACHE_DIR, exist_ok=True)
client.run_until_disconnected() client.run_until_disconnected()

87
bot_irc.py Normal file
View File

@ -0,0 +1,87 @@
from config import *
# On IRC, we have to use exclamation marks for commands, because otherwise they can overrule
import irc.bot
import irc.strings
from irc.client import ip_numstr_to_quad, ip_quad_to_numstr
import os
import requests
print(IRC_SERVER)
class TestBot(irc.bot.SingleServerIRCBot):
def __init__(self, channel, nickname, server, port=6667):
irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
self.channel = channel
def on_nicknameinuse(self, c, e):
c.nick(c.get_nickname() + "_")
def on_welcome(self, c, e):
c.join(self.channel)
def on_privmsg(self, c, e):
self.do_command(e, e.arguments[0])
def on_pubmsg(self, c, e):
a = e.arguments[0].split(":", 1)
if len(a) > 1 and irc.strings.lower(a[0]) == irc.strings.lower(
self.connection.get_nickname()
):
self.do_command(e, a[1].strip())
return
def on_dccmsg(self, c, e):
# non-chat DCC messages are raw bytes; decode as text
text = e.arguments[0].decode('utf-8')
c.privmsg("You said: " + text)
def on_dccchat(self, c, e):
if len(e.arguments) != 2:
return
args = e.arguments[1].split()
if len(args) == 4:
try:
address = ip_numstr_to_quad(args[2])
port = int(args[3])
except ValueError:
return
self.dcc_connect(address, port)
def do_command(self, e, cmd):
nick = e.source.nick
c = self.connection
"""
if cmd == "disconnect":
self.disconnect()
elif cmd == "die":
self.die()
elif cmd == "stats":
for chname, chobj in self.channels.items():
c.notice(nick, "--- Channel statistics ---")
c.notice(nick, "Channel: " + chname)
users = sorted(chobj.users())
c.notice(nick, "Users: " + ", ".join(users))
opers = sorted(chobj.opers())
c.notice(nick, "Opers: " + ", ".join(opers))
voiced = sorted(chobj.voiced())
c.notice(nick, "Voiced: " + ", ".join(voiced))
elif cmd == "dcc":
dcc = self.dcc_listen()
c.ctcp(
"DCC",
nick,
f"CHAT chat {ip_quad_to_numstr(dcc.localaddress)} {dcc.localport}",
)
"""
if cmd.startswith("print"):
r = requests.get(cmd.split(" ")[1:][0], allow_redirects=True)
open(IMAGE_PATH, 'wb').write(r.content)
os.system(PRINT_COMMAND)
else:
c.notice(nick, "Not understood: " + cmd)
bot = TestBot("printer", "printerbottest", IRC_SERVER, IRC_PORT)
bot.start()

113
bot_telegram.py Normal file
View File

@ -0,0 +1,113 @@
from telethon import TelegramClient
import logging, asyncio
from telethon.tl.types import MessageMediaDocument, DocumentAttributeSticker, DocumentAttributeAnimated, MessageMediaPhoto
from telethon import events
from telethon.tl.custom import Button
from os.path import isfile, join
from grp import getgrgid
from os import getgroups
from os import system
from time import time
from PIL import Image
from config import *
from os import makedirs
logging.basicConfig(level=logging.INFO)
client = TelegramClient('bot', API_ID, API_HASH).start(bot_token=BOT_TOKEN)
client.flood_sleep_threshold = 120
print_log = {}
@client.on(events.NewMessage(pattern='^/id'))
async def debug_id(ev):
await ev.respond(f"Hello! Your id is `{ev.peer_id.user_id}` please add it to the ADMIN_ID to give youself privileges :)")
@client.on(events.NewMessage(pattern='^/start'))
async def welcome(ev):
await ev.respond(WELCOME_MSG)
if (ev.peer_id.user_id not in print_log) and PASSWORD:
await ev.respond(UNLOCK_MSG)
# This one triggers on a single message with the pin code written
@client.on(events.NewMessage(pattern=PASSWORD, func=lambda e: e.is_private))
async def unlock_printer(ev):
if ev.peer_id.user_id not in print_log:
print_log[ev.peer_id.user_id] = 0
if PASSWORD:
await ev.respond(UNLOCKED_MSG)
@client.on(events.NewMessage(incoming=True, func=lambda e: e.is_private and e.message.media))
async def handler(ev):
msg = ev.message
if ev.peer_id.user_id not in print_log:
await ev.respond(UNLOCK_MSG)
# Check if the file is valid
if msg.photo:
fn = join(CACHE_DIR, f"{msg.photo.id}.jpg")
elif msg.sticker:
fn = join(CACHE_DIR, f"{msg.sticker.id}.webp")
for att in msg.sticker.attributes:
if isinstance(att, DocumentAttributeAnimated):
fn = None
break
else:
fn = None
if not fn:
await ev.respond(FORMAT_ERR_MSG)
return
# Check if the user is still in the cooldown period
time_left = int((print_log[ev.peer_id.user_id] + BASE_COOLDOWN) - time())
if time_left > 0:
await ev.respond(RATELIMIT_MSG.format(time_left=time_left))
return
# Download the file unless it's in the cache!
if not isfile(fn):
await client.download_media(msg, file=fn)
# Try opening the image, at least
try:
img = Image.open(fn)
except:
await ev.respond(FORMAT_ERR_MSG)
return
# Limit stickers ratio (so people don't print incredibly long stickers)
if img.size[1]/img.size[0] > MAX_ASPECT_RATIO:
await ev.respond(RATIO_ERR_MSG)
return
# Remove transparency
if img.mode == 'RGBA':
bg_img = Image.new(img.mode, img.size, BACKGROUND_COLOR)
img = Image.alpha_composite(bg_img, img)
# Resize the image
img.thumbnail([WIDTH_PX, HEIGHT_PX], resample=Image.LANCZOS, reducing_gap=None)
# Convert to grayscale and apply a gamma of 1.8
img = img.convert('L')
if GAMMA_CORRECTION != 1:
img = Image.eval(img, lambda x: int(255*pow((x/255),(1/GAMMA_CORRECTION))))
img.save(IMAGE_PATH, 'PNG')
await client.forward_messages(ADMIN_ID, ev.message)
status_code = system(PRINT_COMMAND)
if status_code == 0:
print_log[ev.peer_id.user_id] = time()
await ev.respond(PRINT_SUCCESS_MSG)
else:
await ev.respond(PRINT_FAIL_MSG)
await client.send_message(ADMIN_ID, f'Printer is not working. Process returned status code {status_code}')
if PRINT_SUCCESS_COMMAND:
system(PRINT_SUCCESS_COMMAND)

View File

@ -1,3 +1,6 @@
telethon telethon
Pillow Pillow
brother_ql brother_ql
irc
requests
argparse