ContentDbPlugin.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. import time
  2. import collections
  3. import itertools
  4. import re
  5. import gevent
  6. from util import helper
  7. from Plugin import PluginManager
  8. from Config import config
  9. from Debug import Debug
  10. if "content_db" not in locals().keys(): # To keep between module reloads
  11. content_db = None
  12. @PluginManager.registerTo("ContentDb")
  13. class ContentDbPlugin(object):
  14. def __init__(self, *args, **kwargs):
  15. global content_db
  16. content_db = self
  17. self.filled = {} # Site addresses that already filled from content.json
  18. self.need_filling = False # file_optional table just created, fill data from content.json files
  19. self.time_peer_numbers_updated = 0
  20. self.my_optional_files = {} # Last 50 site_address/inner_path called by fileWrite (auto-pinning these files)
  21. self.optional_files = collections.defaultdict(dict)
  22. self.optional_files_loaded = False
  23. self.timer_check_optional = helper.timer(60 * 5, self.checkOptionalLimit)
  24. super(ContentDbPlugin, self).__init__(*args, **kwargs)
  25. def getSchema(self):
  26. schema = super(ContentDbPlugin, self).getSchema()
  27. # Need file_optional table
  28. schema["tables"]["file_optional"] = {
  29. "cols": [
  30. ["file_id", "INTEGER PRIMARY KEY UNIQUE NOT NULL"],
  31. ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
  32. ["inner_path", "TEXT"],
  33. ["hash_id", "INTEGER"],
  34. ["size", "INTEGER"],
  35. ["peer", "INTEGER DEFAULT 0"],
  36. ["uploaded", "INTEGER DEFAULT 0"],
  37. ["is_downloaded", "INTEGER DEFAULT 0"],
  38. ["is_pinned", "INTEGER DEFAULT 0"],
  39. ["time_added", "INTEGER DEFAULT 0"],
  40. ["time_downloaded", "INTEGER DEFAULT 0"],
  41. ["time_accessed", "INTEGER DEFAULT 0"]
  42. ],
  43. "indexes": [
  44. "CREATE UNIQUE INDEX file_optional_key ON file_optional (site_id, inner_path)",
  45. "CREATE INDEX is_downloaded ON file_optional (is_downloaded)"
  46. ],
  47. "schema_changed": 11
  48. }
  49. return schema
  50. def initSite(self, site):
  51. super(ContentDbPlugin, self).initSite(site)
  52. if self.need_filling:
  53. self.fillTableFileOptional(site)
  54. def checkTables(self):
  55. changed_tables = super(ContentDbPlugin, self).checkTables()
  56. if "file_optional" in changed_tables:
  57. self.need_filling = True
  58. return changed_tables
  59. # Load optional files ending
  60. def loadFilesOptional(self):
  61. s = time.time()
  62. num = 0
  63. total = 0
  64. total_downloaded = 0
  65. res = content_db.execute("SELECT site_id, inner_path, size, is_downloaded FROM file_optional")
  66. site_sizes = collections.defaultdict(lambda: collections.defaultdict(int))
  67. for row in res:
  68. self.optional_files[row["site_id"]][row["inner_path"][-8:]] = 1
  69. num += 1
  70. # Update site size stats
  71. site_sizes[row["site_id"]]["size_optional"] += row["size"]
  72. if row["is_downloaded"]:
  73. site_sizes[row["site_id"]]["optional_downloaded"] += row["size"]
  74. # Site site size stats to sites.json settings
  75. site_ids_reverse = {val: key for key, val in self.site_ids.items()}
  76. for site_id, stats in site_sizes.items():
  77. site_address = site_ids_reverse.get(site_id)
  78. if not site_address or site_address not in self.sites:
  79. self.log.error("Not found site_id: %s" % site_id)
  80. continue
  81. site = self.sites[site_address]
  82. site.settings["size_optional"] = stats["size_optional"]
  83. site.settings["optional_downloaded"] = stats["optional_downloaded"]
  84. total += stats["size_optional"]
  85. total_downloaded += stats["optional_downloaded"]
  86. self.log.info(
  87. "Loaded %s optional files: %.2fMB, downloaded: %.2fMB in %.3fs" %
  88. (num, float(total) / 1024 / 1024, float(total_downloaded) / 1024 / 1024, time.time() - s)
  89. )
  90. if self.need_filling and self.getOptionalLimitBytes() >= 0 and self.getOptionalLimitBytes() < total_downloaded:
  91. limit_bytes = self.getOptionalLimitBytes()
  92. limit_new = round((float(total_downloaded) / 1024 / 1024 / 1024) * 1.1, 2) # Current limit + 10%
  93. self.log.info(
  94. "First startup after update and limit is smaller than downloaded files size (%.2fGB), increasing it from %.2fGB to %.2fGB" %
  95. (float(total_downloaded) / 1024 / 1024 / 1024, float(limit_bytes) / 1024 / 1024 / 1024, limit_new)
  96. )
  97. config.saveValue("optional_limit", limit_new)
  98. config.optional_limit = str(limit_new)
  99. # Predicts if the file is optional
  100. def isOptionalFile(self, site_id, inner_path):
  101. return self.optional_files[site_id].get(inner_path[-8:])
  102. # Fill file_optional table with optional files found in sites
  103. def fillTableFileOptional(self, site):
  104. s = time.time()
  105. site_id = self.site_ids.get(site.address)
  106. if not site_id:
  107. return False
  108. cur = self.getCursor()
  109. res = cur.execute("SELECT * FROM content WHERE size_files_optional > 0 AND site_id = %s" % site_id)
  110. num = 0
  111. for row in res.fetchall():
  112. content = site.content_manager.contents[row["inner_path"]]
  113. try:
  114. num += self.setContentFilesOptional(site, row["inner_path"], content, cur=cur)
  115. except Exception as err:
  116. self.log.error("Error loading %s into file_optional: %s" % (row["inner_path"], err))
  117. cur.close()
  118. # Set my files to pinned
  119. from User import UserManager
  120. user = UserManager.user_manager.get()
  121. if not user:
  122. user = UserManager.user_manager.create()
  123. auth_address = user.getAuthAddress(site.address)
  124. res = self.execute(
  125. "UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path",
  126. {"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address}
  127. )
  128. self.log.debug(
  129. "Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" %
  130. (site.address, time.time() - s, num, res.rowcount)
  131. )
  132. self.filled[site.address] = True
  133. def setContentFilesOptional(self, site, content_inner_path, content, cur=None):
  134. if not cur:
  135. cur = self
  136. num = 0
  137. site_id = self.site_ids[site.address]
  138. content_inner_dir = helper.getDirname(content_inner_path)
  139. for relative_inner_path, file in content.get("files_optional", {}).items():
  140. file_inner_path = content_inner_dir + relative_inner_path
  141. hash_id = int(file["sha512"][0:4], 16)
  142. if hash_id in site.content_manager.hashfield:
  143. is_downloaded = 1
  144. else:
  145. is_downloaded = 0
  146. if site.address + "/" + content_inner_dir in self.my_optional_files:
  147. is_pinned = 1
  148. else:
  149. is_pinned = 0
  150. cur.insertOrUpdate("file_optional", {
  151. "hash_id": hash_id,
  152. "size": int(file["size"])
  153. }, {
  154. "site_id": site_id,
  155. "inner_path": file_inner_path
  156. }, oninsert={
  157. "time_added": int(time.time()),
  158. "time_downloaded": int(time.time()) if is_downloaded else 0,
  159. "is_downloaded": is_downloaded,
  160. "peer": is_downloaded,
  161. "is_pinned": is_pinned
  162. })
  163. self.optional_files[site_id][file_inner_path[-8:]] = 1
  164. num += 1
  165. return num
  166. def setContent(self, site, inner_path, content, size=0):
  167. super(ContentDbPlugin, self).setContent(site, inner_path, content, size=size)
  168. old_content = site.content_manager.contents.get(inner_path, {})
  169. if (not self.need_filling or self.filled.get(site.address)) and ("files_optional" in content or "files_optional" in old_content):
  170. self.setContentFilesOptional(site, inner_path, content)
  171. # Check deleted files
  172. if old_content:
  173. old_files = old_content.get("files_optional", {}).keys()
  174. new_files = content.get("files_optional", {}).keys()
  175. content_inner_dir = helper.getDirname(inner_path)
  176. deleted = [content_inner_dir + key for key in old_files if key not in new_files]
  177. if deleted:
  178. site_id = self.site_ids[site.address]
  179. self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": deleted})
  180. def deleteContent(self, site, inner_path):
  181. content = site.content_manager.contents.get(inner_path)
  182. if content and "files_optional" in content:
  183. site_id = self.site_ids[site.address]
  184. content_inner_dir = helper.getDirname(inner_path)
  185. optional_inner_paths = [
  186. content_inner_dir + relative_inner_path
  187. for relative_inner_path in content.get("files_optional", {}).keys()
  188. ]
  189. self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": optional_inner_paths})
  190. super(ContentDbPlugin, self).deleteContent(site, inner_path)
  191. def updatePeerNumbers(self):
  192. s = time.time()
  193. num_file = 0
  194. num_updated = 0
  195. num_site = 0
  196. for site in list(self.sites.values()):
  197. if not site.content_manager.has_optional_files:
  198. continue
  199. if not site.isServing():
  200. continue
  201. has_updated_hashfield = next((
  202. peer
  203. for peer in site.peers.values()
  204. if peer.has_hashfield and peer.hashfield.time_changed > self.time_peer_numbers_updated
  205. ), None)
  206. if not has_updated_hashfield and site.content_manager.hashfield.time_changed < self.time_peer_numbers_updated:
  207. continue
  208. hashfield_peers = itertools.chain.from_iterable(
  209. peer.hashfield.storage
  210. for peer in site.peers.values()
  211. if peer.has_hashfield
  212. )
  213. peer_nums = collections.Counter(
  214. itertools.chain(
  215. hashfield_peers,
  216. site.content_manager.hashfield
  217. )
  218. )
  219. site_id = self.site_ids[site.address]
  220. if not site_id:
  221. continue
  222. res = self.execute("SELECT file_id, hash_id, peer FROM file_optional WHERE ?", {"site_id": site_id})
  223. updates = {}
  224. for row in res:
  225. peer_num = peer_nums.get(row["hash_id"], 0)
  226. if peer_num != row["peer"]:
  227. updates[row["file_id"]] = peer_num
  228. for file_id, peer_num in updates.items():
  229. self.execute("UPDATE file_optional SET peer = ? WHERE file_id = ?", (peer_num, file_id))
  230. num_updated += len(updates)
  231. num_file += len(peer_nums)
  232. num_site += 1
  233. self.time_peer_numbers_updated = time.time()
  234. self.log.debug("%s/%s peer number for %s site updated in %.3fs" % (num_updated, num_file, num_site, time.time() - s))
  235. def queryDeletableFiles(self):
  236. # First return the files with atleast 10 seeder and not accessed in last week
  237. query = """
  238. SELECT * FROM file_optional
  239. WHERE peer > 10 AND %s
  240. ORDER BY time_accessed < %s DESC, uploaded / size
  241. """ % (self.getOptionalUsedWhere(), int(time.time() - 60 * 60 * 7))
  242. limit_start = 0
  243. while 1:
  244. num = 0
  245. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  246. for row in res:
  247. yield row
  248. num += 1
  249. if num < 50:
  250. break
  251. limit_start += 50
  252. self.log.debug("queryDeletableFiles returning less-seeded files")
  253. # Then return files less seeder but still not accessed in last week
  254. query = """
  255. SELECT * FROM file_optional
  256. WHERE peer <= 10 AND %s
  257. ORDER BY peer DESC, time_accessed < %s DESC, uploaded / size
  258. """ % (self.getOptionalUsedWhere(), int(time.time() - 60 * 60 * 7))
  259. limit_start = 0
  260. while 1:
  261. num = 0
  262. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  263. for row in res:
  264. yield row
  265. num += 1
  266. if num < 50:
  267. break
  268. limit_start += 50
  269. self.log.debug("queryDeletableFiles returning everyting")
  270. # At the end return all files
  271. query = """
  272. SELECT * FROM file_optional
  273. WHERE peer <= 10 AND %s
  274. ORDER BY peer DESC, time_accessed, uploaded / size
  275. """ % self.getOptionalUsedWhere()
  276. limit_start = 0
  277. while 1:
  278. num = 0
  279. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  280. for row in res:
  281. yield row
  282. num += 1
  283. if num < 50:
  284. break
  285. limit_start += 50
  286. def getOptionalLimitBytes(self):
  287. if config.optional_limit.endswith("%"):
  288. limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
  289. limit_bytes = helper.getFreeSpace() * (limit_percent / 100)
  290. else:
  291. limit_bytes = float(re.sub("[^0-9.]", "", config.optional_limit)) * 1024 * 1024 * 1024
  292. return limit_bytes
  293. def getOptionalUsedWhere(self):
  294. maxsize = config.optional_limit_exclude_minsize * 1024 * 1024
  295. query = "is_downloaded = 1 AND is_pinned = 0 AND size < %s" % maxsize
  296. # Don't delete optional files from owned sites
  297. my_site_ids = []
  298. for address, site in self.sites.items():
  299. if site.settings["own"]:
  300. my_site_ids.append(str(self.site_ids[address]))
  301. if my_site_ids:
  302. query += " AND site_id NOT IN (%s)" % ", ".join(my_site_ids)
  303. return query
  304. def getOptionalUsedBytes(self):
  305. size = self.execute("SELECT SUM(size) FROM file_optional WHERE %s" % self.getOptionalUsedWhere()).fetchone()[0]
  306. if not size:
  307. size = 0
  308. return size
  309. def getOptionalNeedDelete(self, size):
  310. if config.optional_limit.endswith("%"):
  311. limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
  312. need_delete = size - ((helper.getFreeSpace() + size) * (limit_percent / 100))
  313. else:
  314. need_delete = size - self.getOptionalLimitBytes()
  315. return need_delete
  316. def checkOptionalLimit(self, limit=None):
  317. if not limit:
  318. limit = self.getOptionalLimitBytes()
  319. if limit < 0:
  320. self.log.debug("Invalid limit for optional files: %s" % limit)
  321. return False
  322. size = self.getOptionalUsedBytes()
  323. need_delete = self.getOptionalNeedDelete(size)
  324. self.log.debug(
  325. "Optional size: %.1fMB/%.1fMB, Need delete: %.1fMB" %
  326. (float(size) / 1024 / 1024, float(limit) / 1024 / 1024, float(need_delete) / 1024 / 1024)
  327. )
  328. if need_delete <= 0:
  329. return False
  330. self.updatePeerNumbers()
  331. site_ids_reverse = {val: key for key, val in self.site_ids.items()}
  332. deleted_file_ids = []
  333. for row in self.queryDeletableFiles():
  334. site_address = site_ids_reverse.get(row["site_id"])
  335. site = self.sites.get(site_address)
  336. if not site:
  337. self.log.error("No site found for id: %s" % row["site_id"])
  338. continue
  339. site.log.debug("Deleting %s %.3f MB left" % (row["inner_path"], float(need_delete) / 1024 / 1024))
  340. deleted_file_ids.append(row["file_id"])
  341. try:
  342. site.content_manager.optionalRemoved(row["inner_path"], row["hash_id"], row["size"])
  343. site.storage.delete(row["inner_path"])
  344. need_delete -= row["size"]
  345. except Exception as err:
  346. site.log.error("Error deleting %s: %s" % (row["inner_path"], err))
  347. if need_delete <= 0:
  348. break
  349. cur = self.getCursor()
  350. for file_id in deleted_file_ids:
  351. cur.execute("UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE ?", {"file_id": file_id})
  352. cur.close()
  353. @PluginManager.registerTo("SiteManager")
  354. class SiteManagerPlugin(object):
  355. def load(self, *args, **kwargs):
  356. back = super(SiteManagerPlugin, self).load(*args, **kwargs)
  357. if self.sites and not content_db.optional_files_loaded and content_db.conn:
  358. content_db.optional_files_loaded = True
  359. content_db.loadFilesOptional()
  360. return back