# OptionalManagerPlugin.py (10 KB)
  1. import time
  2. import re
  3. import collections
  4. import gevent
  5. from util import helper
  6. from Plugin import PluginManager
  7. from . import ContentDbPlugin
  8. # We can only import plugin host clases after the plugins are loaded
  9. @PluginManager.afterLoad
  10. def importPluginnedClasses():
  11. global config
  12. from Config import config
  13. def processAccessLog():
  14. global access_log
  15. if access_log:
  16. content_db = ContentDbPlugin.content_db
  17. if not content_db.conn:
  18. return False
  19. s = time.time()
  20. access_log_prev = access_log
  21. access_log = collections.defaultdict(dict)
  22. now = int(time.time())
  23. num = 0
  24. for site_id in access_log_prev:
  25. content_db.execute(
  26. "UPDATE file_optional SET time_accessed = %s WHERE ?" % now,
  27. {"site_id": site_id, "inner_path": list(access_log_prev[site_id].keys())}
  28. )
  29. num += len(access_log_prev[site_id])
  30. content_db.log.debug("Inserted %s web request stat in %.3fs" % (num, time.time() - s))
  31. def processRequestLog():
  32. global request_log
  33. if request_log:
  34. content_db = ContentDbPlugin.content_db
  35. if not content_db.conn:
  36. return False
  37. s = time.time()
  38. request_log_prev = request_log
  39. request_log = collections.defaultdict(lambda: collections.defaultdict(int)) # {site_id: {inner_path1: 1, inner_path2: 1...}}
  40. num = 0
  41. for site_id in request_log_prev:
  42. for inner_path, uploaded in request_log_prev[site_id].items():
  43. content_db.execute(
  44. "UPDATE file_optional SET uploaded = uploaded + %s WHERE ?" % uploaded,
  45. {"site_id": site_id, "inner_path": inner_path}
  46. )
  47. num += 1
  48. content_db.log.debug("Inserted %s file request stat in %.3fs" % (num, time.time() - s))
  49. if "access_log" not in locals().keys(): # To keep between module reloads
  50. access_log = collections.defaultdict(dict) # {site_id: {inner_path1: 1, inner_path2: 1...}}
  51. request_log = collections.defaultdict(lambda: collections.defaultdict(int)) # {site_id: {inner_path1: 1, inner_path2: 1...}}
  52. helper.timer(61, processAccessLog)
  53. helper.timer(60, processRequestLog)
  54. @PluginManager.registerTo("ContentManager")
  55. class ContentManagerPlugin(object):
  56. def __init__(self, *args, **kwargs):
  57. self.cache_is_pinned = {}
  58. super(ContentManagerPlugin, self).__init__(*args, **kwargs)
  59. def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
  60. if "|" in inner_path: # Big file piece
  61. file_inner_path, file_range = inner_path.split("|")
  62. else:
  63. file_inner_path = inner_path
  64. self.contents.db.executeDelayed(
  65. "UPDATE file_optional SET time_downloaded = :now, is_downloaded = 1, peer = peer + 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 0",
  66. {"now": int(time.time()), "site_id": self.contents.db.site_ids[self.site.address], "inner_path": file_inner_path}
  67. )
  68. return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)
  69. def optionalRemoved(self, inner_path, hash_id, size=None):
  70. res = self.contents.db.execute(
  71. "UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 1",
  72. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
  73. )
  74. if res.rowcount > 0:
  75. back = super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
  76. # Re-add to hashfield if we have other file with the same hash_id
  77. if self.isDownloaded(hash_id=hash_id, force_check_db=True):
  78. self.hashfield.appendHashId(hash_id)
  79. else:
  80. back = False
  81. self.cache_is_pinned = {}
  82. return back
  83. def optionalRenamed(self, inner_path_old, inner_path_new):
  84. back = super(ContentManagerPlugin, self).optionalRenamed(inner_path_old, inner_path_new)
  85. self.cache_is_pinned = {}
  86. self.contents.db.execute(
  87. "UPDATE file_optional SET inner_path = :inner_path_new WHERE site_id = :site_id AND inner_path = :inner_path_old",
  88. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path_old": inner_path_old, "inner_path_new": inner_path_new}
  89. )
  90. return back
  91. def isDownloaded(self, inner_path=None, hash_id=None, force_check_db=False):
  92. if hash_id and not force_check_db and hash_id not in self.hashfield:
  93. return False
  94. if inner_path:
  95. res = self.contents.db.execute(
  96. "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1",
  97. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
  98. )
  99. else:
  100. res = self.contents.db.execute(
  101. "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND hash_id = :hash_id AND is_downloaded = 1 LIMIT 1",
  102. {"site_id": self.contents.db.site_ids[self.site.address], "hash_id": hash_id}
  103. )
  104. row = res.fetchone()
  105. if row and row["is_downloaded"]:
  106. return True
  107. else:
  108. return False
  109. def isPinned(self, inner_path):
  110. if inner_path in self.cache_is_pinned:
  111. self.site.log.debug("Cached is pinned: %s" % inner_path)
  112. return self.cache_is_pinned[inner_path]
  113. res = self.contents.db.execute(
  114. "SELECT is_pinned FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1",
  115. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
  116. )
  117. row = res.fetchone()
  118. if row and row[0]:
  119. is_pinned = True
  120. else:
  121. is_pinned = False
  122. self.cache_is_pinned[inner_path] = is_pinned
  123. self.site.log.debug("Cache set is pinned: %s %s" % (inner_path, is_pinned))
  124. return is_pinned
  125. def setPin(self, inner_path, is_pinned):
  126. content_db = self.contents.db
  127. site_id = content_db.site_ids[self.site.address]
  128. content_db.execute("UPDATE file_optional SET is_pinned = %d WHERE ?" % is_pinned, {"site_id": site_id, "inner_path": inner_path})
  129. self.cache_is_pinned = {}
  130. def optionalDelete(self, inner_path):
  131. if self.isPinned(inner_path):
  132. self.site.log.debug("Skip deleting pinned optional file: %s" % inner_path)
  133. return False
  134. else:
  135. return super(ContentManagerPlugin, self).optionalDelete(inner_path)
  136. @PluginManager.registerTo("WorkerManager")
  137. class WorkerManagerPlugin(object):
  138. def doneTask(self, task):
  139. super(WorkerManagerPlugin, self).doneTask(task)
  140. if task["optional_hash_id"] and not self.tasks: # Execute delayed queries immedietly after tasks finished
  141. ContentDbPlugin.content_db.processDelayed()
  142. @PluginManager.registerTo("UiRequest")
  143. class UiRequestPlugin(object):
  144. def parsePath(self, path):
  145. global access_log
  146. path_parts = super(UiRequestPlugin, self).parsePath(path)
  147. if path_parts:
  148. site_id = ContentDbPlugin.content_db.site_ids.get(path_parts["request_address"])
  149. if site_id:
  150. if ContentDbPlugin.content_db.isOptionalFile(site_id, path_parts["inner_path"]):
  151. access_log[site_id][path_parts["inner_path"]] = 1
  152. return path_parts
  153. @PluginManager.registerTo("FileRequest")
  154. class FileRequestPlugin(object):
  155. def actionGetFile(self, params):
  156. stats = super(FileRequestPlugin, self).actionGetFile(params)
  157. self.recordFileRequest(params["site"], params["inner_path"], stats)
  158. return stats
  159. def actionStreamFile(self, params):
  160. stats = super(FileRequestPlugin, self).actionStreamFile(params)
  161. self.recordFileRequest(params["site"], params["inner_path"], stats)
  162. return stats
  163. def recordFileRequest(self, site_address, inner_path, stats):
  164. if not stats:
  165. # Only track the last request of files
  166. return False
  167. site_id = ContentDbPlugin.content_db.site_ids[site_address]
  168. if site_id and ContentDbPlugin.content_db.isOptionalFile(site_id, inner_path):
  169. request_log[site_id][inner_path] += stats["bytes_sent"]
  170. @PluginManager.registerTo("Site")
  171. class SitePlugin(object):
  172. def isDownloadable(self, inner_path):
  173. is_downloadable = super(SitePlugin, self).isDownloadable(inner_path)
  174. if is_downloadable:
  175. return is_downloadable
  176. for path in self.settings.get("optional_help", {}).keys():
  177. if inner_path.startswith(path):
  178. return True
  179. return False
  180. def fileForgot(self, inner_path):
  181. if "|" in inner_path and self.content_manager.isPinned(re.sub(r"\|.*", "", inner_path)):
  182. self.log.debug("File %s is pinned, no fileForgot" % inner_path)
  183. return False
  184. else:
  185. return super(SitePlugin, self).fileForgot(inner_path)
  186. def fileDone(self, inner_path):
  187. if "|" in inner_path and self.bad_files.get(inner_path, 0) > 5: # Idle optional file done
  188. inner_path_file = re.sub(r"\|.*", "", inner_path)
  189. num_changed = 0
  190. for key, val in self.bad_files.items():
  191. if key.startswith(inner_path_file) and val > 1:
  192. self.bad_files[key] = 1
  193. num_changed += 1
  194. self.log.debug("Idle optional file piece done, changed retry number of %s pieces." % num_changed)
  195. if num_changed:
  196. gevent.spawn(self.retryBadFiles)
  197. return super(SitePlugin, self).fileDone(inner_path)
  198. @PluginManager.registerTo("ConfigPlugin")
  199. class ConfigPlugin(object):
  200. def createArguments(self):
  201. group = self.parser.add_argument_group("OptionalManager plugin")
  202. group.add_argument('--optional_limit', help='Limit total size of optional files', default="10%", metavar="GB or free space %")
  203. group.add_argument('--optional_limit_exclude_minsize', help='Exclude files larger than this limit from optional size limit calculation', default=20, metavar="MB", type=int)
  204. return super(ConfigPlugin, self).createArguments()