123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- # Part of rabbitears See LICENSE for permissions
- # Copyright (C) 2022 Matt Arnold
- import socket
- import sys
- import irctokens
- import time
- import logging
class IRCBadMessage(Exception):
    """Raised when a line received from the server cannot be parsed."""
class IRCError(Exception):
    """Raised for connection-level or protocol-level IRC failures."""
def printred(s):
    """Print *s* to standard output wrapped in bold-red ANSI escape codes.

    The text is padded with one space on each side, followed by the colour
    reset sequence and a newline; ``print`` then adds its own newline, so the
    message is followed by a blank line.
    """
    print(f"\033[1;31m {s} \033[0m\n")
def parsemsg(s):
    """Break a raw IRC line into its prefix, command, and arguments.

    The line is expected in the usual IRC shape::

        [:prefix ]command[ arg ...][ :trailing text]

    Args:
        s: The raw line as text. Any line terminator still attached is NOT
            stripped here and ends up in the last argument.

    Returns:
        A ``(prefix, command, args)`` tuple. ``prefix`` is ``''`` when the
        line has none; ``args`` is a list of strings whose last element is
        the trailing text (everything after the first ``' :'``) when present.

    Raises:
        IRCBadMessage: If the line is empty, consists of a prefix only, or
            contains no command.
    """
    if not s:
        raise IRCBadMessage("Empty line.")

    prefix = ''
    if s[0] == ':':
        # A bare ":prefix" with nothing after it used to escape as a
        # ValueError from tuple unpacking; report it as a bad message instead.
        prefix, sep, s = s[1:].partition(' ')
        if not sep:
            raise IRCBadMessage("Prefix without a command.")

    # Everything after the first " :" is one argument that may contain spaces.
    head, has_trailing, trailing = s.partition(' :')
    args = head.split()
    if not args:
        # Without this check pop(0) raised IndexError, or the trailing text
        # was silently promoted to the command.
        raise IRCBadMessage("Missing command.")
    command = args.pop(0)
    if has_trailing:
        args.append(trailing)
    return prefix, command, args
# IRC message terminator (CRLF); appended to every outgoing command.
LINEEND = '\r\n'


class IRCBot:
    """Small IRC client built on top of a caller-supplied socket-like object.

    The bot registers with the server, identifies to NickServ, joins the
    configured channel and answers PINGs. ``self.config`` is a mapping with
    the keys ``hostname``, ``port``, ``nick``, ``channel`` and ``nickpass``;
    it is either passed to the constructor or filled in by ``connect``.
    """

    irc = None

    def __init__(self, sock, config=None, isBad=False):
        """Store the socket and settings; no network traffic happens here.

        Args:
            sock: Socket-like object providing ``connect``, ``send`` and
                ``recv``.
            config: Optional settings mapping (see the class docstring).
            isBad: When true, ``connect`` waits and then identifies/joins on
                its own instead of relying on the server's 001 welcome.
        """
        self.irc = sock
        self.connected = False
        self.config = config
        self.badircd = isBad

    def send_cmd(self, line):
        """Send one IRC command, adding the CRLF terminator.

        Args:
            line: Either a ready-made command string (without CRLF) or an
                object with a ``format()`` method that renders one, such as
                the result of ``irctokens.build``.

        Returns:
            ``str()`` of the encoded bytes that were sent (the repr-style
            text of the bytes object); kept as-is for existing callers.

        Raises:
            IRCError: If the bot is not connected, or the socket reports
                that no data could be written.
        """
        if not self.connected:
            raise IRCError("Not Connected")
        # Plain strings are sent verbatim: calling str.format() with no
        # arguments would raise on (or mangle) text containing '{' or '}'.
        text = line if isinstance(line, str) else line.format()
        encmsg = bytes(text + LINEEND, 'UTF-8')
        expected = len(encmsg)
        sent_total = 0
        # send() may accept only part of the buffer; keep going until all of
        # it is written instead of treating a short write as fatal.
        while sent_total < expected:
            sent = self.irc.send(encmsg[sent_total:])
            if not sent:
                raise IRCError("Unexpected Send Length")
            sent_total += sent
        return str(encmsg)

    def on_welcome(self, *args, **kwargs):
        """Identify to NickServ and join the configured channel.

        Called when the server's 001 welcome arrives (and directly from
        ``connect`` for servers flagged via ``isBad``). The arguments are
        accepted but not used.
        """
        authmsg = irctokens.build("NICKSERV", ['IDENTIFY', self.config['nickpass']])
        self.send_cmd(authmsg)
        joinmsg = irctokens.build("JOIN", [self.config['channel']])
        self.send_cmd(joinmsg)

    def send_privmsg(self, dst, msg):
        """Send a PRIVMSG with text ``msg`` to the nick or channel ``dst``."""
        msg = irctokens.build("PRIVMSG", [dst, msg])
        self.send_cmd(msg)

    def send_quit(self, quitmsg):
        """Send QUIT with ``quitmsg`` as the quit reason."""
        msg = irctokens.build("QUIT", [quitmsg])
        logging.debug(msg)
        self.send_cmd(msg)

    def send_action(self, action_msg, dst):
        """Placeholder for sending an action message; currently does nothing."""
        pass

    def connect(self, server, port, channel, botnick, botnickpass):
        """Open the connection and register the bot with the server.

        Args:
            server: Server host name.
            port: Server port.
            channel: Channel to join once welcomed.
            botnick: Nick (also used as the user name) to register with.
            botnickpass: NickServ password.

        Raises:
            IRCError: Propagated from ``send_cmd`` if a command cannot be
                sent.
        """
        # NOTE(review): the source this was reconstructed from had lost its
        # indentation. The settings are assumed to be taken from the
        # arguments only when no config was given to the constructor -
        # confirm against the original file.
        if self.config is None:
            self.config = {}
            self.config["hostname"] = server
            self.config["port"] = port
            self.config["nick"] = botnick
            self.config["channel"] = channel
            self.config["nickpass"] = botnickpass
        logging.debug("Connecting to: %s", server)
        self.irc.connect((self.config["hostname"], self.config["port"]))
        self.connected = True
        # Perform user registration
        usermsg = irctokens.build("USER", [botnick, "0", "*", "RabbitEars Bot"]).format()
        logging.debug(usermsg)
        self.send_cmd(usermsg)
        nickmsg = irctokens.build("NICK", [botnick])
        self.send_cmd(nickmsg)
        # NOTE(review): the whole wait/identify/welcome sequence is assumed
        # to apply only to servers flagged as "bad" - confirm.
        if self.badircd:
            time.sleep(5)
            authmsg = irctokens.build("NICKSERV", ['IDENTIFY', self.config['nickpass']])
            self.send_cmd(authmsg)
            time.sleep(5)
            self.on_welcome([self.config["hostname"]])

    def _handle_line(self, nwmsg):
        """React to one tokenised server message (001, ERROR or PING)."""
        logging.info(nwmsg.command)
        if nwmsg.command == "001":
            self.on_welcome(nwmsg.params)
        if nwmsg.command == "ERROR":
            # Guard against a parameterless ERROR instead of raising IndexError.
            reason = nwmsg.params[0] if nwmsg.params else ""
            raise IRCError(str(reason))
        if nwmsg.command == 'PING':
            logging.debug('Sending pong')
            # Echo the PING token back, if the server supplied one.
            pongmsg = irctokens.build("PONG", list(nwmsg.params[:1]))
            self.send_cmd(pongmsg)

    def get_response(self):
        """Read one chunk from the server, react to it, and return it parsed.

        Every line in the chunk is inspected, so a welcome (001), ERROR or
        PING is acted on even when it is not the first line received.

        Returns:
            The ``(prefix, command, args)`` tuple produced by ``parsemsg``
            for the received chunk as a whole.

        Raises:
            IRCBadMessage: From ``parsemsg``, e.g. when nothing was received.
            IRCError: If the server sent an ERROR message.
        """
        # Get the response. Undecodable bytes are replaced rather than
        # raising, since peers are not guaranteed to send valid UTF-8.
        resp = self.irc.recv(4096).decode("UTF-8", errors="replace")
        msg = parsemsg(resp)
        # One recv() can deliver several messages at once; handle each line.
        for raw_line in resp.split("\n"):
            line = raw_line.rstrip("\r")
            if not line.strip():
                continue
            self._handle_line(irctokens.tokenise(line))
        return msg
|