uniborg/stdplugins/markdown.py

158 lines
4.6 KiB
Python
Raw Normal View History

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import re
from functools import partial
from telethon import events
from telethon.tl.functions.messages import EditMessageRequest
from telethon.extensions.markdown import DEFAULT_URL_RE
from telethon.utils import add_surrogate, del_surrogate
from telethon.tl.types import (
MessageEntityBold, MessageEntityItalic, MessageEntityCode,
MessageEntityPre, MessageEntityTextUrl
)
def parse_url_match(m):
    """
    Turn a match of DEFAULT_URL_RE into ``(text, entity)``.

    Group 1 of the match is kept as the visible text and group 2 is used
    as the link target. The entity starts where the match starts and spans
    exactly the visible text.
    """
    label = m.group(1)
    link = MessageEntityTextUrl(
        offset=m.start(),
        length=len(label),
        # The URL is passed through del_surrogate before being stored
        url=del_surrogate(m.group(2))
    )
    return label, link
def get_tag_parser(tag, entity):
    """
    Build a matcher for text wrapped in ``tag`` on both sides.

    Returns a ``(pattern, parser)`` tuple. ``pattern`` matches the shortest
    non-empty run of characters (newlines included) between two occurrences
    of ``tag``. ``parser`` takes such a match and returns the inner text
    together with ``entity(offset=..., length=...)`` covering that text.
    """
    delimiter = re.escape(tag)
    pattern = re.compile(delimiter + r'(.+?)' + delimiter, re.DOTALL)

    # TODO unescape escaped tags?
    def tag_parser(m):
        inner = m.group(1)
        return inner, entity(offset=m.start(), length=len(inner))

    return pattern, tag_parser
# Code points of the visible ASCII characters ('!' through '~'); the space
# character is handled separately.
PRINTABLE_ASCII = range(0x21, 0x7f)


def parse_aesthetics(m):
    """
    Convert the first group of ``m`` to fullwidth characters.

    Every character in PRINTABLE_ASCII is moved up by 0xFF00 - 0x20, a
    plain space becomes U+3000, and everything else is left as it is.
    Returns ``(converted_text, None)``; no entity is produced.
    """
    def widen(ch):
        code = ord(ch)
        if code == ord(" "):
            return chr(0x3000)
        if code in PRINTABLE_ASCII:
            return chr(code + 0xFF00 - 0x20)
        return ch

    return "".join(map(widen, m[1])), None
2018-12-19 14:52:09 +00:00
# One logical character of surrogate-expanded text: either a UTF-16
# surrogate pair (high surrogate followed by low surrogate) or any single
# character, newlines included.
_STRIKE_UNIT_RE = re.compile(r'[\ud800-\udbff][\udc00-\udfff]|.', re.DOTALL)


def parse_strikethrough(m):
    """
    Strike through the first group of ``m`` with U+0336 combining overlays.

    A U+0336 (combining long stroke overlay) is placed after every
    character of the matched text. Returns ``(struck_text, None)``; no
    entity is produced.

    parse() hands matchers text that has been through add_surrogate, so
    characters outside the BMP arrive as two surrogate code units. The
    overlay must go after the pair, never between its halves, otherwise
    the text can no longer be converted back to valid Unicode.
    """
    units = _STRIKE_UNIT_RE.findall(m[1])
    return ("\u0336".join(units) + "\u0336"), None
2018-05-08 21:53:00 +00:00
def parse_subreddit(m):
    """
    Turn a subreddit mention into a link to reddit.

    Group 1 of the match (whatever preceded the mention) is kept verbatim,
    group 3 (``r/name``) is always rendered with a leading slash, and the
    entity covers only that rendered ``/r/name`` part, starting where
    group 2 started. Returns ``(replacement_text, entity)``.
    """
    prefix = m.group(1)
    path = '/' + m.group(3)
    link = MessageEntityTextUrl(
        offset=m.start(2),
        length=len(path),
        url=f'reddit.com{path}'
    )
    return prefix + path, link
2018-05-08 22:50:20 +00:00
def parse_snip(m):
    """
    Expand ``!name`` into the text of the snip called ``name``.

    The snip is looked up in the storage of the ``snip`` plugin. If the
    plugin or the snip is missing (any KeyError during the lookup), or the
    snip is not of type TYPE_TEXT, the matched text is returned unchanged.
    Returns ``(text, None)``; no entity is produced.
    """
    token = m.group(1)
    try:
        snip_plugin = borg._plugins['snip']
        # Drop the leading '!' to get the snip name
        snip = snip_plugin.storage.snips[token[1:]]
        if snip['type'] == snip_plugin.TYPE_TEXT:
            return snip['text'], None
    except KeyError:
        pass
    return token, None
# NOTE(review): not referenced by the code visible in this file; presumably
# the entity types the matchers below can emit - confirm before relying on it.
PARSED_ENTITIES = (
    MessageEntityBold,
    MessageEntityItalic,
    MessageEntityCode,
    MessageEntityPre,
    MessageEntityTextUrl,
)

# A matcher is a tuple of (regex pattern, parse function)
# where the parse function takes the match and returns (text, entity).
# The matchers are listed in priority order, so the ``` tag has to come
# before the single ` tag.
MATCHERS = [
    (DEFAULT_URL_RE, parse_url_match),
    get_tag_parser('**', MessageEntityBold),
    get_tag_parser('__', MessageEntityItalic),
    get_tag_parser('```', partial(MessageEntityPre, language='')),
    get_tag_parser('`', MessageEntityCode),
    (re.compile(r'\+\+(.+?)\+\+'), parse_aesthetics),
    (re.compile(r'~~(.+?)~~'), parse_strikethrough),
    (re.compile(r'([^/\w]|^)(/?(r/\w+))'), parse_subreddit),
    (re.compile(r'(!\w+)'), parse_snip),
]
2018-06-11 11:26:12 +00:00
def parse(message, old_entities=None):
    """
    Run every matcher in MATCHERS over ``message`` and rewrite it in place.

    The text is scanned left to right. At each position the first matcher
    whose pattern matches there wins: the whole match is replaced by the
    text its parse function returns, and the entity it returns (if any) is
    collected. Ranges already covered by ``old_entities`` are skipped, and
    old entities located after a replacement are shifted so they keep
    pointing at the same text.

    :param message: the raw message text.
    :param old_entities: entities the message already has (optional). The
        offsets of these objects are modified in place when text before
        them grows or shrinks.
    :return: ``(new_text, entities)`` where ``entities`` is the newly
        created entities followed by the old ones sorted by offset.
    """
    entities = []
    old_entities = sorted(old_entities or [], key=lambda e: e.offset)

    i = 0
    # Index of the first old entity that starts strictly after ``i``
    after = 0
    # Offsets are computed on the add_surrogate form of the text
    # (presumably UTF-16 code units, as Telegram counts them - see Telethon)
    message = add_surrogate(message)
    while i < len(message):
        # Move past every old entity that starts at or before the cursor.
        # ``after`` must advance even when the last entity is consumed;
        # otherwise that entity, which is behind the cursor, would be
        # treated as "after" it and wrongly shifted below.
        while after < len(old_entities) and old_entities[after].offset <= i:
            e = old_entities[after]
            # Skip already existing entities if we're at one
            if e.offset == i:
                i += e.length
            after += 1

        # Find the first pattern that matches
        for pattern, parser in MATCHERS:
            match = pattern.match(message, pos=i)
            if match:
                break
        else:
            i += 1
            continue

        text, entity = parser(match)

        # Shift old entities after our current position (so they stay in place)
        shift = len(text) - len(match[0])
        if shift:
            for e in old_entities[after:]:
                e.offset += shift

        # Replace whole match with text from parser
        message = ''.join((
            message[:match.start()],
            text,
            message[match.end():]
        ))

        # Append entity if we got one
        if entity:
            entities.append(entity)

        # Skip past the match
        i += len(text)

    return del_surrogate(message), entities + old_entities
@borg.on(events.MessageEdited(outgoing=True))
@borg.on(events.NewMessage(outgoing=True))
async def reparse(event):
    """
    Re-run the custom markdown parser on an outgoing message and edit it.

    The message's raw text is parsed with parse(), with the entities the
    message already has passed along as ``old_entities``. If parsing
    produced no additional entities and left the text unchanged, nothing
    is done. Otherwise the message is edited with the new text and
    entities and StopPropagation is raised.
    """
    old_entities = event.message.entities or []
    message, msg_entities = await borg._parse_message_text(
        event.raw_text,
        partial(parse, old_entities=old_entities)
    )

    # Bail out when the parser found nothing new to apply
    no_new_entities = len(old_entities) >= len(msg_entities)
    text_unchanged = event.raw_text == message
    if no_new_entities and text_unchanged:
        return

    request = EditMessageRequest(
        peer=await event.get_input_chat(),
        id=event.message.id,
        message=message,
        # Web page previews are disabled when the message carries no media
        no_webpage=not event.message.media,
        entities=msg_entities
    )
    await borg(request)
    raise events.StopPropagation