12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878 |
- # poibot.py: Protocol of IRC bot, an experimental IRC bot
- # Copyright (C) 2015, 2016, 2017 Tom Li
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- import time
- import re
- import socket
- import ssl
- import queue
- import threading
- import copy
- import rpweibo
- import random
- import json
- import const
# Nickname used on IRC; const.py may override the built-in default.
NICKNAME = getattr(const, "NICKNAME", "poibot")
def restart_program(reason="something happened"):
    """Report *reason* on stderr, then re-execute the current program.

    The running process is replaced via ``os.execl`` using the current
    interpreter and the original ``sys.argv``, so this function does not
    return when the exec succeeds.

    Args:
        reason: Short text written to stderr before restarting.
    """
    import os
    import sys
    import time

    sys.stderr.write(reason)
    sys.stderr.write(", restarting...\n")
    time.sleep(1)

    # exec* replaces the process image without flushing Python's stdio
    # buffers, so anything still buffered (this file logs with print())
    # would be lost.  Flush best-effort; a broken stream must not prevent
    # the restart itself.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except (AttributeError, OSError, ValueError):
            pass

    python = sys.executable
    os.execl(python, python, *sys.argv)
class IRCMessage():
    """One IRC protocol line: parsed from the wire, or built for sending.

    When constructed with a raw line (terminated by CRLF) the line is parsed
    into ``prefix``, ``nick``, ``ident``, ``command`` and ``params``.  For
    PRIVMSG, ``dest`` and ``text`` are filled from the first two params.
    The builder methods (``say``, ``join``, ...) return the wire-format
    string and remember it in ``_line``.
    """

    def __init__(self, line=None):
        self.nick = None
        self.ident = None
        self.prefix = None
        self.command = None
        self.dest = None
        self.text = None
        self.params = []
        self._line = line

        if line:
            self._parse(line)

    def __str__(self):
        # __str__ must return a str; an empty message has no line yet.
        return self._line if self._line is not None else ""

    @staticmethod
    def is_nospcrlfcl(char):
        """Return True if no character of *char* is NUL, CR, LF, space or ':'."""
        for i in char:
            if i in ("\0", "\r", "\n", " ", ":"):
                return False
        return True

    def _parse(self, line):
        """Split a raw CRLF-terminated *line* into prefix, command and params.

        Raises:
            AssertionError: if the line does not have the expected shape.
        """
        assert line.endswith("\r\n")

        # parse prefix
        if line.startswith(":"):
            for idx, char in enumerate(line):
                if char == " ":
                    break
            self.prefix = line[1:idx]
            line = line[idx:]

            # split prefix to nick and ident
            if "!" in self.prefix:
                self.nick, self.ident = self.prefix.split("!", 1)
            else:
                self.ident = self.prefix

            # eat space
            assert line[0] == " "
            line = line[1:]

        # parse command
        for idx, char in enumerate(line):
            if not (char.isalpha() or char.isdigit()):
                break
        self.command = line[0:idx]
        assert (len(self.command) >= 1 and self.command.isalpha() or
                len(self.command) == 3 and self.command.isdigit())
        line = line[idx:]

        # no params
        if line == "\r\n":
            return

        # parse params
        assert line.startswith(" ")

        iters = 0
        end = False
        while iters < 14 and not end:
            if not line.startswith(" "):
                break
            line = line[1:]

            for idx, char in enumerate(line):
                if idx == 0 and not self.is_nospcrlfcl(char):
                    # A param cannot start with one of these characters:
                    # what follows is the trailing part (or the line end).
                    end = True
                    break
                elif not self.is_nospcrlfcl(char) and char != ":":
                    # End of a middle param; ':' is allowed inside one.
                    self.params.append(line[0:idx])
                    line = line[idx:]
                    break
            iters += 1

        # ``line`` is already positioned right after the last middle param.
        # (Slicing it again by the loop index here would skip the CRLF and
        # append a bogus empty trailing param.)
        if line == "\r\n":
            return

        # Drop the leading ':' (or separator) of the trailing param.
        # NOTE(review): the ``iters != 13`` exception is kept as found; it
        # looks like an attempt at the "':' is optional after 14 middle
        # params" rule -- confirm against RFC 2812 before changing it.
        if iters != 13:
            line = line[1:]
        for idx, char in enumerate(line):
            if char in ["\0", "\r", "\n"]:
                break
        self.params.append(line[0:idx])

        if self.command == "PRIVMSG":
            # Guard the indexes so a malformed PRIVMSG does not raise.
            if self.params:
                self.dest = self.params[0]
            if len(self.params) > 1:
                self.text = self.params[1]

    def say(self, to, msg):
        """Build (and echo to stdout) a PRIVMSG carrying *msg* to *to*."""
        self._line = "PRIVMSG %s :%s\r\n" % (to, msg)
        print(self._line)
        return self._line

    def me(self, to, msg):
        """Build a CTCP ACTION ("/me") PRIVMSG and return it like say()."""
        return self.say(to, '\x01ACTION %s\x01' % msg)

    def setnick(self, nick):
        """Build a NICK command."""
        self._line = "NICK %s\r\n" % nick
        return self._line

    def setuser(self, user, realname):
        """Build a USER command, using *user* for the first three fields."""
        self._line = "USER %s %s %s :%s\r\n" % (user, user, user, realname)
        return self._line

    def join(self, channel):
        """Build a JOIN command."""
        self._line = "JOIN %s\r\n" % channel
        return self._line

    def whois(self, user):
        """Build a WHOIS command."""
        self._line = "WHOIS %s\r\n" % (user)
        return self._line

    def kick(self, channel, user, reason=""):
        """Build a KICK command, with *reason* appended when non-empty."""
        if reason:
            self._line = "KICK %s %s %s\r\n" % (channel, user, reason)
        else:
            self._line = "KICK %s %s\r\n" % (channel, user)
        return self._line

    def raw(self, msg):
        """Terminate an arbitrary command string with CRLF and return it."""
        self._line = "%s\r\n" % msg
        return self._line

    @staticmethod
    def normalize_bot_message(msg):
        """Return a copy of *msg* with relay-bot framing unwrapped.

        For messages sent by the known relay bots listed below, the relayed
        nickname replaces ``nick`` and the relayed text replaces ``text``.
        The input message is never modified; when the text does not have the
        expected shape the copy is returned with whatever was unwrapped so far.
        """
        msg = copy.deepcopy(msg)
        if not msg.text:
            return msg

        # "[nick] text", optionally preceded by a \x03 colour code
        if msg.nick in ["Orizon", "fakeOrizon", "xmppbot", "teleboto", "OrzIrc2P"]:
            try:
                if msg.text[0] == "\x03":
                    text = msg.text.lstrip("\x03")
                    text = text.replace("] \x0f", "] ")
                    try:
                        # drop up to two colour digits
                        int(text[0])
                        text = text[1:]
                        int(text[0])
                        text = text[1:]
                    except ValueError:
                        pass
                    msg.text = text
                text = msg.text.lstrip("[")
                stripped = text.split("]")
                msg.nick = stripped[-2]
                msg.text = stripped[-1].lstrip()
            except IndexError:
                pass

        # "(nick) text"
        if msg.nick in ["toxsync"]:
            try:
                text = msg.text.lstrip("(")
                stripped = text.split(")")
                msg.nick = stripped[-2]
                msg.text = stripped[-1].lstrip()
            except IndexError:
                pass

        # "(GTalk) nick: text" / "(XMPP) nick: text"
        if msg.nick in ["OrzGTalk", "blugbot", "OrzXMPP"]:
            if msg.nick == "OrzGTalk":
                prefix = "(GTalk) "
            elif msg.nick in ["blugbot", "OrzXMPP"]:
                prefix = "(XMPP) "
            try:
                text = msg.text.replace(prefix, "")
                text = text.split(": ", maxsplit=1)
                msg.nick = text[0]
                if (not msg.nick) or (len(text) != 2):
                    raise IndexError
                msg.text = text[1]
            except IndexError:
                pass
        return msg
class IRCAbstractHook():
    """Base class for message hooks.

    A hook is handed an outgoing queue through set_queue(); send() puts
    messages on that queue, and subclasses implement handle().
    """

    def __init__(self):
        # No outgoing queue until set_queue() is called.
        self._queue = None

    def set_queue(self, queue):
        """Remember *queue* as the destination for send()."""
        self._queue = queue

    def send(self, msg):
        """Put *msg* on the outgoing queue."""
        outgoing = self._queue
        outgoing.put(msg)

    def handle(self, msg):
        """Process one incoming message; must be overridden."""
        raise NotImplementedError
class IRCConnection():
    """TLS connection to an IRC server plus the threads that drive it.

    Three threads are started by the constructor: ``_read`` splits the byte
    stream into CRLF-terminated lines, ``read`` parses each line and hands
    it to every registered hook (one worker thread per hook call), and
    ``send`` writes everything hooks put on the outgoing queue.
    """

    class IRCInitHook(IRCAbstractHook):
        """One-shot hook that performs registration on the first message."""

        def __init__(self, sasl_external=False):
            super().__init__()
            self.sasl_external = sasl_external

        def handle(self, msg):
            if msg.command and self.sasl_external:
                self.send(IRCMessage().raw('CAP REQ :sasl'))
                self.send(IRCMessage().raw('AUTHENTICATE EXTERNAL'))
                self.send(IRCMessage().raw('AUTHENTICATE +'))
                self.send(IRCMessage().raw('CAP END'))

            if msg.command == "NOTICE":
                self.send(IRCMessage().setnick(NICKNAME))
                self.send(IRCMessage().setuser(NICKNAME, NICKNAME))
                for chan in const.CHANNELS:
                    self.send(IRCMessage().join(chan))

    class IRCDebugHook(IRCAbstractHook):
        """Print prefix, command and params of every message."""

        def __init__(self):
            super().__init__()

        def handle(self, msg):
            print(msg.prefix, msg.command, msg.params)

    class IRCPingHook(IRCAbstractHook):
        """Answer server PING with PONG."""

        def __init__(self):
            super().__init__()

        def handle(self, msg):
            if msg.command == "PING":
                self.send("PONG :%s\r\n" % msg.params[0])

    @staticmethod
    def _build_tls_context(addr, client_cert=None):
        """Create the hardened SSLContext used for the server connection."""
        ctx = ssl.create_default_context()
        if client_cert:
            ctx.load_cert_chain(client_cert)

        # Option flags are *set* with |=.  AND-ing them into ctx.options
        # clears the hardening flags instead of enabling them.
        ctx.options |= ssl.OP_NO_SSLv2
        ctx.options |= ssl.OP_NO_SSLv3
        ctx.options |= ssl.OP_NO_TLSv1
        ctx.options |= ssl.OP_NO_TLSv1_1
        ctx.options |= ssl.OP_NO_COMPRESSION  # no CRIME attack

        # XXX: failed to pass any checks... DO NOT CHECK REVOKED CERTIFICATE FOR NOW
        # ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
        ctx.set_ciphers(
            "ECDHE-RSA-AES256-GCM-SHA384:"
            "ECDHE-RSA-AES128-GCM-SHA256:"
            "DHE-RSA-AES256-GCM-SHA384:"
            "DHE-RSA-AES128-GCM-SHA256"
        )

        # Hostname checking is skipped for .onion addresses only.
        ctx.check_hostname = not addr.endswith(".onion")
        # Certificate verification is a verify_mode, not an option flag.
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_default_certs()
        ctx.set_default_verify_paths()
        return ctx

    def __init__(self, addr, port, client_cert=None):
        """Connect to *addr*:*port* over TLS and start the worker threads.

        Args:
            addr: Server host name.
            port: Server port.
            client_cert: Optional path to a client certificate chain; when
                given it is loaded into the TLS context and SASL EXTERNAL
                is requested during registration.
        """
        self.addr = addr
        self.port = port
        # Must exist even if every connection attempt fails early.
        self.sock = None

        ctx = self._build_tls_context(addr, client_cert)

        for res in socket.getaddrinfo(self.addr, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            plain = None
            try:
                plain = socket.socket(af, socktype, proto)
                self.sock = ctx.wrap_socket(plain, server_hostname=addr)
                self.sock.settimeout(10)
                self.sock.connect(sa)
            except socket.error:
                # Close whichever socket object was created for this attempt.
                failed = self.sock if self.sock is not None else plain
                if failed is not None:
                    failed.close()
                self.sock = None
                continue
            break

        if self.sock is None:
            restart_program("sock.connect() failed!")
        self.sock.settimeout(None)

        import pprint
        print("TLS Ceritificate:")
        pprint.pprint(self.sock.getpeercert())
        if hasattr(self.sock, "version"):
            print("TLS Version:", self.sock.version())
        print("TLS Cipher:", self.sock.cipher()[0])

        self._sendq = queue.Queue()
        self._readq = queue.Queue()
        self._hooks = []

        # Register the built-in hooks before any thread can dispatch a
        # message, so the very first server line reaches the init hook.
        self.register_hook(self.IRCInitHook(sasl_external=client_cert), oneshot=True)
        self.register_hook(self.IRCPingHook())
        self.register_hook(self.IRCDebugHook())

        _reader = threading.Thread(group=None, target=self._read)
        _reader.start()

        reader = threading.Thread(group=None, target=self.read)
        reader.start()

        sender = threading.Thread(group=None, target=self.send)
        sender.start()

    def _read(self):
        """Receive bytes forever and queue each complete CRLF-terminated line.

        Lines are decoded as UTF-8 (undecodable bytes are ignored) and put
        on the read queue including their CRLF.  Empty lines are skipped.
        A closed connection or socket error restarts the program.
        """
        pending = bytearray()
        while True:
            try:
                buf = self.sock.recv(4096)
                if not buf:
                    self.sock.close()
                    restart_program("TCP error")
            except OSError:
                # timeout?
                self.sock.close()
                restart_program("TCP error (OSError)")

            pending += buf
            while True:
                raw_line, sep, rest = pending.partition(b"\r\n")
                if not sep:
                    # No complete line yet; keep the partial data.
                    break
                pending = rest
                if raw_line:
                    string = (raw_line + sep).decode("UTF-8", errors="ignore")
                    self._readq.put(string)

    def register_hook(self, hook, oneshot=False):
        """Attach *hook*; a one-shot hook is removed after its first call."""
        hook.set_queue(self._sendq)
        self._hooks.append((hook, oneshot))

    def read(self):
        """Parse queued lines forever and dispatch each one to every hook."""
        while True:
            line = self._readq.get()
            try:
                data = IRCMessage(line)
            except (AssertionError, IndexError) as e:
                # One unparsable line must not kill the dispatcher thread.
                print("ignoring unparsable line %r: %s" % (line, e))
                continue

            if data.command == "ERROR":
                self.sock.close()
                restart_program("IRC error")

            # Iterate over a snapshot: one-shot hooks are removed from the
            # live list inside the loop.
            for hook, oneshot in list(self._hooks):
                worker = threading.Thread(group=None, target=hook.handle, args=(data,))
                worker.start()
                if oneshot:
                    self._hooks.remove((hook, oneshot))

    def send(self):
        """Write everything placed on the send queue to the socket, forever."""
        while True:
            data = self._sendq.get()
            try:
                self.sock.sendall(str(data).encode("UTF-8", errors="ignore"))
            except OSError:
                self.sock.close()
                restart_program("TCP Error (Send, OSError)")
class IRCBuiltinHook(IRCAbstractHook):
    """Built-in chat features: "poi" replies and sed-style corrections.

    The last ordinary message of every nick is remembered so that a later
    ``s/old/new/`` message from the same nick can be applied to it.
    """

    # s/old/new/ with an optional trailing "g"; old must be non-empty and
    # neither part may contain "/".
    _SED_RE = re.compile(r"s/([^/]+)/([^/]*)/g?")

    def __init__(self):
        super().__init__()
        # nick -> text of that nick's most recent non-sed message
        self._prev_msgs = {}

    def handle(self, msg):
        msg = msg.normalize_bot_message(msg)
        if msg.command == "PRIVMSG":
            if msg.text is None:
                return
            self.handle_privmsg(msg)
            self.handle_poi(msg)
        elif msg.command == "NICK":
            self.handle_nick(msg)
        elif msg.command in ["PART", "QUIT"]:
            self.handle_quit(msg)

    def handle_poi(self, msg):
        """Reply when the bot is addressed as "<nick>: ..." or "<nick>, ..."."""
        if msg.text.startswith(("%s: " % NICKNAME, "%s, " % NICKNAME)):
            # Slice the nickname off; str.lstrip() would strip a character
            # set rather than the prefix.
            command = msg.text[len(NICKNAME):]
            if "poi" in command:
                self.send(IRCMessage().say(msg.dest, "%s: poi~" % msg.nick))
            elif "help" in command:
                self.send(IRCMessage().say(msg.dest, "%s: Hello, I'm a Protocol of IRC bot, a.k.a poibot." % msg.nick))
            elif "say" in command:
                self.send(IRCMessage().say(msg.dest, "%s: 少说话,多 poi~" % msg.nick))

    def handle_privmsg(self, msg):
        """Record an ordinary message, or apply an ``s/old/new/`` correction.

        A correction replaces every occurrence of ``old`` in the sender's
        remembered message (the optional ``g`` flag is accepted but makes no
        difference) and announces the result if anything changed.
        """
        text = msg.text.strip()
        match = self._SED_RE.fullmatch(text)
        if match is None:
            self.add_to_dict(msg)
            return

        old, new = match.groups()
        print([old, new], self._prev_msgs)
        try:
            previous = self._prev_msgs[msg.nick]
        except KeyError:
            # nothing remembered for this nick
            return
        newtext = previous.replace(old, new)
        if newtext == previous:
            return
        self.send(IRCMessage().say(msg.dest, "%s meant to say: %s" % (msg.nick, newtext)))

    def handle_nick(self, msg):
        """Move the remembered message to the new nick after a NICK change."""
        nick_orig = msg.nick
        if not msg.params:
            return
        nick_new = msg.params[0]
        if nick_orig == nick_new:
            # nothing to move
            return
        try:
            self._prev_msgs[nick_new] = self._prev_msgs.pop(nick_orig)
        except KeyError:
            pass

    def handle_quit(self, msg):
        """Forget the remembered message of a nick that parted or quit."""
        self._prev_msgs.pop(msg.nick, None)

    def add_to_dict(self, msg):
        """Remember *msg* as the sender's most recent message."""
        self._prev_msgs[msg.nick] = msg.text
class IRCWhoisHook(IRCAbstractHook):
    """Track whether joining users connect over TLS, using WHOIS replies.

    Every nick that joins one of ``const.CHANNELS`` is queried with WHOIS.
    Numeric 311 marks the nick NOT_SECURE, a following 671 upgrades it to
    SECURE, and 318 (end of WHOIS) triggers the reminder / warning / kick
    logic.  The user table is persisted as JSON.
    """

    NEED_QUERY = 0
    NOT_SECURE = 1
    SECURE = 2

    REMINDER = "%s: 欢迎加入聊天。但您没有使用 SSL/TLS 加密连接。为了大家的网络安全,强烈推荐您加密连接到 IRC,请阅读教程 https://orz.chat/tls.html"
    WARNING = "%s: 使用安全连接是此频道的方针政策。若您依然拒绝安全,阁下将会遭到封禁,剩余机会: %d 次"
    SUCCESS = "%s: 恭喜,您已成功启用安全连接。水表有保障,就用 TLS!Make Big Brother's Life Harder Again!"
    BAN = "%s: 由于您多次拒绝使用安全连接,我们只得暂时拉黑您。使用安全连接即可重新加入聊天。"

    WHOIS_USER_PATH = "./whois_users.db"

    def __init__(self):
        super().__init__()
        # nick -> {"channels": [...], "security": int or None, "violations": int}
        self.users = {}
        # const.py may override the database path and list enforced channels.
        self._path = getattr(const, "WHOIS_USER_PATH", self.WHOIS_USER_PATH)
        self._force_channel_tls = getattr(const, "FORCE_CHANNEL_TLS", [])
        self.load_database()

    def load_database(self):
        """Load the user table from disk; a missing file leaves it empty."""
        try:
            with open(self._path, "r") as f:
                self.users = json.loads(f.read())
        except FileNotFoundError:
            pass

    def save_database(self):
        """Write the user table to disk as indented, key-sorted JSON."""
        db = json.dumps(self.users, sort_keys=True, indent=4, separators=(',', ': '))
        with open(self._path, "w") as f:
            f.write(db)

    @staticmethod
    def new_user_attributes():
        """Return the record used for a nick that has not been seen before."""
        return {"channels": [], "security": None, "violations": 0}

    def handle(self, msg):
        """Update the user table from JOIN/PART/QUIT and WHOIS numerics.

        The table is saved after every message that was handled here.
        """
        if msg.nick in [NICKNAME, "ChanServ"]:
            return

        command = msg.command
        # Numerics carry the queried nick as their second parameter.
        target = msg.params[1] if len(msg.params) > 1 else None

        if command == "JOIN":
            if not (msg.params and msg.params[0] in const.CHANNELS):
                return
            try:
                user = self.users[msg.nick]
            except KeyError:
                user = self.new_user_attributes()
                self.users[msg.nick] = user
            user["security"] = self.NEED_QUERY
            user["channels"].append(msg.params[0])
            user["channels"] = list(set(user["channels"]))
            self.send(IRCMessage().whois(msg.nick))
        elif command == "PART":
            try:
                self.users[msg.nick]["channels"].remove(msg.params[0])
            except (KeyError, IndexError, ValueError):
                # unknown nick, no channel given, or channel not recorded
                pass
        elif command == "QUIT":
            user = self.users.get(msg.nick)
            if user is not None:
                user["channels"].clear()
                if msg.params and msg.params[0] == "Changing host":
                    user["violations"] -= 1
        elif command == "401":
            # No such nick: the nick in question is a parameter of the
            # numeric; msg.nick is not set for server-originated replies.
            if target is not None:
                self.users.pop(target, None)
        elif command == "311":
            # connection nicknames received, mark NOT_SECURE at first
            user = self.users.get(target)
            if user is not None:
                user["security"] = self.NOT_SECURE
        elif command == "671":
            # is using a secure connection
            user = self.users.get(target)
            if user is not None:
                user["security"] = self.SECURE
        elif command == "318":
            # End of /WHOIS list.
            if target is not None:
                self._finish_whois(target)
        else:
            # Nothing tracked here changed.
            return

        self.save_database()

    def _finish_whois(self, nick):
        """Act on the security state collected for *nick* during WHOIS."""
        user = self.users.get(nick, None)
        if not user:
            return

        if user["security"] == self.NOT_SECURE:
            user["violations"] += 1
            if self.judge(nick, user):
                return
            for chan in user["channels"]:
                self.send(IRCMessage().say(chan, self.REMINDER % nick))
                if chan in self._force_channel_tls:
                    self.send(IRCMessage().say(chan, self.WARNING % (nick, 3 - user["violations"])))
        elif user["security"] == self.SECURE and user["violations"] > 0:
            user["violations"] = 0
            for chan in user["channels"]:
                self.send(IRCMessage().say(chan, self.SUCCESS % nick))
            self.users.pop(nick)
        else:
            self.users.pop(nick)

    def judge(self, nick, user):
        """Kick *nick* from enforced channels after three violations.

        Returns:
            False when the user has fewer than three violations, else True.
        """
        if user["violations"] < 3:
            return False

        for chan in user["channels"]:
            if chan in self._force_channel_tls:
                self.send(IRCMessage().say("ChanServ", "op %s %s" % (chan, NICKNAME)))
                self.send(IRCMessage().say(chan, self.BAN % nick))
                self.send(IRCMessage().kick(chan, nick, self.BAN % nick))
                self.send(IRCMessage().say("ChanServ", "deop %s %s" % (chan, NICKNAME)))
        return True
class WeiboHook(IRCAbstractHook):
    """Expand Weibo status links found in messages into their text."""

    WEIBO_URL_RE = re.compile(r"(http://w{0,3}\.{0,1}weibo\.com/[0-9]+/[a-zA-Z0-9]{9})")

    def __init__(self):
        super().__init__()
        self._weibo = None
        # Simple re-entrancy flag for _login(); not a real lock.
        self._login_lock = False

    def handle(self, msg):
        if self._weibo is None:
            self._login()
        if not msg.text:
            return

        links = self.WEIBO_URL_RE.findall(msg.text)
        for link in links:
            mid = link.split("/")[-1]
            status_id = WeiboHook.mid2id(mid)
            print("...Fetching %s/%s" % (mid, status_id))

            tweet = self._fetch_status(status_id)
            if tweet is None:
                continue

            if "retweeted_status" in tweet:
                say_orig = "⇪转发: @%s: " % tweet.user.screen_name
                say_orig += tweet.text
                tweet = tweet.retweeted_status
            else:
                say_orig = ""

            say = "⇪微博: @%s: " % tweet.user.screen_name + tweet.text
            if "original_pic" in tweet:
                say += " " + tweet.original_pic
            say = say.replace("\n", " ")

            self.send(IRCMessage().say(msg.dest, say))
            time.sleep(2)
            if say_orig:
                say_orig = say_orig.replace("\n", " ")
                self.send(IRCMessage().say(msg.dest, say_orig))

    def _fetch_status(self, status_id):
        """Fetch one status, logging in again and retrying once on failure.

        Returns None when no session is available or both attempts fail.
        """
        for attempt in range(2):
            if attempt:
                # The first attempt failed: log in again before retrying.
                self._login()
            if self._weibo is None:
                return None
            try:
                return self._weibo.api("statuses/show").get(id=status_id)
            except Exception as e:
                # rpweibo's exception types are not visible from this file.
                print("weibo fetch failed", e)
        return None

    def _login(self):
        """(Re)create the Weibo session; on failure ``_weibo`` is None."""
        if self._login_lock:
            return
        try:
            import const
        except ImportError:
            raise ImportError("Please create const.py and provide KEY, SECRET, REDIR, USER and PASS")

        self._login_lock = True
        try:
            app = rpweibo.Application(const.KEY, const.SECRET, const.REDIR)
            self._weibo = rpweibo.Weibo(app)
            authenticator = rpweibo.UserPassAutheticator(const.USER, const.PASS)
            try:
                self._weibo.auth(authenticator)
                print("good login")
                self.send(IRCMessage().setnick(NICKNAME))
            except Exception as e:
                print("bad login", e)
                self._weibo = None
                self.send(IRCMessage().setnick(NICKNAME + "_error"))
        finally:
            # Always release the flag, or one failure blocks every later login.
            self._login_lock = False

    @staticmethod
    def mid2id(mid):
        """Convert a 9-character base-62 status "mid" to its decimal id string.

        The mid is split into groups of 1, 4 and 4 characters; each group is
        converted from base 62, and the last two are zero-padded to 7 digits.
        """
        def base10(base62):
            """Convert the base."""
            CHAR = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
            num = 0
            for digit_char in base62:
                num = num * 62 + CHAR.index(digit_char)
            return num

        parts = [
            str(base10(mid[0])),
            str(base10(mid[1:5])).rjust(7, "0"),
            str(base10(mid[5:9])).rjust(7, "0"),
        ]
        return "".join(parts)
class ProgramRestartHook(IRCAbstractHook):
    """Restart the program when no PING is seen for 600 seconds.

    A watchdog thread started by the constructor waits on an internal queue
    that handle() feeds once per PING.
    """

    def __init__(self):
        super().__init__()
        self._timer_queue = queue.Queue()
        watchdog = threading.Thread(group=None, target=self._timer)
        self._timer_thread = watchdog
        watchdog.start()

    def handle(self, msg):
        if msg.command != "PING":
            return
        self._timer_queue.put(True)

    def _timer(self):
        """Watchdog loop: one queue item must arrive within each 600 s wait."""
        while True:
            try:
                print("checking & waiting for ping")
                self._timer_queue.get(True, 600)
            except queue.Empty:
                restart_program("Long time no ping!")
            else:
                print("connection is still alive!")
class LaTeXCorrectionHook(IRCAbstractHook):
    """Reply with the correct capitalisation when "LaTeX" is misspelled."""

    INCORRECT_LATEX_SPELLING = ["Latex", "LaTex", "laTeX", "laTex", "lateX"]

    def __init__(self):
        super().__init__()

    def handle(self, msg):
        if not msg.text:
            return
        msg = msg.normalize_bot_message(msg)

        # One reply per misspelling that occurs in the text, in list order.
        found = [wrong for wrong in self.INCORRECT_LATEX_SPELLING if wrong in msg.text]
        for wrong in found:
            reply = "%s: 不是 %s,是 %s!" % (msg.nick, wrong, "LaTeX")
            self.send(IRCMessage().say(msg.dest, reply))
class CorebootCorrectionHook(IRCAbstractHook):
    """Reply when "coreboot" is written with any capitalisation but lowercase."""

    def __init__(self):
        super().__init__()

    def handle(self, msg):
        if not msg.text:
            return
        msg = msg.normalize_bot_message(msg)

        # Match case-insensitively on the original text.  Searching a
        # lower-cased copy and slicing the original by those offsets can
        # misalign, because str.lower() may change the string's length.
        for match in re.finditer("coreboot", msg.text, re.IGNORECASE):
            spell = match.group()
            if spell != "coreboot":
                say = "%s: 不是 %s,是 coreboot!" % (msg.nick, spell)
                self.send(IRCMessage().say(msg.dest, say))
class TorCorrectionHook(IRCAbstractHook):
    """Reply when Tor is written in all capitals as "TOR"."""

    # "TOR" not embedded in a longer run of ASCII letters (so "STORE" or
    # "OPERATOR" do not trigger), while still matching next to CJK text.
    _TOR_RE = re.compile(r"(?<![A-Za-z])TOR(?![A-Za-z])")

    def __init__(self):
        super().__init__()

    def handle(self, msg):
        if not msg.text:
            return
        msg = msg.normalize_bot_message(msg)

        # One reply per occurrence, addressed to the sender.
        for _ in self._TOR_RE.finditer(msg.text):
            say = "%s: 不是 TOR,是 Tor! https://www.torproject.org/docs/faq#WhyCalledTor" % msg.nick
            self.send(IRCMessage().say(msg.dest, say))
class SchneierQuoteHook(IRCAbstractHook):
    """Post a quote from the Schneier Facts RSS feed on "'schneier"."""

    WEB_RSS = "https://www.schneierfacts.com/rss/random"

    def __init__(self):
        super().__init__()

    def handle(self, msg):
        if not msg.text:
            return
        msg = msg.normalize_bot_message(msg)
        if "'schneier" in msg.text:
            quote = self.fetch_schneier_quote()
            if not quote:
                quote = "Failed to get a Schneier quote, maybe the HTTPS connection is cracked by Schneier?"
            self.send(IRCMessage().say(msg.dest, quote))

    def fetch_schneier_quote(self):
        """Download the feed and return one quote as a single line.

        Up to three attempts are made, one second apart.

        Returns:
            The description of the feed's second ``item`` with newlines
            replaced by spaces, or None if every attempt failed.
        """
        from time import sleep
        from xml.etree.ElementTree import ElementTree
        import urllib.request

        for i in range(3):
            try:
                # Close the response deterministically and bound the wait,
                # so a stalled server cannot hang this worker forever.
                with urllib.request.urlopen(self.WEB_RSS, timeout=10) as rss_resource:
                    listing = list(ElementTree(file=rss_resource).iter("item"))
                text = listing[1].find("description").text.strip()
                return text.replace("\n", " ")
            except Exception as e:
                # Best effort: network, parse or feed-shape errors all just
                # lead to another attempt.
                print("schneier quote fetch failed:", e)
                sleep(1)
        return None
class InterjectHook(IRCAbstractHook):
    """Occasionally interject "GNU/Linux" when somebody mentions Linux.

    ``ratio`` is the percentage of eligible messages that get a reply; it
    comes from ``const.INTERJECT`` and defaults to 40.
    """

    def __init__(self):
        super().__init__()
        random.seed()
        self.ratio = getattr(const, "INTERJECT", 40)

    def handle(self, msg):
        if not msg.text:
            return
        msg = msg.normalize_bot_message(msg)

        lowered = msg.text.lower()
        if "linux" not in lowered:
            return

        # Messages containing any of these keywords never get a reply.
        exempt_keywords = ["gnu", "kernel", "内核", "http", "arch", "android", "bsd", "嵌入", "selinux", "harden", "util", "journal", "."]
        if any(keyword in lowered for keyword in exempt_keywords):
            return

        rnd = random.randrange(100)
        if rnd >= self.ratio:
            print("rnd = %d >= %d, do not interject" % (rnd, self.ratio))
            return

        interjection = "%s: 你所说的 Linux,应该是 GNU/Linux!" % (msg.nick)
        self.send(IRCMessage().say(msg.dest, interjection))
if __name__ == "__main__":
    # Connect, then attach the feature hooks.  If the constructor raises,
    # no connection object exists yet, so there is nothing to close here:
    # just restart with the error as the reason.
    try:
        conn = IRCConnection("irc.freenode.net", 7000, client_cert="freenode.pem")
    except Exception as e:
        restart_program(str(e))

    conn.register_hook(IRCBuiltinHook())
    #conn.register_hook(WeiboHook())
    conn.register_hook(IRCWhoisHook())
    conn.register_hook(ProgramRestartHook())
    conn.register_hook(LaTeXCorrectionHook())
    conn.register_hook(CorebootCorrectionHook())
    conn.register_hook(TorCorrectionHook())
    conn.register_hook(SchneierQuoteHook())
    conn.register_hook(InterjectHook())
|