123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
- # Copyright 2014-2018 Florian Bruhin (The Compiler) <mail@qutebrowser.org>
- #
- # This file is part of qutebrowser.
- #
- # qutebrowser is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # qutebrowser is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with qutebrowser. If not, see <http://www.gnu.org/licenses/>.
- """Functions related to ad blocking."""
- import io
- import os.path
- import functools
- import posixpath
- import zipfile
- from qutebrowser.browser import downloads
- from qutebrowser.config import config
- from qutebrowser.utils import objreg, standarddir, log, message
- from qutebrowser.commands import cmdutils
- def _guess_zip_filename(zf):
- """Guess which file to use inside a zip file.
- Args:
- zf: A ZipFile instance.
- """
- files = zf.namelist()
- if len(files) == 1:
- return files[0]
- else:
- for e in files:
- if posixpath.splitext(e)[0].lower() == 'hosts':
- return e
- raise FileNotFoundError("No hosts file found in zip")
def get_fileobj(byte_io):
    """Return a file object from which the hosts list can be read.

    Plain data is handed back as the same object, rewound to the start.
    For a zip archive, the member picked by _guess_zip_filename is opened
    for reading and returned instead.

    Args:
        byte_io: A seekable binary file object holding the fetched data.

    Return:
        A binary file object positioned at the start of the hosts data.
    """
    byte_io.seek(0)  # rewind downloaded file
    looks_like_zip = zipfile.is_zipfile(byte_io)
    byte_io.seek(0)  # rewind what zipfile.is_zipfile did

    if not looks_like_zip:
        return byte_io

    archive = zipfile.ZipFile(byte_io)
    member = _guess_zip_filename(archive)
    return archive.open(member, mode='r')
def _is_whitelisted_url(url):
    """Tell whether a URL matches any entry of the host-blocking whitelist.

    Args:
        url: The URL to check as QUrl.

    Return:
        True if at least one pattern from the
        content.host_blocking.whitelist setting matches, False otherwise.
    """
    whitelist = config.val.content.host_blocking.whitelist
    return any(pattern.matches(url) for pattern in whitelist)
- class _FakeDownload:
- """A download stub to use on_download_finished with local files."""
- def __init__(self, fileobj):
- self.basename = os.path.basename(fileobj.name)
- self.fileobj = fileobj
- self.successful = True
class HostBlocker:

    """Manage blocked hosts based on /etc/hosts-like files.

    Attributes:
        _blocked_hosts: A set of blocked hosts.
        _config_blocked_hosts: A set of blocked hosts from ~/.config.
        _in_progress: The DownloadItems which are currently downloading.
        _done_count: How many files have been read successfully.
        _local_hosts_file: The path to the blocked-hosts file.
        _config_hosts_file: The path to a blocked-hosts in ~/.config
    """

    def __init__(self):
        self._blocked_hosts = set()
        self._config_blocked_hosts = set()
        self._in_progress = []
        self._done_count = 0

        data_dir = standarddir.data()
        self._local_hosts_file = os.path.join(data_dir, 'blocked-hosts')
        # Only needs _local_hosts_file, so it is safe to call before
        # _config_hosts_file is set below.
        self._update_files()

        config_dir = standarddir.config()
        self._config_hosts_file = os.path.join(config_dir, 'blocked-hosts')

        config.instance.changed.connect(self._update_files)

    def is_blocked(self, url, first_party_url=None):
        """Check if the given URL (as QUrl) is blocked.

        Args:
            url: The URL of the request, as QUrl.
            first_party_url: The URL the 'content.host_blocking.enabled'
                             setting is looked up for; an invalid URL is
                             treated like None.

        Return:
            True if the request should be blocked, False otherwise.
        """
        if first_party_url is not None and not first_party_url.isValid():
            first_party_url = None

        if not config.instance.get('content.host_blocking.enabled',
                                   url=first_party_url):
            return False

        # NOTE: this check runs before the whitelist is consulted, so
        # matching URLs are blocked even if they are whitelisted.
        if 'youtube.com/get_midroll_' in url.toDisplayString():
            return True

        host = url.host()
        return ((host in self._blocked_hosts or
                 host in self._config_blocked_hosts) and
                not _is_whitelisted_url(url))

    def _read_hosts_file(self, filename, target):
        """Read hosts from the given filename.

        Blank lines are skipped: an empty entry in the set would match
        every URL whose host is empty.

        Args:
            filename: The file to read.
            target: The set to store the hosts in.

        Return:
            True if a read was attempted, False otherwise
        """
        if not os.path.exists(filename):
            return False

        try:
            with open(filename, 'r', encoding='utf-8') as f:
                for line in f:
                    host = line.strip()
                    if host:
                        target.add(host)
        except (OSError, UnicodeDecodeError):
            log.misc.exception("Failed to read host blocklist!")

        return True

    def _read_config_hosts(self):
        """Replace the config host set with the current file contents.

        Starting from an empty set makes sure hosts which were removed from
        the file are not blocked anymore after re-reading it.
        """
        self._config_blocked_hosts = set()
        self._read_hosts_file(self._config_hosts_file,
                              self._config_blocked_hosts)

    def read_hosts(self):
        """Read hosts from the existing blocked-hosts file.

        If the downloaded blocked-hosts file does not exist while block
        lists are configured, host blocking is enabled and no basedir was
        given, the user is told to run :adblock-update.
        """
        self._blocked_hosts = set()
        self._read_config_hosts()

        found = self._read_hosts_file(self._local_hosts_file,
                                      self._blocked_hosts)

        if not found:
            args = objreg.get('args')
            if (config.val.content.host_blocking.lists and
                    args.basedir is None and
                    config.val.content.host_blocking.enabled):
                message.info("Run :adblock-update to get adblock lists.")

    @cmdutils.register(instance='host-blocker')
    def adblock_update(self):
        """Update the adblock block lists.

        This updates `~/.local/share/qutebrowser/blocked-hosts` with downloaded
        host lists and re-reads `~/.config/qutebrowser/blocked-hosts`.
        """
        self._read_config_hosts()
        self._blocked_hosts = set()
        self._done_count = 0
        download_manager = objreg.get('qtnetwork-download-manager')
        for url in config.val.content.host_blocking.lists:
            if url.scheme() == 'file':
                filename = url.toLocalFile()
                if os.path.isdir(filename):
                    # A directory imports every regular file directly
                    # inside it (no recursion).
                    for entry in os.scandir(filename):
                        if entry.is_file():
                            self._import_local(entry.path)
                else:
                    self._import_local(filename)
            else:
                # Download into memory; the name is only used in messages.
                fobj = io.BytesIO()
                fobj.name = 'adblock: ' + url.host()
                target = downloads.FileObjDownloadTarget(fobj)
                download = download_manager.get(url, target=target,
                                                auto_remove=True)
                self._in_progress.append(download)
                download.finished.connect(
                    functools.partial(self._on_download_finished, download))

    def _import_local(self, filename):
        """Add the contents of a local file to the blocklist.

        The file is wrapped in a _FakeDownload and handled immediately by
        _on_download_finished, which also closes it.

        Args:
            filename: path to a local file to import.
        """
        try:
            fileobj = open(filename, 'rb')
        except OSError as e:
            message.error("adblock: Error while reading {}: {}".format(
                filename, e.strerror))
            return
        download = _FakeDownload(fileobj)
        self._in_progress.append(download)
        self._on_download_finished(download)

    def _parse_line(self, line):
        """Parse a line from a host file.

        Hosts found on the line are added to _blocked_hosts.

        Args:
            line: The bytes object to parse.

        Returns:
            True if parsing succeeded, False otherwise.
        """
        if line.startswith(b'#'):
            # Ignoring comments early so we don't have to care about
            # encoding errors in them.
            return True

        try:
            line = line.decode('utf-8')
        except UnicodeDecodeError:
            log.misc.error("Failed to decode: {!r}".format(line))
            return False

        # Remove trailing comments and surrounding whitespace
        line = line.partition('#')[0].strip()

        # Skip empty lines
        if not line:
            return True

        parts = line.split()
        if len(parts) == 1:
            # "one host per line" format
            hosts = parts
        else:
            # /etc/hosts format: the first field is the address
            hosts = parts[1:]

        for host in hosts:
            if ('.' in host and
                    not host.endswith('.localdomain') and
                    host != '0.0.0.0'):
                self._blocked_hosts.add(host)

        return True

    def _merge_file(self, byte_io):
        """Read a host file and merge its hosts into _blocked_hosts.

        Nothing is returned. If the file can't be opened, an error message
        is shown and no hosts are added; lines which fail to parse are
        counted and reported after reading.

        Args:
            byte_io: The BytesIO object of the completed download.
        """
        error_count = 0
        line_count = 0
        try:
            f = get_fileobj(byte_io)
        except (OSError, zipfile.BadZipFile, zipfile.LargeZipFile,
                LookupError) as e:
            message.error("adblock: Error while reading {}: {} - {}".format(
                byte_io.name, e.__class__.__name__, e))
            return

        for line in f:
            line_count += 1
            ok = self._parse_line(line)
            if not ok:
                error_count += 1

        log.misc.debug("{}: read {} lines".format(byte_io.name, line_count))
        if error_count > 0:
            message.error("adblock: {} read errors for {}".format(
                error_count, byte_io.name))

    def _on_lists_downloaded(self):
        """Install block lists after files have been downloaded.

        Writes the sorted hosts to the local blocked-hosts file, one per
        line, and shows a summary message.
        """
        with open(self._local_hosts_file, 'w', encoding='utf-8') as f:
            for host in sorted(self._blocked_hosts):
                f.write(host + '\n')
            message.info("adblock: Read {} hosts from {} sources.".format(
                len(self._blocked_hosts), self._done_count))

    @config.change_filter('content.host_blocking.lists')
    def _update_files(self):
        """Update files when the config changed.

        Removes the local blocked-hosts file when no block lists are
        configured; a missing file is not an error.
        """
        if not config.val.content.host_blocking.lists:
            try:
                os.remove(self._local_hosts_file)
            except FileNotFoundError:
                pass
            except OSError as e:
                log.misc.exception("Failed to delete hosts file: {}".format(e))

    def _on_download_finished(self, download):
        """Check if all downloads are finished and if so, trigger reading.

        Arguments:
            download: The finished DownloadItem.
        """
        self._in_progress.remove(download)
        if download.successful:
            self._done_count += 1
            try:
                self._merge_file(download.fileobj)
            finally:
                # Always release the file, even if merging raised.
                download.fileobj.close()
        if not self._in_progress:
            try:
                self._on_lists_downloaded()
            except OSError:
                log.misc.exception("Failed to write host block list!")
|