sed: refactor to look more like current reegee code
parent
269a2ab1ae
commit
22c9942c2d
|
@ -1,16 +1,18 @@
|
||||||
"""
|
"""
|
||||||
Become @regexbot when the bot is missing
|
Become @regexbot when the bot is missing
|
||||||
"""
|
"""
|
||||||
from collections import defaultdict, deque
|
|
||||||
import re
|
|
||||||
|
|
||||||
import regex
|
import regex as re
|
||||||
|
from collections import defaultdict, deque
|
||||||
|
|
||||||
from telethon import events, utils
|
from telethon import events, utils
|
||||||
from telethon.tl import types, functions
|
from telethon.tl import types, functions
|
||||||
|
|
||||||
from uniborg import util
|
from uniborg import util
|
||||||
|
|
||||||
HEADER = "「sed」\n"
|
SED_PATTERN = r'^s/((?:\\/|[^/])+)/((?:\\/|[^/])*)(/.*)?'
|
||||||
|
GROUP0_RE = re.compile(r'(?<!\\)((?:\\\\)*)\\0')
|
||||||
|
HEADER = '「sed」\n'
|
||||||
KNOWN_RE_BOTS = re.compile(
|
KNOWN_RE_BOTS = re.compile(
|
||||||
r'(regex|moku|ou|BananaButler_|rgx|l4mR)bot',
|
r'(regex|moku|ou|BananaButler_|rgx|l4mR)bot',
|
||||||
flags=re.IGNORECASE
|
flags=re.IGNORECASE
|
||||||
|
@ -22,13 +24,20 @@ KNOWN_RE_BOTS = re.compile(
|
||||||
last_msgs = defaultdict(lambda: deque(maxlen=10))
|
last_msgs = defaultdict(lambda: deque(maxlen=10))
|
||||||
|
|
||||||
|
|
||||||
@util.sync_timeout(1)
|
def cleanup_pattern(match):
|
||||||
def doit(chat_id, match, original):
|
from_ = match.group(1)
|
||||||
fr = match.group(1)
|
|
||||||
to = match.group(2)
|
to = match.group(2)
|
||||||
to = (to
|
|
||||||
.replace('\\/', '/')
|
to = to.replace('\\/', '/')
|
||||||
.replace('\\0', '\\g<0>'))
|
to = GROUP0_RE.sub(r'\1\\g<0>', to)
|
||||||
|
|
||||||
|
return from_, to
|
||||||
|
|
||||||
|
|
||||||
|
#@util.sync_timeout(1)
|
||||||
|
async def doit(message, match):
|
||||||
|
fr, to = cleanup_pattern(match)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
fl = match.group(3)
|
fl = match.group(3)
|
||||||
if fl is None:
|
if fl is None:
|
||||||
|
@ -40,34 +49,49 @@ def doit(chat_id, match, original):
|
||||||
# Build Python regex flags
|
# Build Python regex flags
|
||||||
count = 1
|
count = 1
|
||||||
flags = 0
|
flags = 0
|
||||||
for f in fl:
|
for f in fl.lower():
|
||||||
if f == 'i':
|
if f == 'i':
|
||||||
flags |= regex.IGNORECASE
|
flags |= re.IGNORECASE
|
||||||
|
elif f == 'm':
|
||||||
|
flags |= re.MULTILINE
|
||||||
|
elif f == 's':
|
||||||
|
flags |= re.DOTALL
|
||||||
elif f == 'g':
|
elif f == 'g':
|
||||||
count = 0
|
count = 0
|
||||||
|
elif f == 'x':
|
||||||
|
flags |= re.VERBOSE
|
||||||
else:
|
else:
|
||||||
return None, f"Unknown flag: {f}"
|
await message.reply(f'{HEADER}Unknown flag: {f}')
|
||||||
|
return
|
||||||
|
|
||||||
def actually_doit(original):
|
def substitute(m):
|
||||||
try:
|
if s := m.raw_text:
|
||||||
s = original.message
|
|
||||||
if s.startswith(HEADER):
|
if s.startswith(HEADER):
|
||||||
s = s[len(HEADER):]
|
s = s[len(HEADER):]
|
||||||
s, i = regex.subn(fr, to, s, count=count, flags=flags)
|
else:
|
||||||
if i > 0:
|
return None
|
||||||
return original, s
|
|
||||||
except Exception as e:
|
|
||||||
return None, f"u dun goofed m8: {str(e)}"
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
if original is not None:
|
s, i = re.subn(fr, to, s, count=count, flags=flags)
|
||||||
return actually_doit(original)
|
if i > 0:
|
||||||
# Try matching the last few messages
|
return s
|
||||||
for original in last_msgs[chat_id]:
|
|
||||||
m, s = actually_doit(original)
|
try:
|
||||||
if s is not None:
|
msg = None
|
||||||
return m, s
|
substitution = None
|
||||||
return None, None
|
if message.is_reply:
|
||||||
|
msg = await message.get_reply_message()
|
||||||
|
substitution = substitute(msg)
|
||||||
|
else:
|
||||||
|
for msg in reversed(last_msgs[message.chat_id]):
|
||||||
|
substitution = substitute(msg)
|
||||||
|
if substitution is not None:
|
||||||
|
break # msg is also set
|
||||||
|
|
||||||
|
if substitution is not None:
|
||||||
|
return await msg.reply(f'{HEADER}{substitution}', parse_mode=None)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
await message.reply(f'{HEADER}fuck me: {e}')
|
||||||
|
|
||||||
|
|
||||||
async def group_has_sedbot(group):
|
async def group_has_sedbot(group):
|
||||||
|
@ -80,21 +104,7 @@ async def group_has_sedbot(group):
|
||||||
|
|
||||||
return any(KNOWN_RE_BOTS.match(x.username or '') for x in full.users)
|
return any(KNOWN_RE_BOTS.match(x.username or '') for x in full.users)
|
||||||
|
|
||||||
|
async def sed(event):
|
||||||
@borg.on(events.NewMessage)
|
|
||||||
async def on_message(event):
|
|
||||||
last_msgs[event.chat_id].appendleft(event.message)
|
|
||||||
|
|
||||||
@borg.on(events.MessageEdited)
|
|
||||||
async def on_edit(event):
|
|
||||||
for m in last_msgs[event.chat_id]:
|
|
||||||
if m.id == event.id:
|
|
||||||
m.raw_text = event.raw_text
|
|
||||||
break
|
|
||||||
|
|
||||||
@borg.on(events.NewMessage(
|
|
||||||
pattern=re.compile(r"^s/((?:\\/|[^/])+)/((?:\\/|[^/])*)(/.*)?")))
|
|
||||||
async def on_regex(event):
|
|
||||||
if event.fwd_from:
|
if event.fwd_from:
|
||||||
return
|
return
|
||||||
if not event.is_private:
|
if not event.is_private:
|
||||||
|
@ -103,17 +113,25 @@ async def on_regex(event):
|
||||||
if await group_has_sedbot(await event.get_input_chat()):
|
if await group_has_sedbot(await event.get_input_chat()):
|
||||||
return
|
return
|
||||||
|
|
||||||
chat_id = utils.get_peer_id(await event.get_input_chat())
|
message = await doit(event.message, event.pattern_match)
|
||||||
|
if message:
|
||||||
m, s = doit(chat_id, event.pattern_match, await event.get_reply_message())
|
last_msgs[event.chat_id].append(message)
|
||||||
|
|
||||||
if m is not None:
|
|
||||||
s = f"{HEADER}{s}"
|
|
||||||
out = await borg.send_message(
|
|
||||||
await event.get_input_chat(), s, reply_to=m.id, parse_mode=None
|
|
||||||
)
|
|
||||||
last_msgs[chat_id].appendleft(out)
|
|
||||||
elif s is not None:
|
|
||||||
await event.reply(s)
|
|
||||||
|
|
||||||
|
# Don't save sed commands or we would be able to sed those
|
||||||
raise events.StopPropagation
|
raise events.StopPropagation
|
||||||
|
|
||||||
|
|
||||||
|
@borg.on(events.NewMessage)
|
||||||
|
async def catch_all(event):
|
||||||
|
last_msgs[event.chat_id].append(event.message)
|
||||||
|
|
||||||
|
|
||||||
|
@borg.on(events.MessageEdited)
|
||||||
|
async def catch_edit(event):
|
||||||
|
for i, message in enumerate(last_msgs[event.chat_id]):
|
||||||
|
if message.id == event.id:
|
||||||
|
last_msgs[event.chat_id][i] = event.message
|
||||||
|
|
||||||
|
|
||||||
|
borg.on(events.NewMessage(pattern=SED_PATTERN))(sed)
|
||||||
|
borg.on(events.MessageEdited(pattern=SED_PATTERN))(sed)
|
||||||
|
|
Reference in New Issue