xkcdtextbot/bot.py

121 lines
3.6 KiB
Python
Raw Normal View History

2019-05-11 17:21:05 +00:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
2019-05-11 18:09:19 +00:00
import asyncio
import html
2019-05-11 17:21:05 +00:00
from os import environ
from collections import namedtuple
import logging
2019-05-11 19:13:46 +00:00
import aiohttp
2019-05-11 17:52:21 +00:00
from telethon import TelegramClient, events
2019-05-11 19:24:37 +00:00
from telethon.tl.custom import Button
2019-05-11 17:21:05 +00:00
from bs4 import BeautifulSoup
# Enable logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# One search hit: comic title, https link, transcript excerpt, alt text and
# comic number (kept as a string).
Xkcd = namedtuple('Xkcd', ['title', 'link', 'transcript', 'alt', 'number'])

# Search page that is scraped for matching comics; {} is the search query.
URL_FMT_STR = "http://www.ohnorobot.com/index.php?Search=Search&comic=56&s={}"
# HTML body of the message sent when a result is picked.
MSG_FMT_STR = '<a href="{link}">{number}</a>: <b>{title}</b>\n\n<i>{alt}</i>'
# Per-comic metadata endpoint; {} is the comic number.
XKCD_JSON_FMT_STR = "https://xkcd.com/{}/info.0.json"
MAX_SEARCH_RESULTS = 10

# Telegram API credentials. They can be overridden through the API_ID and
# API_HASH environment variables; the defaults are the values this bot has
# always used, so existing deployments keep working unchanged.
API_ID = int(environ.get('API_ID', 6))
API_HASH = environ.get('API_HASH', 'eb06d4abfb49dc3eeb1aeb98ae0f581e')

bot = TelegramClient('xkcd', API_ID, API_HASH)
bot.parse_mode = 'html'

session = None  # aiohttp.ClientSession, set in main()
me = None  # the bot's own user, set in main()
2019-05-11 17:52:21 +00:00
2019-05-11 17:21:05 +00:00
# blockquote element -> Xkcd
2019-05-11 19:13:46 +00:00
async def parse_blockquote(elem):
    """Turn one search-result <blockquote> element into an ``Xkcd`` tuple.

    The element's first child supplies the title, its last child supplies
    the comic link, and everything in between is joined into the transcript
    excerpt. The alt text is fetched from the comic's JSON metadata using
    the module-level aiohttp ``session``.

    Args:
        elem: a BeautifulSoup element whose children follow the layout
            described above.

    Returns:
        Xkcd(title, link, transcript, alt, number), with ``number`` kept as
        a string taken from the last path segment of the link.
    """
    children = list(elem.children)
    title = children[0].text

    # Upgrade the link to https. Only rewrite a plain "http://" prefix so a
    # link that is already https is not mangled into "httpss://".
    raw_link = children[-1].text.strip()
    if raw_link.startswith('http://'):
        link = 'https://' + raw_link[len('http://'):]
    else:
        link = raw_link

    # Last path segment is the comic number, with or without a trailing '/'.
    number = link.rstrip('/').rsplit('/', 1)[-1]

    async with session.get(XKCD_JSON_FMT_STR.format(number)) as resp:
        info = await resp.json()
    alt = info['alt']

    # TODO markdown bold the <span> matches
    # Children are a mix of tags and bare strings; use .text where it exists.
    transcript = ''.join(
        e.text if hasattr(e, 'text') else e
        for e in children[1:-1]
    )
    return Xkcd(title, link, transcript, alt, number)
# string -> [Xkcd]
2019-05-11 19:13:46 +00:00
async def get_xkcds(text):
    """Search for xkcd comics matching *text*.

    Fetches the search page built from ``URL_FMT_STR`` with the module-level
    aiohttp ``session``, keeps at most ``MAX_SEARCH_RESULTS`` <blockquote>
    hits and parses them concurrently with ``parse_blockquote``.

    Args:
        text: the raw search query typed by the user.

    Returns:
        A list of ``Xkcd`` tuples in result-page order; an empty list when
        the query is empty.
    """
    # Function-scope import so this block does not depend on a new
    # module-level import.
    from urllib.parse import quote_plus

    logger.info("getting %s", text)
    if not text:
        # TODO return newest when empty
        return []

    # The query comes straight from the user: percent-encode it so that
    # characters such as '&', '#', '+' or '%' stay part of the search term
    # instead of altering the URL's structure.
    search_url = URL_FMT_STR.format(quote_plus(text))
    async with session.get(search_url) as resp:
        soup = BeautifulSoup(await resp.text(), "html.parser")

    bqs = soup.find_all("blockquote")[:MAX_SEARCH_RESULTS]
    logger.info(bqs)
    return await asyncio.gather(*(parse_blockquote(e) for e in bqs))
2019-05-11 17:21:05 +00:00
# Command and inline-query handlers. Each one is registered on the client
# with @bot.on(...) and receives a single event argument.
2019-05-11 17:52:21 +00:00
@bot.on(events.NewMessage(pattern='/start$'))
async def start(event):
    """Greet the user when the /start command is received."""
    greeting = f"Hello! I'm {me.username} and I search for XKCD when used inline."
    # Button labelled 'Try it!' built with the sample query 'cheaply'.
    try_it = Button.switch_inline('Try it!', 'cheaply')
    await event.respond(greeting, buttons=try_it)
2019-05-11 17:21:05 +00:00
2019-05-11 17:52:21 +00:00
@bot.on(events.NewMessage(pattern='/help$'))
async def help(event):
    """Explain that the bot only works inline when /help is received.

    The reply carries the same 'Try it!' switch-inline button as /start.
    """
    # Plain literal: the message has no placeholders, so no f-string needed.
    await event.respond(
        "I only work inline, and it is my job to search for XKCD comics!",
        buttons=Button.switch_inline('Try it!', 'cheaply'),
    )
2019-05-11 17:52:21 +00:00
2019-05-11 17:21:05 +00:00
2019-05-11 17:52:21 +00:00
@bot.on(events.InlineQuery)
async def inlinequery(event):
    """Answer an inline query with one article per matching comic.

    The query text is passed to ``get_xkcds``; every hit becomes an article
    whose message body is ``MSG_FMT_STR`` filled with the HTML-escaped title
    and alt text.
    """
    # TODO show transcript in result but not message?
    builder = event.builder
    comics = await get_xkcds(event.text)

    # Create all article coroutines first, then await them together.
    pending = []
    for comic in comics:
        message = MSG_FMT_STR.format(
            number=comic.number,
            link=comic.link,
            title=html.escape(comic.title),
            alt=html.escape(comic.alt),
        )
        pending.append(
            builder.article(title=comic.title, url=comic.link, text=message)
        )
    articles = await asyncio.gather(*pending)

    # FIXME get_xkcds returns duplicates, which lead to the same result ID.
    # Key the articles by ID so that each ID is answered only once; the
    # position of the first occurrence is kept.
    by_id = {}
    for article in articles:
        by_id[article.id] = article
    await event.answer(list(by_id.values()))
2019-05-11 17:21:05 +00:00
2019-05-11 19:13:46 +00:00
async def main():
    """Start the bot and keep it running until the client disconnects.

    Populates the module-level ``session`` (shared aiohttp session) and
    ``me`` (result of ``bot.get_me()``) that the handlers rely on. The bot
    token is read from the ``TOKEN`` environment variable.
    """
    global session, me

    # The HTTP session stays open for the whole lifetime of the bot.
    async with aiohttp.ClientSession() as session:
        token = environ['TOKEN']
        # Log in with the token before entering the client context.
        await bot.start(bot_token=token)
        async with bot:
            me = await bot.get_me()
            await bot.run_until_disconnected()
2019-05-11 17:21:05 +00:00
if __name__ == '__main__':
    # Run main() to completion on the current event loop.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())