123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- import json
- import logging
- import time
- import re
- from praw.exceptions import ClientException
- from urllib.parse import unquote
- SUBREDDIT_NAME = 'Piracy'
- logger = logging.getLogger('main')
- class SubmissionRemover:
- """Moderation helper. Comment with `!rule x` keyword to have a submission removed due to breaking that specific rule.
- Follow up with a `ban x` keyword to help the bot decide whether to ban the submission OP for the specified amount of days, 0 days is permaban.
- The removal reasons for `!rule x` keywords are read from the moderation toolbox extension settings
- saved in the subreddit's wiki in `reddit.com/r/subreddit/wiki/toolbox/`
- """
- # Moderator Toolbox extension's settings: ie. toolbox settings path is sitting on the base path `reddit.com/r/subreddit/wiki/toolbox/`
- toolbox_settings_wiki_path = 'toolbox'
- cache_frequency_secs = 15 * 60
-
- rule_re = r'(?si)!(?:(rule) *(\d))'
- custom_re = r'(?si)!(custom) *(.+)?'
- ban_re = r'!rule *\d ban (\d+)'
- def __init__(self, reddit):
- self.subreddit = reddit.subreddit(SUBREDDIT_NAME)
- self.last_cached = 0
- self.toolbox_settings = None
- self._load_settings()
- def handle_comment(self, comment):
- if comment.subreddit.display_name != SUBREDDIT_NAME:
- return
- self._load_settings()
- keyword = self._get_keyword(comment)
- ban_length = self._get_ban_length(comment)
- if keyword and comment.author.name in self.subreddit.moderator():
- removal_reason = self._get_removal_reason(keyword, comment)
- self._remove_submission(comment, keyword, removal_reason)
- # 'if ban_length' should not be used because ban_length could be 0
- if ban_length is not None:
- self._ban_user(comment, keyword, ban_length)
- def _load_settings(self):
- if time.time() - self.last_cached > self.cache_frequency_secs:
- toolbox_settings = self.subreddit.wiki[self.toolbox_settings_wiki_path].content_md
- toolbox_settings = json.loads(toolbox_settings)
- self.last_cached = time.time()
- self.toolbox_settings = toolbox_settings
- def _get_keyword(self, comment):
- match = re.search(self.rule_re, comment.body)
- if match:
- return match.group(1) + ' ' + match.group(2)
- match = re.search(self.custom_re, comment.body)
- if match:
- return match.group(1)
- return None
- def _get_removal_reason(self, keyword, comment):
- if keyword.lower() == 'custom':
- return '> ' + re.search(self.custom_re, comment.body).group(2).replace('\n', '\n> ')
- for reason in self.toolbox_settings['removalReasons']['reasons']:
- if reason['flairText'].lower() == keyword.lower():
- return unquote(reason['text'])
- return None
- def _get_ban_length(self, comment):
- ban_length = re.search(self.ban_re, comment.body, re.IGNORECASE)
- if ban_length:
- return int(ban_length.group(1))
- return None
- def _ban_user(self, comment, keyword, ban_length):
- submission = comment.submission
- if keyword.lower() == 'custom' or submission.author is None:
- return
- if ban_length == 0:
- ban_length = None
- ban_reason = keyword
- self.subreddit.banned.add(
- comment.submission.author.name,
- duration=ban_length,
- ban_reason=ban_reason,
- ban_message=f'Banned due to breaking {keyword}. https://reddit.com{submission.permalink}',
- note=f'Banned by {comment.author.name}. Offending post: https://reddit.com{submission.permalink}'
- )
- logger.info(f'Banning {comment.submission.author.name} for {ban_length} days')
- def _remove_submission(self, comment, keyword, removal_reason):
- submission = comment.submission
- if submission.selftext == '[deleted]':
- return
- logger.info(f'Remover (triggered by /u/{comment.author.name}): Processing submission {submission.id} ({submission.title})')
- if not submission.removed:
- submission.mod.remove()
- author_name = 'there'
- # check if author is discoverable: if user account is deleted, submission.author (praw.models.Redditor) instance will yield None
- if submission.author is not None:
- author_name = '/u/' + submission.author.name
- flair_text = 'custom removal' if keyword.lower() == 'custom' else keyword
- submission.mod.flair(text=flair_text)
- removal_reason = f'Hello {author_name}, your submission has been removed due to:\n\n' + removal_reason
- myreply = submission.reply(removal_reason)
- myreply.mod.distinguish(how='yes', sticky=True)
- myreply.disable_inbox_replies()
- if keyword.lower().endswith( ('rule 2', 'rule 3') ):
- submission.mod.lock()
- class Nuker:
- """Auto remove a comment tree or submission. The root of the comment tree to remove is decided
- based on which comment the keyword `!nuke` was commented as a reply. If `!nuke` is commented directly
- as a reply to the submission itself, all comments will be nuked
- """
- nuke_kw_re = r'(?i)!nuke\b'
- unnuke_kw_re = r'(?i)!unnuke\b'
- def __init__(self, reddit):
- self.subreddit = reddit.subreddit(SUBREDDIT_NAME)
- def handle_comment(self, comment):
- if comment.subreddit.display_name != SUBREDDIT_NAME:
- return
- if re.search(self.nuke_kw_re, comment.body):
- self._process_nuke(comment)
- if re.search(self.unnuke_kw_re, comment.body):
- self._process_unnuke(comment)
- def _process_nuke(self, comment):
- if not comment.author in self.subreddit.moderator():
- return
- parent = comment.parent()
- if comment.parent_id.startswith('t3'):
- logger.info(f'Nuking submission (exec by /u/{comment.author.name}): {parent.permalink}')
- self._nuke_submission(parent)
- else:
- logger.info(f'Nuking comment tree (exec by /u/{comment.author.name}): {parent.permalink}')
- self._nuke_comment_tree(parent)
- try:
- comment.refresh()
- except ClientException as e:
- self.logger.info(f'ClientException: {e}. Comment permalink: https://reddit.com{comment.permalink}')
- return
- comment.replies.replace_more(limit=None)
- child_comments = comment.replies.list()
- for child_comment in child_comments:
- if child_comment.removed:
- child_comment.mod.approve()
- def _nuke_submission(self, submission):
- submission.comments.replace_more(limit=None)
- if not submission.removed:
- submission.mod.remove()
- comments = submission.comments.list()
- for comment in comments:
- if comment.author is not None and comment.author.name == 'PiracyBot' and comment.distinguished == 'moderator':
- continue
- if not comment.removed:
- comment.mod.remove()
- def _nuke_comment_tree(self, comment):
- if not comment.removed:
- comment.mod.remove()
- comment.refresh()
- comment.replies.replace_more(limit=None)
- child_comments = comment.replies.list()
- for child_comment in child_comments:
- if comment.author is not None and comment.author.name == 'PiracyBot' and comment.distinguished == 'moderator':
- continue
- if not child_comment.removed:
- child_comment.mod.remove()
- def _process_unnuke(self, comment):
- if not comment.author in self.subreddit.moderator():
- return
- parent = comment.parent()
- if comment.parent_id.startswith('t3'):
- logger.info(f'Un-nuking submission (exec by /u/{comment.author.name}): {parent.permalink}')
- self._unnuke_submission(parent)
- else:
- logger.info(f'Un-nuking comment tree (exec by /u/{comment.author.name}): {parent.permalink}')
- self._unnuke_comment_tree(parent)
- def _unnuke_submission(self, submission):
- submission.comments.replace_more(limit=None)
- if submission.removed:
- submission.mod.approve()
- comments = submission.comments.list()
- for comment in comments:
- if comment.removed:
- comment.mod.approve()
- def _unnuke_comment_tree(self, comment):
- if comment.removed:
- comment.mod.approve()
|