offutils.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #!/bin/python
  2. #This file contains some utilities common to offpunk, ansirenderer and netcache.
  3. #Currently, there are the following utilities:
  4. #
  5. # run : run a shell command and get the results with some security
  6. # term_width : get or set the width to display on the terminal
  7. import os
  8. import io
  9. import subprocess
  10. import shutil
  11. import shlex
  12. import urllib.parse
  13. import urllib.parse
  14. import netcache_migration
  15. import netcache
  16. CACHE_VERSION = 1
  17. # We upgrade the cache only once at startup, hence the UPGRADED variable
  18. # This is only to avoid unecessary checks each time the cache is accessed
  19. UPGRADED=False
  20. def upgrade_cache(cache_folder):
  21. #Let’s read current version of the cache
  22. version_path = cache_folder + ".version"
  23. current_version = 0
  24. if os.path.exists(version_path):
  25. current_str = None
  26. with open(version_path) as f:
  27. current_str = f.read()
  28. f.close()
  29. try:
  30. current_version = int(current_str)
  31. except:
  32. current_version = 0
  33. #Now, let’s upgrade the cache if needed
  34. while current_version < CACHE_VERSION:
  35. current_version += 1
  36. upgrade_func = getattr(netcache_migration,"upgrade_to_"+str(current_version))
  37. upgrade_func(cache_folder)
  38. with open(version_path,"w") as f:
  39. f.write(str(current_version))
  40. f.close()
  41. UPGRADED=True
  42. #get xdg folder. Folder should be "cache", "data" or "config"
  43. def xdg(folder="cache"):
  44. ## Config directories
  45. ## We implement our own python-xdg to avoid conflict with existing libraries.
  46. _home = os.path.expanduser('~')
  47. data_home = os.environ.get('XDG_DATA_HOME') or \
  48. os.path.join(_home,'.local','share')
  49. config_home = os.environ.get('XDG_CONFIG_HOME') or \
  50. os.path.join(_home,'.config')
  51. _CONFIG_DIR = os.path.join(os.path.expanduser(config_home),"offpunk/")
  52. _DATA_DIR = os.path.join(os.path.expanduser(data_home),"offpunk/")
  53. _old_config = os.path.expanduser("~/.offpunk/")
  54. ## Look for pre-existing config directory, if any
  55. if os.path.exists(_old_config):
  56. _CONFIG_DIR = _old_config
  57. #if no XDG .local/share and not XDG .config, we use the old config
  58. if not os.path.exists(data_home) and os.path.exists(_old_config):
  59. _DATA_DIR = _CONFIG_DIR
  60. ## get _CACHE_PATH from OFFPUNK_CACHE_PATH environment variable
  61. # if OFFPUNK_CACHE_PATH empty, set default to ~/.cache/offpunk
  62. cache_home = os.environ.get('XDG_CACHE_HOME') or\
  63. os.path.join(_home,'.cache')
  64. _CACHE_PATH = os.environ.get('OFFPUNK_CACHE_PATH', \
  65. os.path.join(os.path.expanduser(cache_home),"offpunk/"))
  66. #Check that the cache path ends with "/"
  67. if not _CACHE_PATH.endswith("/"):
  68. _CACHE_PATH += "/"
  69. os.makedirs(_CACHE_PATH,exist_ok=True)
  70. if folder == "cache" and not UPGRADED:
  71. upgrade_cache(_CACHE_PATH)
  72. if folder == "cache":
  73. return _CACHE_PATH
  74. elif folder == "config":
  75. return _CONFIG_DIR
  76. elif folder == "data":
  77. return _DATA_DIR
  78. else:
  79. print("No XDG folder for %s. Check your code."%folder)
  80. return None
  81. #An IPV6 URL should be put between []
  82. #We try to detect them as locations with more than 2 ":"
  83. def fix_ipv6_url(url):
  84. if not url or url.startswith("mailto"):
  85. return url
  86. if "://" in url:
  87. schema, schemaless = url.split("://",maxsplit=1)
  88. else:
  89. schema, schemaless = None, url
  90. if "/" in schemaless:
  91. netloc, rest = schemaless.split("/",1)
  92. if netloc.count(":") > 2 and "[" not in netloc and "]" not in netloc:
  93. schemaless = "[" + netloc + "]" + "/" + rest
  94. elif schemaless.count(":") > 2 and "[" not in schemaless and "]" not in schemaless:
  95. schemaless = "[" + schemaless + "]/"
  96. if schema:
  97. return schema + "://" + schemaless
  98. return schemaless
  99. # Cheap and cheerful URL detector
  100. def looks_like_url(word):
  101. try:
  102. if not word.strip():
  103. return False
  104. url = fix_ipv6_url(word).strip()
  105. parsed = urllib.parse.urlparse(url)
  106. #sometimes, urllib crashed only when requesting the port
  107. port = parsed.port
  108. scheme = word.split("://")[0]
  109. mailto = word.startswith("mailto:")
  110. start = scheme in netcache.standard_ports
  111. local = scheme in ["file","list"]
  112. if mailto:
  113. return "@" in word
  114. elif not local:
  115. if start:
  116. #IPv4
  117. if "." in word or "localhost" in word:
  118. return True
  119. #IPv6
  120. elif "[" in word and ":" in word and "]" in word:
  121. return True
  122. else: return False
  123. else: return False
  124. return start and ("." in word or "localhost" in word or ":" in word)
  125. else:
  126. return "/" in word
  127. except ValueError:
  128. return False
  129. ## Those two functions add/remove the mode to the
  130. # URLs. This is a gross hack to remember the mode
  131. def mode_url(url,mode):
  132. if mode and mode!= "readable" and "##offpunk=" not in url:
  133. url += "##offpunk_mode=" + mode
  134. return url
  135. def unmode_url(url):
  136. mode = None
  137. splitted = url.split("##offpunk_mode=")
  138. if len(splitted) > 1:
  139. url = splitted[0]
  140. mode = splitted[1]
  141. return [url,mode]
  142. # In terms of arguments, this can take an input file/string to be passed to
  143. # stdin, a parameter to do (well-escaped) "%" replacement on the command, a
  144. # flag requesting that the output go directly to the stdout, and a list of
  145. # additional environment variables to set.
  146. def run(cmd, *, input=None, parameter=None, direct_output=False, env={}):
  147. if parameter:
  148. cmd = cmd % shlex.quote(parameter)
  149. e = os.environ
  150. e.update(env)
  151. if isinstance(input, io.IOBase):
  152. stdin = input
  153. input = None
  154. else:
  155. if input:
  156. input = input.encode()
  157. stdin = None
  158. if not direct_output:
  159. # subprocess.check_output() wouldn't allow us to pass stdin.
  160. result = subprocess.run(cmd, check=True, env=e, input=input,
  161. shell=True, stdin=stdin, stdout=subprocess.PIPE,
  162. stderr=subprocess.STDOUT)
  163. return result.stdout.decode()
  164. else:
  165. subprocess.run(cmd, env=e, input=input, shell=True, stdin=stdin)
  166. global TERM_WIDTH
  167. TERM_WIDTH = 72
  168. #if absolute, returns the real terminal width, not the text width
  169. def term_width(new_width=None,absolute=False):
  170. if new_width:
  171. global TERM_WIDTH
  172. TERM_WIDTH = new_width
  173. cur = shutil.get_terminal_size()[0]
  174. if absolute:
  175. return cur
  176. width = TERM_WIDTH
  177. if cur < width:
  178. width = cur
  179. return width
  180. def is_local(url):
  181. if not url: return True
  182. elif "://" in url:
  183. scheme,path = url.split("://",maxsplit=1)
  184. return scheme in ["file","mail","list","mailto"]
  185. else:
  186. return True
  187. # This method returns the image URL, or invents one if it’s a base64 inline image.
  188. # It returns (url, image_data), where image_data is None for a normal image.
  189. def looks_like_base64(src,baseurl):
  190. imgdata = None
  191. imgname = src
  192. if src and src.startswith("data:image/"):
  193. if ";base64," in src:
  194. splitted = src.split(";base64,")
  195. #splitted[0] is something like data:image/jpg
  196. if "/" in splitted[0]:
  197. extension = splitted[0].split("/")[1]
  198. else:
  199. extension = "data"
  200. imgdata = splitted[1]
  201. imgname = imgdata[:20] + "." + extension
  202. imgurl = urllib.parse.urljoin(baseurl, imgname)
  203. else:
  204. #We can’t handle other data:image such as svg for now
  205. imgurl = None
  206. else:
  207. imgurl = urllib.parse.urljoin(baseurl, imgname)
  208. return imgurl,imgdata