BigfilePlugin.py 35 KB


  1. import time
  2. import os
  3. import subprocess
  4. import shutil
  5. import collections
  6. import math
  7. import warnings
  8. import base64
  9. import binascii
  10. import json
  11. import gevent
  12. import gevent.lock
  13. from Plugin import PluginManager
  14. from Debug import Debug
  15. from Crypt import CryptHash
  16. with warnings.catch_warnings():
  17. warnings.filterwarnings("ignore") # Ignore missing sha3 warning
  18. import merkletools
  19. from util import helper
  20. from util import Msgpack
  21. from util.Flag import flag
  22. import util
  23. from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked
  24. # We can only import plugin host clases after the plugins are loaded
  25. @PluginManager.afterLoad
  26. def importPluginnedClasses():
  27. global VerifyError, config
  28. from Content.ContentManager import VerifyError
  29. from Config import config
  30. if "upload_nonces" not in locals():
  31. upload_nonces = {}
  32. @PluginManager.registerTo("UiRequest")
  33. class UiRequestPlugin(object):
  34. def isCorsAllowed(self, path):
  35. if path == "/ZeroNet-Internal/BigfileUpload":
  36. return True
  37. else:
  38. return super(UiRequestPlugin, self).isCorsAllowed(path)
  39. @helper.encodeResponse
  40. def actionBigfileUpload(self):
  41. nonce = self.get.get("upload_nonce")
  42. if nonce not in upload_nonces:
  43. return self.error403("Upload nonce error.")
  44. upload_info = upload_nonces[nonce]
  45. del upload_nonces[nonce]
  46. self.sendHeader(200, "text/html", noscript=True, extra_headers={
  47. "Access-Control-Allow-Origin": "null",
  48. "Access-Control-Allow-Credentials": "true"
  49. })
  50. self.readMultipartHeaders(self.env['wsgi.input']) # Skip http headers
  51. result = self.handleBigfileUpload(upload_info, self.env['wsgi.input'].read)
  52. return json.dumps(result)
  53. def actionBigfileUploadWebsocket(self):
  54. ws = self.env.get("wsgi.websocket")
  55. if not ws:
  56. self.start_response("400 Bad Request", [])
  57. return [b"Not a websocket request!"]
  58. nonce = self.get.get("upload_nonce")
  59. if nonce not in upload_nonces:
  60. return self.error403("Upload nonce error.")
  61. upload_info = upload_nonces[nonce]
  62. del upload_nonces[nonce]
  63. ws.send("poll")
  64. buffer = b""
  65. def read(size):
  66. nonlocal buffer
  67. while len(buffer) < size:
  68. buffer += ws.receive()
  69. ws.send("poll")
  70. part, buffer = buffer[:size], buffer[size:]
  71. return part
  72. result = self.handleBigfileUpload(upload_info, read)
  73. ws.send(json.dumps(result))
  74. def handleBigfileUpload(self, upload_info, read):
  75. site = upload_info["site"]
  76. inner_path = upload_info["inner_path"]
  77. with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
  78. merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
  79. read, upload_info["size"], upload_info["piece_size"], out_file
  80. )
  81. if len(piecemap_info["sha512_pieces"]) == 1: # Small file, don't split
  82. hash = binascii.hexlify(piecemap_info["sha512_pieces"][0])
  83. hash_id = site.content_manager.hashfield.getHashId(hash)
  84. site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)
  85. else: # Big file
  86. file_name = helper.getFilename(inner_path)
  87. site.storage.open(upload_info["piecemap"], "wb").write(Msgpack.pack({file_name: piecemap_info}))
  88. # Find piecemap and file relative path to content.json
  89. file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
  90. content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
  91. piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
  92. file_relative_path = inner_path[len(content_inner_path_dir):]
  93. # Add file to content.json
  94. if site.storage.isFile(file_info["content_inner_path"]):
  95. content = site.storage.loadJson(file_info["content_inner_path"])
  96. else:
  97. content = {}
  98. if "files_optional" not in content:
  99. content["files_optional"] = {}
  100. content["files_optional"][file_relative_path] = {
  101. "sha512": merkle_root,
  102. "size": upload_info["size"],
  103. "piecemap": piecemap_relative_path,
  104. "piece_size": piece_size
  105. }
  106. merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
  107. site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
  108. site.storage.writeJson(file_info["content_inner_path"], content)
  109. site.content_manager.contents.loadItem(file_info["content_inner_path"]) # reload cache
  110. return {
  111. "merkle_root": merkle_root,
  112. "piece_num": len(piecemap_info["sha512_pieces"]),
  113. "piece_size": piece_size,
  114. "inner_path": inner_path
  115. }
  116. def readMultipartHeaders(self, wsgi_input):
  117. found = False
  118. for i in range(100):
  119. line = wsgi_input.readline()
  120. if line == b"\r\n":
  121. found = True
  122. break
  123. if not found:
  124. raise Exception("No multipart header found")
  125. return i
  126. def actionFile(self, file_path, *args, **kwargs):
  127. if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"): # Only check files larger than 1MB
  128. path_parts = kwargs["path_parts"]
  129. site = self.server.site_manager.get(path_parts["address"])
  130. big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
  131. if big_file:
  132. kwargs["file_obj"] = big_file
  133. kwargs["file_size"] = big_file.size
  134. return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
  135. @PluginManager.registerTo("UiWebsocket")
  136. class UiWebsocketPlugin(object):
  137. def actionBigfileUploadInit(self, to, inner_path, size, protocol="xhr"):
  138. valid_signers = self.site.content_manager.getValidSigners(inner_path)
  139. auth_address = self.user.getAuthAddress(self.site.address)
  140. if not self.site.settings["own"] and auth_address not in valid_signers:
  141. self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
  142. return self.response(to, {"error": "Forbidden, you can only modify your own files"})
  143. nonce = CryptHash.random()
  144. piece_size = 1024 * 1024
  145. inner_path = self.site.content_manager.sanitizePath(inner_path)
  146. file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)
  147. content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
  148. file_relative_path = inner_path[len(content_inner_path_dir):]
  149. upload_nonces[nonce] = {
  150. "added": time.time(),
  151. "site": self.site,
  152. "inner_path": inner_path,
  153. "websocket_client": self,
  154. "size": size,
  155. "piece_size": piece_size,
  156. "piecemap": inner_path + ".piecemap.msgpack"
  157. }
  158. if protocol == "xhr":
  159. return {
  160. "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
  161. "piece_size": piece_size,
  162. "inner_path": inner_path,
  163. "file_relative_path": file_relative_path
  164. }
  165. elif protocol == "websocket":
  166. server_url = self.request.getWsServerUrl()
  167. if server_url:
  168. proto, host = server_url.split("://")
  169. origin = proto.replace("http", "ws") + "://" + host
  170. else:
  171. origin = "{origin}"
  172. return {
  173. "url": origin + "/ZeroNet-Internal/BigfileUploadWebsocket?upload_nonce=" + nonce,
  174. "piece_size": piece_size,
  175. "inner_path": inner_path,
  176. "file_relative_path": file_relative_path
  177. }
  178. else:
  179. return {"error": "Unknown protocol"}
  180. @flag.no_multiuser
  181. def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
  182. permissions = self.getPermissions(to)
  183. if "ADMIN" not in permissions:
  184. return self.response(to, "You don't have permission to run this command")
  185. self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
  186. self.response(to, "ok")
  187. def actionFileDelete(self, to, inner_path):
  188. piecemap_inner_path = inner_path + ".piecemap.msgpack"
  189. if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
  190. # Also delete .piecemap.msgpack file if exists
  191. self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
  192. file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
  193. if file_info:
  194. content_json = self.site.storage.loadJson(file_info["content_inner_path"])
  195. relative_path = file_info["relative_path"]
  196. if relative_path in content_json.get("files_optional", {}):
  197. del content_json["files_optional"][relative_path]
  198. self.site.storage.writeJson(file_info["content_inner_path"], content_json)
  199. self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
  200. try:
  201. self.site.storage.delete(piecemap_inner_path)
  202. except Exception as err:
  203. self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))
  204. return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)
  205. @PluginManager.registerTo("ContentManager")
  206. class ContentManagerPlugin(object):
  207. def getFileInfo(self, inner_path, *args, **kwargs):
  208. if "|" not in inner_path:
  209. return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
  210. inner_path, file_range = inner_path.split("|")
  211. pos_from, pos_to = map(int, file_range.split("-"))
  212. file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
  213. return file_info
  214. def readFile(self, read_func, size, buff_size=1024 * 64):
  215. part_num = 0
  216. recv_left = size
  217. while 1:
  218. part_num += 1
  219. read_size = min(buff_size, recv_left)
  220. part = read_func(read_size)
  221. if not part:
  222. break
  223. yield part
  224. if part_num % 100 == 0: # Avoid blocking ZeroNet execution during upload
  225. time.sleep(0.001)
  226. recv_left -= read_size
  227. if recv_left <= 0:
  228. break
  229. def hashBigfile(self, read_func, size, piece_size=1024 * 1024, file_out=None):
  230. self.site.settings["has_bigfile"] = True
  231. recv = 0
  232. try:
  233. piece_hash = CryptHash.sha512t()
  234. piece_hashes = []
  235. piece_recv = 0
  236. mt = merkletools.MerkleTools()
  237. mt.hash_function = CryptHash.sha512t
  238. part = ""
  239. for part in self.readFile(read_func, size):
  240. if file_out:
  241. file_out.write(part)
  242. recv += len(part)
  243. piece_recv += len(part)
  244. piece_hash.update(part)
  245. if piece_recv >= piece_size:
  246. piece_digest = piece_hash.digest()
  247. piece_hashes.append(piece_digest)
  248. mt.leaves.append(piece_digest)
  249. piece_hash = CryptHash.sha512t()
  250. piece_recv = 0
  251. if len(piece_hashes) % 100 == 0 or recv == size:
  252. self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
  253. float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
  254. ))
  255. part = ""
  256. if len(part) > 0:
  257. piece_digest = piece_hash.digest()
  258. piece_hashes.append(piece_digest)
  259. mt.leaves.append(piece_digest)
  260. except Exception as err:
  261. raise err
  262. finally:
  263. if file_out:
  264. file_out.close()
  265. mt.make_tree()
  266. merkle_root = mt.get_merkle_root()
  267. if type(merkle_root) is bytes: # Python <3.5
  268. merkle_root = merkle_root.decode()
  269. return merkle_root, piece_size, {
  270. "sha512_pieces": piece_hashes
  271. }
  272. def hashFile(self, dir_inner_path, file_relative_path, optional=False):
  273. inner_path = dir_inner_path + file_relative_path
  274. file_size = self.site.storage.getSize(inner_path)
  275. # Only care about optional files >1MB
  276. if not optional or file_size < 1 * 1024 * 1024:
  277. return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)
  278. back = {}
  279. content = self.contents.get(dir_inner_path + "content.json")
  280. hash = None
  281. piecemap_relative_path = None
  282. piece_size = None
  283. # Don't re-hash if it's already in content.json
  284. if content and file_relative_path in content.get("files_optional", {}):
  285. file_node = content["files_optional"][file_relative_path]
  286. if file_node["size"] == file_size:
  287. self.log.info("- [SAME SIZE] %s" % file_relative_path)
  288. hash = file_node.get("sha512")
  289. piecemap_relative_path = file_node.get("piecemap")
  290. piece_size = file_node.get("piece_size")
  291. if not hash or not piecemap_relative_path: # Not in content.json yet
  292. if file_size < 5 * 1024 * 1024: # Don't create piecemap automatically for files smaller than 5MB
  293. return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)
  294. self.log.info("- [HASHING] %s" % file_relative_path)
  295. merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb").read, file_size)
  296. if not hash:
  297. hash = merkle_root
  298. if not piecemap_relative_path:
  299. file_name = helper.getFilename(file_relative_path)
  300. piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
  301. piecemap_inner_path = inner_path + ".piecemap.msgpack"
  302. self.site.storage.open(piecemap_inner_path, "wb").write(Msgpack.pack({file_name: piecemap_info}))
  303. back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))
  304. piece_num = int(math.ceil(float(file_size) / piece_size))
  305. # Add the merkle root to hashfield
  306. hash_id = self.site.content_manager.hashfield.getHashId(hash)
  307. self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
  308. self.site.storage.piecefields[hash].frombytes(b"\x01" * piece_num)
  309. back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
  310. return back
  311. def getPiecemap(self, inner_path):
  312. file_info = self.site.content_manager.getFileInfo(inner_path)
  313. piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
  314. self.site.needFile(piecemap_inner_path, priority=20)
  315. piecemap = Msgpack.unpack(self.site.storage.open(piecemap_inner_path, "rb").read())[helper.getFilename(inner_path)]
  316. piecemap["piece_size"] = file_info["piece_size"]
  317. return piecemap
  318. def verifyPiece(self, inner_path, pos, piece):
  319. try:
  320. piecemap = self.getPiecemap(inner_path)
  321. except Exception as err:
  322. raise VerifyError("Unable to download piecemap: %s" % Debug.formatException(err))
  323. piece_i = int(pos / piecemap["piece_size"])
  324. if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
  325. raise VerifyError("Invalid hash")
  326. return True
  327. def verifyFile(self, inner_path, file, ignore_same=True):
  328. if "|" not in inner_path:
  329. return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)
  330. inner_path, file_range = inner_path.split("|")
  331. pos_from, pos_to = map(int, file_range.split("-"))
  332. return self.verifyPiece(inner_path, pos_from, file)
  333. def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
  334. if "|" in inner_path:
  335. inner_path, file_range = inner_path.split("|")
  336. pos_from, pos_to = map(int, file_range.split("-"))
  337. file_info = self.getFileInfo(inner_path)
  338. # Mark piece downloaded
  339. piece_i = int(pos_from / file_info["piece_size"])
  340. self.site.storage.piecefields[file_info["sha512"]][piece_i] = b"\x01"
  341. # Only add to site size on first request
  342. if hash_id in self.hashfield:
  343. size = 0
  344. elif size > 1024 * 1024:
  345. file_info = self.getFileInfo(inner_path)
  346. if file_info and "sha512" in file_info: # We already have the file, but not in piecefield
  347. sha512 = file_info["sha512"]
  348. if sha512 not in self.site.storage.piecefields:
  349. self.site.storage.checkBigfile(inner_path)
  350. return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)
  351. def optionalRemoved(self, inner_path, hash_id, size=None):
  352. if size and size > 1024 * 1024:
  353. file_info = self.getFileInfo(inner_path)
  354. sha512 = file_info["sha512"]
  355. if sha512 in self.site.storage.piecefields:
  356. del self.site.storage.piecefields[sha512]
  357. # Also remove other pieces of the file from download queue
  358. for key in list(self.site.bad_files.keys()):
  359. if key.startswith(inner_path + "|"):
  360. del self.site.bad_files[key]
  361. self.site.worker_manager.removeSolvedFileTasks()
  362. return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
  363. @PluginManager.registerTo("SiteStorage")
  364. class SiteStoragePlugin(object):
  365. def __init__(self, *args, **kwargs):
  366. super(SiteStoragePlugin, self).__init__(*args, **kwargs)
  367. self.piecefields = collections.defaultdict(BigfilePiecefield)
  368. if "piecefields" in self.site.settings.get("cache", {}):
  369. for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
  370. if piecefield_packed:
  371. self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
  372. self.site.settings["cache"]["piecefields"] = {}
  373. def createSparseFile(self, inner_path, size, sha512=None):
  374. file_path = self.getPath(inner_path)
  375. self.ensureDir(os.path.dirname(inner_path))
  376. f = open(file_path, 'wb')
  377. f.truncate(min(1024 * 1024 * 5, size)) # Only pre-allocate up to 5MB
  378. f.close()
  379. if os.name == "nt":
  380. startupinfo = subprocess.STARTUPINFO()
  381. startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
  382. subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)
  383. if sha512 and sha512 in self.piecefields:
  384. self.log.debug("%s: File not exists, but has piecefield. Deleting piecefield." % inner_path)
  385. del self.piecefields[sha512]
  386. def write(self, inner_path, content):
  387. if "|" not in inner_path:
  388. return super(SiteStoragePlugin, self).write(inner_path, content)
  389. # Write to specific position by passing |{pos} after the filename
  390. inner_path, file_range = inner_path.split("|")
  391. pos_from, pos_to = map(int, file_range.split("-"))
  392. file_path = self.getPath(inner_path)
  393. # Create dir if not exist
  394. self.ensureDir(os.path.dirname(inner_path))
  395. if not os.path.isfile(file_path):
  396. file_info = self.site.content_manager.getFileInfo(inner_path)
  397. self.createSparseFile(inner_path, file_info["size"])
  398. # Write file
  399. with open(file_path, "rb+") as file:
  400. file.seek(pos_from)
  401. if hasattr(content, 'read'): # File-like object
  402. shutil.copyfileobj(content, file) # Write buff to disk
  403. else: # Simple string
  404. file.write(content)
  405. del content
  406. self.onUpdated(inner_path)
  407. def checkBigfile(self, inner_path):
  408. file_info = self.site.content_manager.getFileInfo(inner_path)
  409. if not file_info or (file_info and "piecemap" not in file_info): # It's not a big file
  410. return False
  411. self.site.settings["has_bigfile"] = True
  412. file_path = self.getPath(inner_path)
  413. sha512 = file_info["sha512"]
  414. piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
  415. if os.path.isfile(file_path):
  416. if sha512 not in self.piecefields:
  417. if open(file_path, "rb").read(128) == b"\0" * 128:
  418. piece_data = b"\x00"
  419. else:
  420. piece_data = b"\x01"
  421. self.log.debug("%s: File exists, but not in piecefield. Filling piecefiled with %s * %s." % (inner_path, piece_num, piece_data))
  422. self.piecefields[sha512].frombytes(piece_data * piece_num)
  423. else:
  424. self.log.debug("Creating bigfile: %s" % inner_path)
  425. self.createSparseFile(inner_path, file_info["size"], sha512)
  426. self.piecefields[sha512].frombytes(b"\x00" * piece_num)
  427. self.log.debug("Created bigfile: %s" % inner_path)
  428. return True
  429. def openBigfile(self, inner_path, prebuffer=0):
  430. if not self.checkBigfile(inner_path):
  431. return False
  432. self.site.needFile(inner_path, blocking=False) # Download piecemap
  433. return BigFile(self.site, inner_path, prebuffer=prebuffer)
  434. class BigFile(object):
  435. def __init__(self, site, inner_path, prebuffer=0):
  436. self.site = site
  437. self.inner_path = inner_path
  438. file_path = site.storage.getPath(inner_path)
  439. file_info = self.site.content_manager.getFileInfo(inner_path)
  440. self.piece_size = file_info["piece_size"]
  441. self.sha512 = file_info["sha512"]
  442. self.size = file_info["size"]
  443. self.prebuffer = prebuffer
  444. self.read_bytes = 0
  445. self.piecefield = self.site.storage.piecefields[self.sha512]
  446. self.f = open(file_path, "rb+")
  447. self.read_lock = gevent.lock.Semaphore()
  448. def read(self, buff=64 * 1024):
  449. with self.read_lock:
  450. pos = self.f.tell()
  451. read_until = min(self.size, pos + buff)
  452. requests = []
  453. # Request all required blocks
  454. while 1:
  455. piece_i = int(pos / self.piece_size)
  456. if piece_i * self.piece_size >= read_until:
  457. break
  458. pos_from = piece_i * self.piece_size
  459. pos_to = pos_from + self.piece_size
  460. if not self.piecefield[piece_i]:
  461. requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
  462. pos += self.piece_size
  463. if not all(requests):
  464. return None
  465. # Request prebuffer
  466. if self.prebuffer:
  467. prebuffer_until = min(self.size, read_until + self.prebuffer)
  468. priority = 3
  469. while 1:
  470. piece_i = int(pos / self.piece_size)
  471. if piece_i * self.piece_size >= prebuffer_until:
  472. break
  473. pos_from = piece_i * self.piece_size
  474. pos_to = pos_from + self.piece_size
  475. if not self.piecefield[piece_i]:
  476. self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
  477. priority -= 1
  478. pos += self.piece_size
  479. gevent.joinall(requests)
  480. self.read_bytes += buff
  481. # Increase buffer for long reads
  482. if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
  483. self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
  484. self.prebuffer = 5 * 1024 * 1024
  485. return self.f.read(buff)
  486. def seek(self, pos, whence=0):
  487. with self.read_lock:
  488. if whence == 2: # Relative from file end
  489. pos = self.size + pos # Use the real size instead of size on the disk
  490. whence = 0
  491. return self.f.seek(pos, whence)
  492. def seekable(self):
  493. return self.f.seekable()
  494. def tell(self):
  495. return self.f.tell()
  496. def close(self):
  497. self.f.close()
  498. def __enter__(self):
  499. return self
  500. def __exit__(self, exc_type, exc_val, exc_tb):
  501. self.close()
  502. @PluginManager.registerTo("WorkerManager")
  503. class WorkerManagerPlugin(object):
  504. def addTask(self, inner_path, *args, **kwargs):
  505. file_info = kwargs.get("file_info")
  506. if file_info and "piecemap" in file_info: # Bigfile
  507. self.site.settings["has_bigfile"] = True
  508. piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
  509. piecemap_task = None
  510. if not self.site.storage.isFile(piecemap_inner_path):
  511. # Start download piecemap
  512. piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
  513. autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
  514. if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
  515. gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all") # Download all pieces
  516. if "|" in inner_path:
  517. # Start download piece
  518. task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
  519. inner_path, file_range = inner_path.split("|")
  520. pos_from, pos_to = map(int, file_range.split("-"))
  521. task["piece_i"] = int(pos_from / file_info["piece_size"])
  522. task["sha512"] = file_info["sha512"]
  523. else:
  524. if inner_path in self.site.bad_files:
  525. del self.site.bad_files[inner_path]
  526. if piecemap_task:
  527. task = piecemap_task
  528. else:
  529. fake_evt = gevent.event.AsyncResult() # Don't download anything if no range specified
  530. fake_evt.set(True)
  531. task = {"evt": fake_evt}
  532. if not self.site.storage.isFile(inner_path):
  533. self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
  534. piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
  535. self.site.storage.piecefields[file_info["sha512"]].frombytes(b"\x00" * piece_num)
  536. else:
  537. task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
  538. return task
  539. def taskAddPeer(self, task, peer):
  540. if "piece_i" in task:
  541. if not peer.piecefields[task["sha512"]][task["piece_i"]]:
  542. if task["sha512"] not in peer.piecefields:
  543. gevent.spawn(peer.updatePiecefields, force=True)
  544. elif not task["peers"]:
  545. gevent.spawn(peer.updatePiecefields)
  546. return False # Deny to add peers to task if file not in piecefield
  547. return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
  548. @PluginManager.registerTo("FileRequest")
  549. class FileRequestPlugin(object):
  550. def isReadable(self, site, inner_path, file, pos):
  551. # Peek into file
  552. if file.read(10) == b"\0" * 10:
  553. # Looks empty, but makes sures we don't have that piece
  554. file_info = site.content_manager.getFileInfo(inner_path)
  555. if "piece_size" in file_info:
  556. piece_i = int(pos / file_info["piece_size"])
  557. if not site.storage.piecefields[file_info["sha512"]][piece_i]:
  558. return False
  559. # Seek back to position we want to read
  560. file.seek(pos)
  561. return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)
  562. def actionGetPiecefields(self, params):
  563. site = self.sites.get(params["site"])
  564. if not site or not site.isServing(): # Site unknown or not serving
  565. self.response({"error": "Unknown site"})
  566. return False
  567. # Add peer to site if not added before
  568. peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
  569. if not peer.connection: # Just added
  570. peer.connect(self.connection) # Assign current connection to peer
  571. piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
  572. self.response({"piecefields_packed": piecefields_packed})
  573. def actionSetPiecefields(self, params):
  574. site = self.sites.get(params["site"])
  575. if not site or not site.isServing(): # Site unknown or not serving
  576. self.response({"error": "Unknown site"})
  577. self.connection.badAction(5)
  578. return False
  579. # Add or get peer
  580. peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
  581. if not peer.connection:
  582. peer.connect(self.connection)
  583. peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
  584. for sha512, piecefield_packed in params["piecefields_packed"].items():
  585. peer.piecefields[sha512].unpack(piecefield_packed)
  586. site.settings["has_bigfile"] = True
  587. self.response({"ok": "Updated"})
  588. @PluginManager.registerTo("Peer")
  589. class PeerPlugin(object):
  590. def __getattr__(self, key):
  591. if key == "piecefields":
  592. self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
  593. return self.piecefields
  594. elif key == "time_piecefields_updated":
  595. self.time_piecefields_updated = None
  596. return self.time_piecefields_updated
  597. else:
  598. return super(PeerPlugin, self).__getattr__(key)
  599. @util.Noparallel(ignore_args=True)
  600. def updatePiecefields(self, force=False):
  601. if self.connection and self.connection.handshake.get("rev", 0) < 2190:
  602. return False # Not supported
  603. # Don't update piecefield again in 1 min
  604. if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
  605. return False
  606. self.time_piecefields_updated = time.time()
  607. res = self.request("getPiecefields", {"site": self.site.address})
  608. if not res or "error" in res:
  609. return False
  610. self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
  611. try:
  612. for sha512, piecefield_packed in res["piecefields_packed"].items():
  613. self.piecefields[sha512].unpack(piecefield_packed)
  614. except Exception as err:
  615. self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))
  616. return self.piecefields
  617. def sendMyHashfield(self, *args, **kwargs):
  618. return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)
  619. def updateHashfield(self, *args, **kwargs):
  620. if self.site.settings.get("has_bigfile"):
  621. thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
  622. back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
  623. thread.join()
  624. return back
  625. else:
  626. return super(PeerPlugin, self).updateHashfield(*args, **kwargs)
  627. def getFile(self, site, inner_path, *args, **kwargs):
  628. if "|" in inner_path:
  629. inner_path, file_range = inner_path.split("|")
  630. pos_from, pos_to = map(int, file_range.split("-"))
  631. kwargs["pos_from"] = pos_from
  632. kwargs["pos_to"] = pos_to
  633. return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
  634. @PluginManager.registerTo("Site")
  635. class SitePlugin(object):
  636. def isFileDownloadAllowed(self, inner_path, file_info):
  637. if "piecemap" in file_info:
  638. file_size_mb = file_info["size"] / 1024 / 1024
  639. if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
  640. self.log.debug(
  641. "Bigfile size %s too large: %sMB > %sMB, skipping..." %
  642. (inner_path, file_size_mb, config.bigfile_size_limit)
  643. )
  644. return False
  645. file_info = file_info.copy()
  646. file_info["size"] = file_info["piece_size"]
  647. return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)
  648. def getSettingsCache(self):
  649. back = super(SitePlugin, self).getSettingsCache()
  650. if self.storage.piecefields:
  651. back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
  652. return back
  653. def needFile(self, inner_path, *args, **kwargs):
  654. if inner_path.endswith("|all"):
  655. @util.Pooled(20)
  656. def pooledNeedBigfile(inner_path, *args, **kwargs):
  657. if inner_path not in self.bad_files:
  658. self.log.debug("Cancelled piece, skipping %s" % inner_path)
  659. return False
  660. return self.needFile(inner_path, *args, **kwargs)
  661. inner_path = inner_path.replace("|all", "")
  662. file_info = self.needFileInfo(inner_path)
  663. # Use default function to download non-optional file
  664. if "piece_size" not in file_info:
  665. return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)
  666. file_size = file_info["size"]
  667. piece_size = file_info["piece_size"]
  668. piece_num = int(math.ceil(float(file_size) / piece_size))
  669. file_threads = []
  670. piecefield = self.storage.piecefields.get(file_info["sha512"])
  671. for piece_i in range(piece_num):
  672. piece_from = piece_i * piece_size
  673. piece_to = min(file_size, piece_from + piece_size)
  674. if not piecefield or not piecefield[piece_i]:
  675. inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
  676. self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
  677. res = pooledNeedBigfile(inner_path_piece, blocking=False)
  678. if res is not True and res is not False:
  679. file_threads.append(res)
  680. gevent.joinall(file_threads)
  681. else:
  682. return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)
  683. @PluginManager.registerTo("ConfigPlugin")
  684. class ConfigPlugin(object):
  685. def createArguments(self):
  686. group = self.parser.add_argument_group("Bigfile plugin")
  687. group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=10, metavar="MB", type=int)
  688. group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)
  689. return super(ConfigPlugin, self).createArguments()