#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio from os import environ from collections import namedtuple from json import loads import logging import requests from telethon import TelegramClient, events from bs4 import BeautifulSoup # Enable logging logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) Xkcd = namedtuple('Xkcd', ['title', 'link', 'transcript', 'alt', 'number']) URL_FMT_STR = "http://www.ohnorobot.com/index.php?Search=Search&comic=56&s={}" # FIXME HTML parsemode + escaping MSG_FMT_STR = "[{number}]({link}): **{title}**\n\n__{alt}__" XKCD_JSON_FMT_STR = "https://xkcd.com/{}/info.0.json" MAX_SEARCH_RESULTS = 10 bot = TelegramClient('xkcd', 6, 'eb06d4abfb49dc3eeb1aeb98ae0f581e') # blockquote element -> Xkcd def parse_blockquote(elem): children = list(elem.children) title = children[0].text link = 'https' + children[-1].text[4:] number = link.rsplit('/', 2)[1] info = loads(requests.get(XKCD_JSON_FMT_STR.format(number)).text) alt = info['alt'] # TODO markdown bold the matches text = ''.join( [e.text if hasattr(e, 'text') else e for e in children[1:-1]] ) return Xkcd(title, link, text, alt, number) # string -> [Xkcd] def get_xkcds(text): logger.info("getting %s", text) if text == '': return [] # TODO return newest when empty soup = BeautifulSoup(requests.get(URL_FMT_STR.format(text)).text, "html.parser") bqs = soup.find_all("blockquote")[:MAX_SEARCH_RESULTS] logger.info(bqs) return (parse_blockquote(e) for e in bqs) # Define a few command handlers. These usually take the two arguments bot and # update. Error handlers also receive the raised TelegramError object in error. @bot.on(events.NewMessage(pattern='/start$')) async def start(event): """Send a message when the command /start is issued.""" # TODO await event.respond('Hi!') @bot.on(events.NewMessage(pattern='/help$')) async def help(event): """Send a message when the command /help is issued.""" # TODO await event.respond('Help!') @bot.on(events.InlineQuery) async def inlinequery(event): """Handle the inline query.""" # TODO show transcript in result but not message? builder = event.builder result = await asyncio.gather(*(builder.article( title=xkcd.title, url=xkcd.link, text=MSG_FMT_STR.format(number=xkcd.number, link=xkcd.link, title=xkcd.title, alt=xkcd.alt) ) for xkcd in get_xkcds(event.text))) # FIXME get_xkcds returns duplicates, which lead to the same result ID # Build a dict by their ID to remove the duplicates result = list({r.id: r for r in result}.values()) await event.answer(result) def main(): bot.start(bot_token=environ['TOKEN']) with bot: bot.run_until_disconnected() if __name__ == '__main__': main()