miniirc_idc.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. #!/usr/bin/env python3
  2. #
  3. # This is very horrible and quickly written
  4. # But it works
  5. #
  6. # Copyright © 2022 by luk3yx
  7. #
  8. # Permission is hereby granted, free of charge, to any person obtaining a copy
  9. # of this software and associated documentation files (the "Software"), to deal
  10. # in the Software without restriction, including without limitation the rights
  11. # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  12. # copies of the Software, and to permit persons to whom the Software is
  13. # furnished to do so, subject to the following conditions:
  14. #
  15. # The above copyright notice and this permission notice shall be included in
  16. # all copies or substantial portions of the Software.
  17. #
  18. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  19. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  20. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  21. # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  22. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  23. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  24. # SOFTWARE.
  25. #
  26. from __future__ import annotations
  27. from collections.abc import Iterator, Mapping, Sequence
  28. from typing import Optional
  29. import datetime, miniirc, re, traceback # type: ignore
  30. assert miniirc.ver >= (1,8,1)
  31. _LEADING_COLON = '' if miniirc.ver[0] > 2 else ':'
  32. _esc_re = re.compile(r'\\(.)')
  33. # Backslash must be first
  34. _idc_escapes = {'\\': '\\', 'r': '\r', 'n': '\n', 't': '\t'}
  35. def _get_idc_args(command: str, kwargs: Mapping[str, Optional[str | float]]
  36. ) -> Iterator[str]:
  37. yield command
  38. for key, value in kwargs.items():
  39. if value is not None:
  40. value = str(value)
  41. for escape_char, char in _idc_escapes.items():
  42. value = value.replace(char, '\\' + escape_char)
  43. yield f'{key.upper()}={value}'
  44. def _parse_join(irc: IDC, hostmask: tuple[str, str, str],
  45. tags: Mapping[str, str], args: list[str]) -> None:
  46. users = tags.get('=idc-join-users')
  47. if isinstance(users, str):
  48. irc._dispatch('353', '', [irc.current_nick, '=', args[0], users])
  49. irc._dispatch('366', '', [irc.current_nick, args[0],
  50. 'End of /NAMES list'])
  51. class IDC(miniirc.IRC):
  52. if miniirc.ver[0] >= 2:
  53. def _dispatch(self, command: str, user: str, args: list[str]) -> None:
  54. self.handle_msg(miniirc.IRCMessage(
  55. command,
  56. (user, '~u', f'idc/{user}') if user else ('', '', ''),
  57. {},
  58. args,
  59. ))
  60. else:
  61. def _dispatch(self, command: str, user: str, args: list[str]) -> None:
  62. if args:
  63. args[-1] = _LEADING_COLON + args[-1]
  64. self._handle(
  65. command,
  66. (user, '~u', f'idc/{user}') if user else ('', '', ''),
  67. {},
  68. args,
  69. )
  70. def __init__(self, *args, **kwargs) -> None:
  71. super().__init__(*args, **kwargs)
  72. self.Handler('JOIN', colon=False, ircv3=True)(_parse_join)
  73. def _idc_message_parser_no_exc(
  74. self, msg: str
  75. ) -> Optional[tuple[str, tuple[str, str, str], dict[str, str], list[str]]]:
  76. try:
  77. return self.idc_message_parser(msg)
  78. except Exception:
  79. traceback.print_exc()
  80. return None
  81. def idc_message_parser(
  82. self, msg: str
  83. ) -> Optional[tuple[str, tuple[str, str, str], dict[str, str], list[str]]]:
  84. idc_cmd = None
  85. idc_args = {}
  86. for arg in msg.split('\t'):
  87. if '=' in arg:
  88. key, value = arg.split('=', 1)
  89. idc_args[key] = _esc_re.sub(
  90. lambda m: _idc_escapes.get(m.group(1), '\ufffd'),
  91. value
  92. )
  93. else:
  94. idc_cmd = arg
  95. # Translate IDC keyword arguments into IRC positional ones
  96. tags = {}
  97. if idc_cmd == 'PRIVMSG':
  98. command = 'PRIVMSG'
  99. args = [self.current_nick, idc_args['MESSAGE']]
  100. elif idc_cmd == 'CHANMSG':
  101. command = 'PRIVMSG'
  102. args = ['#' + idc_args['TARGET'], idc_args['MESSAGE']]
  103. elif idc_cmd == 'LOGIN_GOOD':
  104. command = '001'
  105. args = [self.current_nick, f'Welcome to IDC {self.current_nick}']
  106. elif idc_cmd == 'PONG':
  107. command = 'PONG'
  108. args = [self.ip, idc_args.get('COOKIE', '')]
  109. elif idc_cmd == 'JOIN':
  110. command = 'JOIN'
  111. idc_args['SOURCE'] = self.current_nick
  112. args = ['#' + idc_args['CHANNEL']]
  113. # HACK: Add a message tag and fire other events later rather than
  114. # firing events from the parser function which feels worse.
  115. # The tag name starts with = so that it doesn't conflict with any
  116. # actual IRC tags.
  117. tags['=idc-join-users'] = idc_args['USERS']
  118. else:
  119. return None
  120. # Add generic parameters
  121. if 'SOURCE' in idc_args:
  122. user = idc_args['SOURCE']
  123. hostmask = (user, '~u', f'idc/{user}')
  124. tags['account'] = user
  125. else:
  126. hostmask = ('', '', '')
  127. if command == 'PRIVMSG':
  128. # If echo-message wasn't requested then don't send self messages
  129. if (hostmask[0] == self.current_nick and
  130. 'echo-message' not in self.active_caps):
  131. return None
  132. # Parse the message type
  133. msg_type = idc_args.get('TYPE', '').upper()
  134. if msg_type == 'NOTICE':
  135. command = 'NOTICE'
  136. elif msg_type == 'ACTION':
  137. args[1] = f'\x01ACTION {args[1]}\x01'
  138. if 'TS' in idc_args:
  139. dt = datetime.datetime.utcfromtimestamp(float(idc_args['TS']))
  140. tags['time'] = dt.isoformat() + 'Z'
  141. if 'LABEL' in idc_args:
  142. tags['label'] = idc_args['LABEL']
  143. if miniirc.ver[0] >= 2:
  144. return miniirc.IRCMessage(command, hostmask, tags, args)
  145. else:
  146. if args:
  147. args[-1] = _LEADING_COLON + args[-1]
  148. return command, hostmask, tags, args
  149. # Send raw messages
  150. def idc_send(self, command: str, **kwargs: Optional[str | float]):
  151. super().quote('\t'.join(_get_idc_args(command, kwargs)), force=True)
  152. def quote(self, *msg: str, force: Optional[bool] = None,
  153. tags: Optional[Mapping[str, str | bool]] = None) -> None:
  154. cmd, _, tags2, args = miniirc.ircv3_message_parser(' '.join(msg))
  155. if miniirc.ver[0] < 2 and args and args[-1].startswith(':'):
  156. args[-1] = args[-1][1:]
  157. self.send(cmd, *args, force=force, tags=tags or tags2)
  158. def _get_idc_account(self) -> Sequence[str]:
  159. if isinstance(self.ns_identity, tuple):
  160. return self.ns_identity
  161. else:
  162. return self.ns_identity.split(' ', 1)
  163. @property
  164. def current_nick(self) -> str:
  165. return self._get_idc_account()[0]
  166. def send(self, cmd: str, *args: str, force: Optional[bool] = None,
  167. tags: Optional[Mapping[str, str | bool]] = None) -> None:
  168. cmd = cmd.upper()
  169. label = tags.get('label') if tags else None
  170. if cmd in ('PRIVMSG', 'NOTICE'):
  171. target = args[0]
  172. # TODO: Make miniirc think that SASL worked PMs to NickServ don't
  173. # have to be blocked.
  174. if target == 'NickServ':
  175. return
  176. msg = args[1]
  177. msg_type: Optional[str]
  178. if cmd == 'NOTICE':
  179. msg_type = 'NOTICE'
  180. elif msg.startswith('\x01ACTION'):
  181. msg = msg[8:].rstrip('\x01')
  182. msg_type = 'ACTION'
  183. else:
  184. msg_type = None
  185. if target.startswith('#'):
  186. idc_cmd = 'CHANMSG'
  187. target = target[1:]
  188. else:
  189. idc_cmd = 'PRIVMSG'
  190. self.idc_send(idc_cmd, target=target, type=msg_type, message=msg,
  191. label=label)
  192. elif cmd == 'PING':
  193. self.idc_send('PING', cookie=args[0], label=label)
  194. elif cmd == 'USER':
  195. user, password = self._get_idc_account()
  196. self.idc_send('LOGIN', username=user, password=password,
  197. label=label)
  198. self.active_caps = self.ircv3_caps & {
  199. 'account-tag', 'echo-message', 'labeled-response',
  200. }
  201. # Override the message parser to change the default parser.
  202. def change_parser(self, parser=None):
  203. super().change_parser(parser or self._idc_message_parser_no_exc)