#!/bin/python
# This file contains some utilities common to offpunk, ansirenderer and netcache.
# Currently, there are the following utilities:
#
# run : run a shell command and get the results with some security
# term_width : get or set the width to display on the terminal
- import os
- import io
- import subprocess
- import shutil
- import shlex
- import urllib.parse
- import urllib.parse
- import netcache_migration
- import netcache
CACHE_VERSION = 1

# We upgrade the cache only once at startup, hence the UPGRADED variable.
# This is only to avoid unnecessary checks each time the cache is accessed.
UPGRADED = False


def upgrade_cache(cache_folder):
    """Migrate the cache in `cache_folder` up to CACHE_VERSION.

    The current version is read from the ".version" file obtained by
    appending that name to `cache_folder` (so `cache_folder` is expected to
    end with a path separator). A missing or unparsable file counts as
    version 0. Each missing step N is applied by calling
    `netcache_migration.upgrade_to_N(cache_folder)`, and the version file is
    rewritten after every successful step.

    Once done, the module-level UPGRADED flag is set to True.
    """
    # Without this declaration the final assignment would only create a local
    # variable and the module-level flag would stay False forever.
    global UPGRADED
    # Let's read the current version of the cache
    version_path = cache_folder + ".version"
    current_version = 0
    if os.path.exists(version_path):
        with open(version_path) as f:
            current_str = f.read()
        try:
            current_version = int(current_str)
        except ValueError:
            # Unreadable version marker: restart the migrations from scratch.
            current_version = 0
    # Now, let's upgrade the cache if needed
    while current_version < CACHE_VERSION:
        current_version += 1
        upgrade_func = getattr(netcache_migration,
                               "upgrade_to_" + str(current_version))
        upgrade_func(cache_folder)
        # Persist progress after each step so an interrupted upgrade resumes
        # from the last completed migration.
        with open(version_path, "w") as f:
            f.write(str(current_version))
    UPGRADED = True
# Get an XDG folder. Folder should be "cache", "data" or "config"
def xdg(folder="cache"):
    """Return the offpunk directory for the requested kind of storage.

    folder: one of "cache", "config" or "data".

    Returns the matching path as a string, or None (after printing a
    message) when `folder` is not one of the three known names.

    Side effects: the cache directory is created on every call, whatever
    `folder` is. When the cache folder is requested and the module-level
    UPGRADED flag is still false, `upgrade_cache` is run on it.
    """
    # We implement our own python-xdg to avoid conflict with existing libraries.
    home = os.path.expanduser("~")
    data_home = (os.environ.get("XDG_DATA_HOME")
                 or os.path.join(home, ".local", "share"))
    config_home = (os.environ.get("XDG_CONFIG_HOME")
                   or os.path.join(home, ".config"))
    cache_home = (os.environ.get("XDG_CACHE_HOME")
                  or os.path.join(home, ".cache"))

    config_dir = os.path.join(os.path.expanduser(config_home), "offpunk/")
    data_dir = os.path.join(os.path.expanduser(data_home), "offpunk/")

    # Look for a pre-existing config directory, if any
    old_config = os.path.expanduser("~/.offpunk/")
    if os.path.exists(old_config):
        config_dir = old_config
        # If there is no XDG data home either, keep using the old directory
        # for data too.
        if not os.path.exists(data_home):
            data_dir = config_dir

    # OFFPUNK_CACHE_PATH overrides the cache location. An unset *or empty*
    # variable falls back to the XDG cache directory; a plain
    # environ.get(key, default) would keep the empty string and end up
    # using "/" as the cache.
    cache_path = (os.environ.get("OFFPUNK_CACHE_PATH")
                  or os.path.join(os.path.expanduser(cache_home), "offpunk/"))
    # Check that the cache path ends with "/"
    if not cache_path.endswith("/"):
        cache_path += "/"
    os.makedirs(cache_path, exist_ok=True)

    if folder == "cache":
        if not UPGRADED:
            upgrade_cache(cache_path)
        return cache_path
    if folder == "config":
        return config_dir
    if folder == "data":
        return data_dir
    print("No XDG folder for %s. Check your code."%folder)
    return None
# An IPv6 address inside a URL must be wrapped in [].
# We detect one as a location containing more than 2 ":".
def fix_ipv6_url(url):
    """Return `url` with a bare IPv6 location wrapped in square brackets.

    Empty values and anything starting with "mailto" are returned untouched.
    The location is whatever sits between an optional "scheme://" prefix and
    the first "/". It is bracketed only when it holds more than two ":" and
    no bracket yet; a trailing "/" is added when the URL had no path.
    """
    if not url or url.startswith("mailto"):
        return url

    prefix = ""
    remainder = url
    if "://" in url:
        scheme, remainder = url.split("://", maxsplit=1)
        # An empty scheme is dropped together with its "://" separator.
        if scheme:
            prefix = scheme + "://"

    location, _, tail = remainder.partition("/")
    bare_ipv6 = (location.count(":") > 2
                 and "[" not in location
                 and "]" not in location)
    if bare_ipv6:
        remainder = "[" + location + "]/" + tail
    return prefix + remainder
# Cheap and cheerful URL detector
def looks_like_url(word):
    """Tell whether `word` looks like a URL offpunk could open.

    Returns False for None, empty or whitespace-only input, and whenever
    urllib raises ValueError while parsing the word or reading its port.
    Otherwise:
      * "mailto:" words must contain an "@";
      * words whose scheme is "file" or "list" must contain a "/";
      * any other word must have a scheme listed in
        `netcache.standard_ports` and contain either a "." or "localhost"
        (hostname / IPv4) or "[", ":" and "]" (IPv6).
    """
    if not word or not word.strip():
        return False
    try:
        url = fix_ipv6_url(word).strip()
        # Sometimes urllib only raises when the port is requested, so the
        # port is read purely to trigger that validation.
        _ = urllib.parse.urlparse(url).port
    except ValueError:
        return False

    if word.startswith("mailto:"):
        return "@" in word

    scheme = word.split("://")[0]
    if scheme in ["file", "list"]:
        return "/" in word
    if scheme not in netcache.standard_ports:
        return False
    # IPv4 or hostname
    if "." in word or "localhost" in word:
        return True
    # IPv6
    return "[" in word and ":" in word and "]" in word
## Those two functions add/remove the mode to the
# URLs. This is a gross hack to remember the mode
def mode_url(url,mode):
    """Return `url` with "##offpunk_mode=<mode>" appended.

    Nothing is appended when `mode` is empty or None, when it is "readable",
    or when the URL already carries a mode marker.
    """
    # The guard must test the very marker that is appended; testing a
    # different string lets the mode be added a second time.
    marker = "##offpunk_mode="
    if mode and mode != "readable" and marker not in url:
        url += marker + mode
    return url
def unmode_url(url):
    """Split a URL from its "##offpunk_mode=" suffix.

    Returns a two-item list [url, mode]. `mode` is None when the marker is
    absent; otherwise `url` is the text before the first marker and `mode`
    the text between the first marker and the next one (or the end).
    """
    pieces = url.split("##offpunk_mode=")
    if len(pieces) < 2:
        return [url, None]
    return [pieces[0], pieces[1]]
# In terms of arguments, this can take an input file/string to be passed to
# stdin, a parameter to do (well-escaped) "%" replacement on the command, a
# flag requesting that the output go directly to the stdout, and a list of
# additional environment variables to set.
def run(cmd, *, input=None, parameter=None, direct_output=False, env=None):
    """Run `cmd` through the shell.

    cmd: shell command line; when `parameter` is given it must contain one
        "%s" placeholder.
    input: either a file object (an io.IOBase instance), which becomes the
        command's stdin, or a string, which is encoded and fed to stdin.
    parameter: value substituted into `cmd` after shell-quoting.
    direct_output: when true, the command's output is not captured and the
        function returns None, without checking the exit status.
    env: mapping of extra environment variables for the command.

    Returns the decoded stdout, with stderr merged into it, unless
    `direct_output` is set. In capture mode a non-zero exit status raises
    subprocess.CalledProcessError.
    """
    if parameter:
        cmd = cmd % shlex.quote(parameter)
    # Work on a copy: updating os.environ itself would leak the extra
    # variables into this process and into every later command.
    child_env = os.environ.copy()
    if env:
        child_env.update(env)
    if isinstance(input, io.IOBase):
        stdin = input
        input = None
    else:
        if input:
            input = input.encode()
        stdin = None
    if not direct_output:
        # subprocess.check_output() wouldn't allow us to pass stdin.
        result = subprocess.run(cmd, check=True, env=child_env, input=input,
                                shell=True, stdin=stdin, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        return result.stdout.decode()
    else:
        subprocess.run(cmd, env=child_env, input=input, shell=True, stdin=stdin)
# Preferred text width, changed through term_width(new_width=...)
TERM_WIDTH = 72


# If absolute, returns the real terminal width, not the text width
def term_width(new_width=None, absolute=False):
    """Get, and optionally set, the width used to display text.

    new_width: when truthy, stored as the new preferred text width.
    absolute: when true, return the real terminal width as reported by
        shutil.get_terminal_size().

    Otherwise returns the preferred width, capped by the real terminal
    width.
    """
    global TERM_WIDTH
    if new_width:
        TERM_WIDTH = new_width
    real_width = shutil.get_terminal_size()[0]
    if absolute:
        return real_width
    return min(TERM_WIDTH, real_width)
def is_local(url):
    """Tell whether `url` is considered local.

    Empty values and strings without "://" are local. Otherwise the URL is
    local only when its scheme is "file", "mail", "list" or "mailto".
    """
    if not url or "://" not in url:
        return True
    scheme = url.split("://", maxsplit=1)[0]
    return scheme in ["file", "mail", "list", "mailto"]
# This method returns the image URL, or invents one for a base64 inline image.
# It returns (url, image_data) where image_data is None for a normal image.
def looks_like_base64(src,baseurl):
    """Resolve an image source against `baseurl`.

    For a regular `src` the result is (urljoin(baseurl, src), None).
    For a "data:image/...;base64," source, a file name is made up from the
    first 20 characters of the payload plus the image subtype as extension;
    the result is (urljoin(baseurl, name), payload).
    Other "data:image/" sources (no ";base64,") give (None, None).
    """
    inline = src and src.startswith("data:image/")
    if not inline:
        return urllib.parse.urljoin(baseurl, src), None
    if ";base64," not in src:
        # We can't handle other data:image such as svg for now
        return None, None

    parts = src.split(";base64,")
    header, payload = parts[0], parts[1]
    # header is something like data:image/jpg
    extension = header.split("/")[1] if "/" in header else "data"
    name = payload[:20] + "." + extension
    return urllib.parse.urljoin(baseurl, name), payload
|