123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- #!/usr/bin/env python3
- #
- # This is very horrible and quickly written
- # But it works
- #
- # Copyright © 2022 by luk3yx
- #
- # Permission is hereby granted, free of charge, to any person obtaining a copy
- # of this software and associated documentation files (the "Software"), to deal
- # in the Software without restriction, including without limitation the rights
- # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the Software is
- # furnished to do so, subject to the following conditions:
- #
- # The above copyright notice and this permission notice shall be included in
- # all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- # SOFTWARE.
- #
- from __future__ import annotations
- from collections.abc import Iterator, Mapping, Sequence
- from typing import Optional
- import datetime, miniirc, re, traceback # type: ignore
# Refuse to load on miniirc releases older than 1.8.1.
assert miniirc.ver >= (1,8,1)

# Prefix added to the final argument on the code paths that hand miniirc a
# (command, hostmask, tags, args) tuple instead of an IRCMessage.
# NOTE(review): this compares with `> 2` while the rest of the file branches
# on `>= 2`; the constant is only read on the `< 2` paths, where it is ':'.
_LEADING_COLON = '' if miniirc.ver[0] > 2 else ':'

# Matches a backslash followed by one character (any character except a
# newline) and captures that character.
_esc_re = re.compile(r'\\(.)')

# Maps the letter that follows the backslash in an escape sequence to the
# character it stands for.
# Backslash must be first: _get_idc_args applies these replacements one at a
# time in dict order, so existing backslashes have to be escaped before the
# other replacements introduce new ones.
_idc_escapes = {'\\': '\\', 'r': '\r', 'n': '\n', 't': '\t'}
def _get_idc_args(command: str, kwargs: Mapping[str, Optional[str | float]]
                  ) -> Iterator[str]:
    """Yield the fields of one IDC message.

    The first field is *command* itself. It is followed by one
    ``KEY=value`` field per entry of *kwargs* whose value is not ``None``:
    the key is upper-cased and the value is converted with ``str()`` and then
    escaped using ``_idc_escapes``.
    """
    yield command
    for name, raw in kwargs.items():
        # None means "leave this argument out of the message"
        if raw is None:
            continue

        text = str(raw)
        # Applied in dict order so that backslashes are escaped first
        for letter, literal in _idc_escapes.items():
            text = text.replace(literal, '\\' + letter)
        yield f'{name.upper()}={text}'
- def _parse_join(irc: IDC, hostmask: tuple[str, str, str],
- tags: Mapping[str, str], args: list[str]) -> None:
- users = tags.get('=idc-join-users')
- if isinstance(users, str):
- irc._dispatch('353', '', [irc.current_nick, '=', args[0], users])
- irc._dispatch('366', '', [irc.current_nick, args[0],
- 'End of /NAMES list'])
class IDC(miniirc.IRC):
    """A ``miniirc.IRC`` subclass that talks IDC instead of IRC.

    IRC commands passed to :meth:`send` are translated into tab-separated IDC
    messages made of a command name and escaped ``KEY=value`` fields, and
    incoming IDC messages are translated into IRC-shaped messages by
    :meth:`idc_message_parser`. The account name and password are read from
    ``ns_identity``.
    """

    # miniirc 2 and later take an IRCMessage through handle_msg(); older
    # versions take the individual parts through _handle().
    if miniirc.ver[0] >= 2:
        def _dispatch(self, command: str, user: str, args: list[str]) -> None:
            """Feed a locally generated IRC message into miniirc.

            *user* becomes the nick of the hostmask; an empty *user* gives an
            empty hostmask. The message carries no tags.
            """
            self.handle_msg(miniirc.IRCMessage(
                command,
                (user, '~u', f'idc/{user}') if user else ('', '', ''),
                {},
                args,
            ))
    else:
        def _dispatch(self, command: str, user: str, args: list[str]) -> None:
            """Feed a locally generated IRC message into miniirc.

            *user* becomes the nick of the hostmask; an empty *user* gives an
            empty hostmask. The message carries no tags.
            """
            if args:
                # Build a new list so that the caller's list isn't modified
                args = [*args[:-1], _LEADING_COLON + args[-1]]
            self._handle(
                command,
                (user, '~u', f'idc/{user}') if user else ('', '', ''),
                {},
                args,
            )

    def __init__(self, *args, **kwargs) -> None:
        """Create the connection object and register the JOIN handler.

        All arguments are passed to ``miniirc.IRC.__init__`` unchanged.
        """
        super().__init__(*args, **kwargs)
        self.Handler('JOIN', colon=False, ircv3=True)(_parse_join)

    def _idc_message_parser_no_exc(
        self, msg: str
    ) -> Optional[tuple[str, tuple[str, str, str], dict[str, str], list[str]]]:
        """Run :meth:`idc_message_parser` without letting exceptions escape.

        Any ``Exception`` raised while parsing has its traceback printed and
        ``None`` is returned in place of a message.
        """
        try:
            return self.idc_message_parser(msg)
        except Exception:
            traceback.print_exc()
            return None

    def idc_message_parser(
        self, msg: str
    ) -> Optional[tuple[str, tuple[str, str, str], dict[str, str], list[str]]]:
        """Translate one raw IDC message into its IRC equivalent.

        *msg* is split on tabs. Every field that contains ``=`` is an escaped
        ``KEY=value`` argument and a field without ``=`` is the command name
        (if there are several, the last one is used).

        Returns ``None`` for commands that have no translation here and for
        messages from the current nick when ``echo-message`` is not in
        ``active_caps``. Otherwise returns a ``miniirc.IRCMessage`` on
        miniirc 2 and later, or a ``(command, hostmask, tags, args)`` tuple
        on older versions.

        Raises ``KeyError`` when an argument the command requires is missing
        and ``ValueError`` when ``TS`` is not a number.
        """
        def unescape(match: re.Match[str]) -> str:
            # Unknown escape sequences become U+FFFD
            return _idc_escapes.get(match.group(1), '\ufffd')

        idc_cmd = None
        idc_args: dict[str, str] = {}
        for field in msg.split('\t'):
            key, sep, value = field.partition('=')
            if sep:
                idc_args[key] = _esc_re.sub(unescape, value)
            else:
                idc_cmd = field

        # Translate IDC keyword arguments into IRC positional ones
        tags: dict[str, str] = {}
        if idc_cmd == 'PRIVMSG':
            command = 'PRIVMSG'
            args = [self.current_nick, idc_args['MESSAGE']]
        elif idc_cmd == 'CHANMSG':
            command = 'PRIVMSG'
            args = ['#' + idc_args['TARGET'], idc_args['MESSAGE']]
        elif idc_cmd == 'LOGIN_GOOD':
            command = '001'
            args = [self.current_nick, f'Welcome to IDC {self.current_nick}']
        elif idc_cmd == 'PONG':
            command = 'PONG'
            args = [self.ip, idc_args.get('COOKIE', '')]
        elif idc_cmd == 'JOIN':
            command = 'JOIN'
            idc_args['SOURCE'] = self.current_nick
            args = ['#' + idc_args['CHANNEL']]

            # HACK: Add a message tag and fire other events later rather than
            # firing events from the parser function which feels worse.
            # The tag name starts with = so that it doesn't conflict with any
            # actual IRC tags.
            # The tag is only added when USERS was sent: the JOIN handler
            # ignores a missing tag, so the JOIN itself doesn't have to be
            # dropped with a KeyError.
            if 'USERS' in idc_args:
                tags['=idc-join-users'] = idc_args['USERS']
        else:
            return None

        # Add generic parameters
        if 'SOURCE' in idc_args:
            user = idc_args['SOURCE']
            hostmask = (user, '~u', f'idc/{user}')
            tags['account'] = user
        else:
            hostmask = ('', '', '')

        if command == 'PRIVMSG':
            # If echo-message wasn't requested then don't send self messages
            if (hostmask[0] == self.current_nick and
                    'echo-message' not in self.active_caps):
                return None

            # Parse the message type
            msg_type = idc_args.get('TYPE', '').upper()
            if msg_type == 'NOTICE':
                command = 'NOTICE'
            elif msg_type == 'ACTION':
                args[1] = f'\x01ACTION {args[1]}\x01'

        if 'TS' in idc_args:
            # datetime.utcfromtimestamp() is deprecated, so build an aware
            # UTC datetime and drop the tzinfo again to keep producing the
            # same "<naive ISO 8601>Z" text.
            dt = datetime.datetime.fromtimestamp(float(idc_args['TS']),
                                                 datetime.timezone.utc)
            tags['time'] = dt.replace(tzinfo=None).isoformat() + 'Z'

        if 'LABEL' in idc_args:
            tags['label'] = idc_args['LABEL']

        if miniirc.ver[0] >= 2:
            return miniirc.IRCMessage(command, hostmask, tags, args)

        if args:
            args[-1] = _LEADING_COLON + args[-1]
        return command, hostmask, tags, args

    # Send raw messages
    def idc_send(self, command: str, **kwargs: Optional[str | float]) -> None:
        """Send one IDC message.

        *command* and the keyword arguments are turned into fields by
        ``_get_idc_args``, joined with tabs and passed to
        ``miniirc.IRC.quote`` with ``force=True``.
        """
        super().quote('\t'.join(_get_idc_args(command, kwargs)), force=True)

    def quote(self, *msg: str, force: Optional[bool] = None,
              tags: Optional[Mapping[str, str | bool]] = None) -> None:
        """Parse an IRC line and pass the result on to :meth:`send`.

        The parts of *msg* are joined with spaces and parsed with
        ``miniirc.ircv3_message_parser``. *tags* takes precedence over tags
        found in the line itself unless it is empty or ``None``.
        """
        cmd, _, tags2, args = miniirc.ircv3_message_parser(' '.join(msg))
        if miniirc.ver[0] < 2 and args and args[-1].startswith(':'):
            # Drop the leading colon from the last argument
            args[-1] = args[-1][1:]
        self.send(cmd, *args, force=force, tags=tags or tags2)

    def _get_idc_account(self) -> Sequence[str]:
        """Return ``ns_identity`` as a sequence starting with the username.

        A tuple is returned unchanged; anything else is split at the first
        space.
        """
        if isinstance(self.ns_identity, tuple):
            return self.ns_identity
        return self.ns_identity.split(' ', 1)

    @property
    def current_nick(self) -> str:
        """The username from ``ns_identity``."""
        return self._get_idc_account()[0]

    def send(self, cmd: str, *args: str, force: Optional[bool] = None,
             tags: Optional[Mapping[str, str | bool]] = None) -> None:
        """Translate an IRC command into IDC and send it.

        * ``PRIVMSG`` / ``NOTICE`` become ``CHANMSG`` when the target starts
          with ``#`` (which is removed) and ``PRIVMSG`` otherwise. Messages
          to ``NickServ`` are dropped.
        * ``PING`` is sent with ``args[0]`` as the cookie.
        * ``USER`` sends ``LOGIN`` with the credentials from ``ns_identity``
          and sets ``active_caps``.

        Any other command is ignored. A ``label`` entry in *tags* is passed
        along with the message; *force* is not used by this method.
        """
        cmd = cmd.upper()
        label = tags.get('label') if tags else None
        if cmd in ('PRIVMSG', 'NOTICE'):
            target = args[0]
            # TODO: Make miniirc think that SASL worked so that PMs to
            # NickServ don't have to be blocked.
            if target == 'NickServ':
                return

            msg = args[1]
            msg_type: Optional[str]
            if cmd == 'NOTICE':
                msg_type = 'NOTICE'
            elif msg.startswith('\x01ACTION'):
                # 8 == len('\x01ACTION '): skip the CTCP prefix and the space
                msg = msg[8:].rstrip('\x01')
                msg_type = 'ACTION'
            else:
                msg_type = None

            if target.startswith('#'):
                idc_cmd = 'CHANMSG'
                target = target[1:]
            else:
                idc_cmd = 'PRIVMSG'

            self.idc_send(idc_cmd, target=target, type=msg_type, message=msg,
                          label=label)
        elif cmd == 'PING':
            self.idc_send('PING', cookie=args[0], label=label)
        elif cmd == 'USER':
            user, password = self._get_idc_account()
            self.idc_send('LOGIN', username=user, password=password,
                          label=label)
            # Of the requested capabilities, only these three are marked
            # as active.
            self.active_caps = self.ircv3_caps & {
                'account-tag', 'echo-message', 'labeled-response',
            }

    # Override the message parser to change the default parser.
    def change_parser(self, parser=None):
        """Install *parser*, or the exception-safe IDC parser if it is falsy."""
        super().change_parser(parser or self._idc_message_parser_no_exc)
|