util.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. """Utility functions."""
  2. import base64
  3. import hashlib
  4. import json
  5. import os
  6. import sys
  7. __all__ = ['urlsafe_b64encode', 'urlsafe_b64decode', 'utf8',
  8. 'to_json', 'from_json', 'matches_requirement']
  9. # For encoding ascii back and forth between bytestrings, as is repeatedly
  10. # necessary in JSON-based crypto under Python 3
  11. if sys.version_info[0] < 3:
  12. text_type = unicode # noqa: F821
  13. def native(s, encoding='ascii'):
  14. return s
  15. else:
  16. text_type = str
  17. def native(s, encoding='ascii'):
  18. if isinstance(s, bytes):
  19. return s.decode(encoding)
  20. return s
  21. def urlsafe_b64encode(data):
  22. """urlsafe_b64encode without padding"""
  23. return base64.urlsafe_b64encode(data).rstrip(binary('='))
  24. def urlsafe_b64decode(data):
  25. """urlsafe_b64decode without padding"""
  26. pad = b'=' * (4 - (len(data) & 3))
  27. return base64.urlsafe_b64decode(data + pad)
  28. def to_json(o):
  29. """Convert given data to JSON."""
  30. return json.dumps(o, sort_keys=True)
  31. def from_json(j):
  32. """Decode a JSON payload."""
  33. return json.loads(j)
  34. def open_for_csv(name, mode):
  35. if sys.version_info[0] < 3:
  36. kwargs = {}
  37. mode += 'b'
  38. else:
  39. kwargs = {'newline': '', 'encoding': 'utf-8'}
  40. return open(name, mode, **kwargs)
  41. def utf8(data):
  42. """Utf-8 encode data."""
  43. if isinstance(data, text_type):
  44. return data.encode('utf-8')
  45. return data
  46. def binary(s):
  47. if isinstance(s, text_type):
  48. return s.encode('ascii')
  49. return s
  50. class HashingFile(object):
  51. def __init__(self, path, mode, hashtype='sha256'):
  52. self.fd = open(path, mode)
  53. self.hashtype = hashtype
  54. self.hash = hashlib.new(hashtype)
  55. self.length = 0
  56. def write(self, data):
  57. self.hash.update(data)
  58. self.length += len(data)
  59. self.fd.write(data)
  60. def close(self):
  61. self.fd.close()
  62. def digest(self):
  63. if self.hashtype == 'md5':
  64. return self.hash.hexdigest()
  65. digest = self.hash.digest()
  66. return self.hashtype + '=' + native(urlsafe_b64encode(digest))
  67. def __enter__(self):
  68. return self
  69. def __exit__(self, exc_type, exc_val, exc_tb):
  70. self.fd.close()
  71. if sys.platform == 'win32':
  72. import ctypes.wintypes
  73. # CSIDL_APPDATA for reference - not used here for compatibility with
  74. # dirspec, which uses LOCAL_APPDATA and COMMON_APPDATA in that order
  75. csidl = {'CSIDL_APPDATA': 26, 'CSIDL_LOCAL_APPDATA': 28, 'CSIDL_COMMON_APPDATA': 35}
  76. def get_path(name):
  77. SHGFP_TYPE_CURRENT = 0
  78. buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
  79. ctypes.windll.shell32.SHGetFolderPathW(0, csidl[name], 0, SHGFP_TYPE_CURRENT, buf)
  80. return buf.value
  81. def save_config_path(*resource):
  82. appdata = get_path("CSIDL_LOCAL_APPDATA")
  83. path = os.path.join(appdata, *resource)
  84. if not os.path.isdir(path):
  85. os.makedirs(path)
  86. return path
  87. def load_config_paths(*resource):
  88. ids = ["CSIDL_LOCAL_APPDATA", "CSIDL_COMMON_APPDATA"]
  89. for id in ids:
  90. base = get_path(id)
  91. path = os.path.join(base, *resource)
  92. if os.path.exists(path):
  93. yield path
  94. else:
  95. def save_config_path(*resource):
  96. import xdg.BaseDirectory
  97. return xdg.BaseDirectory.save_config_path(*resource)
  98. def load_config_paths(*resource):
  99. import xdg.BaseDirectory
  100. return xdg.BaseDirectory.load_config_paths(*resource)
  101. def matches_requirement(req, wheels):
  102. """List of wheels matching a requirement.
  103. :param req: The requirement to satisfy
  104. :param wheels: List of wheels to search.
  105. """
  106. try:
  107. from pkg_resources import Distribution, Requirement
  108. except ImportError:
  109. raise RuntimeError("Cannot use requirements without pkg_resources")
  110. req = Requirement.parse(req)
  111. selected = []
  112. for wf in wheels:
  113. f = wf.parsed_filename
  114. dist = Distribution(project_name=f.group("name"), version=f.group("ver"))
  115. if dist in req:
  116. selected.append(wf)
  117. return selected