import time
import os
import subprocess
import shutil
import collections
import math
import warnings
import base64
import binascii
import json

import gevent
import gevent.event  # gevent.event.AsyncResult is used by WorkerManagerPlugin.addTask
import gevent.lock

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")  # Ignore missing sha3 warning
    import merkletools

from util import helper
from util import Msgpack
from util.Flag import flag
import util

from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config

# Don't reset the nonce registry if the plugin is reloaded
if "upload_nonces" not in locals():
    upload_nonces = {}
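
# Upload flow: UiWebsocketPlugin.actionBigfileUploadInit (below) registers a
# one-time nonce in upload_nonces, and the browser then POSTs the file body to
# /ZeroNet-Internal/BigfileUpload (or streams it over the websocket variant)
# carrying that nonce. The nonce is deleted on first use, so every upload URL
# is single-shot.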
- @PluginManager.registerTo("UiRequest")
- class UiRequestPlugin(object):
- def isCorsAllowed(self, path):
- if path == "/ZeroNet-Internal/BigfileUpload":
- return True
- else:
- return super(UiRequestPlugin, self).isCorsAllowed(path)
- @helper.encodeResponse
- def actionBigfileUpload(self):
- nonce = self.get.get("upload_nonce")
- if nonce not in upload_nonces:
- return self.error403("Upload nonce error.")
- upload_info = upload_nonces[nonce]
- del upload_nonces[nonce]
- self.sendHeader(200, "text/html", noscript=True, extra_headers={
- "Access-Control-Allow-Origin": "null",
- "Access-Control-Allow-Credentials": "true"
- })
- self.readMultipartHeaders(self.env['wsgi.input']) # Skip http headers
- result = self.handleBigfileUpload(upload_info, self.env['wsgi.input'].read)
- return json.dumps(result)

    def actionBigfileUploadWebsocket(self):
        ws = self.env.get("wsgi.websocket")

        if not ws:
            self.start_response("400 Bad Request", [])
            return [b"Not a websocket request!"]

        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]

        ws.send("poll")

        buffer = b""

        def read(size):
            nonlocal buffer
            while len(buffer) < size:
                buffer += ws.receive()
                ws.send("poll")

            part, buffer = buffer[:size], buffer[size:]
            return part

        result = self.handleBigfileUpload(upload_info, read)
        ws.send(json.dumps(result))
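
    # Websocket upload framing, as implemented by read() above: the server
    # sends the text message "poll" whenever it is ready for more data, and the
    # client answers each "poll" with the next binary chunk of the file. Chunks
    # are buffered and re-sliced locally, so the client can pick any chunk size
    # independent of the read sizes requested by the hasher.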

    def handleBigfileUpload(self, upload_info, read):
        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                read, upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            hash = binascii.hexlify(piecemap_info["sha512_pieces"][0])
            hash_id = site.content_manager.hashfield.getHashId(hash)
            site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)
        else:  # Big file
            file_name = helper.getFilename(inner_path)
            site.storage.open(upload_info["piecemap"], "wb").write(Msgpack.pack({file_name: piecemap_info}))

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
            site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # reload cache

        return {
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        }

    def readMultipartHeaders(self, wsgi_input):
        found = False
        for i in range(100):
            line = wsgi_input.readline()
            if line == b"\r\n":  # Blank line ends the multipart part headers
                found = True
                break
        if not found:
            raise Exception("No multipart header found")
        return i

    def actionFile(self, file_path, *args, **kwargs):
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
- @PluginManager.registerTo("UiWebsocket")
- class UiWebsocketPlugin(object):
- def actionBigfileUploadInit(self, to, inner_path, size, protocol="xhr"):
- valid_signers = self.site.content_manager.getValidSigners(inner_path)
- auth_address = self.user.getAuthAddress(self.site.address)
- if not self.site.settings["own"] and auth_address not in valid_signers:
- self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
- return self.response(to, {"error": "Forbidden, you can only modify your own files"})
- nonce = CryptHash.random()
- piece_size = 1024 * 1024
- inner_path = self.site.content_manager.sanitizePath(inner_path)
- file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)
- content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
- file_relative_path = inner_path[len(content_inner_path_dir):]
- upload_nonces[nonce] = {
- "added": time.time(),
- "site": self.site,
- "inner_path": inner_path,
- "websocket_client": self,
- "size": size,
- "piece_size": piece_size,
- "piecemap": inner_path + ".piecemap.msgpack"
- }
- if protocol == "xhr":
- return {
- "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
- "piece_size": piece_size,
- "inner_path": inner_path,
- "file_relative_path": file_relative_path
- }
- elif protocol == "websocket":
- server_url = self.request.getWsServerUrl()
- if server_url:
- proto, host = server_url.split("://")
- origin = proto.replace("http", "ws") + "://" + host
- else:
- origin = "{origin}"
- return {
- "url": origin + "/ZeroNet-Internal/BigfileUploadWebsocket?upload_nonce=" + nonce,
- "piece_size": piece_size,
- "inner_path": inner_path,
- "file_relative_path": file_relative_path
- }
- else:
- return {"error": "Unknown protocol"}

    @flag.no_multiuser
    def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
        permissions = self.getPermissions(to)
        if "ADMIN" not in permissions:
            return self.response(to, "You don't have permission to run this command")

        self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
        self.response(to, "ok")

    def actionFileDelete(self, to, inner_path):
        piecemap_inner_path = inner_path + ".piecemap.msgpack"
        if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
            # Also delete .piecemap.msgpack file if exists
            self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
            file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
            if file_info:
                content_json = self.site.storage.loadJson(file_info["content_inner_path"])
                relative_path = file_info["relative_path"]
                if relative_path in content_json.get("files_optional", {}):
                    del content_json["files_optional"][relative_path]
                    self.site.storage.writeJson(file_info["content_inner_path"], content_json)
                    self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
            try:
                self.site.storage.delete(piecemap_inner_path)
            except Exception as err:
                self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))

        return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)
- @PluginManager.registerTo("ContentManager")
- class ContentManagerPlugin(object):
- def getFileInfo(self, inner_path, *args, **kwargs):
- if "|" not in inner_path:
- return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
- inner_path, file_range = inner_path.split("|")
- pos_from, pos_to = map(int, file_range.split("-"))
- file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
- return file_info

    def readFile(self, read_func, size, buff_size=1024 * 64):
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = read_func(read_size)

            if not part:
                break

            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, read_func, size, piece_size=1024 * 1024, file_out=None):
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(read_func, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                if len(piece_hashes) % 100 == 0 or recv == size:
                    self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                        float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                    ))

            if len(part) > 0:  # Hash the last, incomplete piece as well
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        except Exception as err:
            raise err
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        merkle_root = mt.get_merkle_root()
        if type(merkle_root) is bytes:  # Python <3.5
            merkle_root = merkle_root.decode()
        return merkle_root, piece_size, {
            "sha512_pieces": piece_hashes
        }
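
    # A minimal sketch of the per-piece digest computed above, assuming
    # CryptHash.sha512t() is SHA-512 truncated to 256 bits (as in ZeroNet's
    # Crypt.CryptHash module):
    #
    #   import hashlib
    #
    #   def piece_digest(piece_bytes):
    #       return hashlib.sha512(piece_bytes).digest()[:32]  # truncated SHA-512
    #
    # The file hash stored in content.json under "sha512" is then the merkle
    # root built over these piece digests, so a single 1MB piece can be
    # verified without downloading the rest of the file.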

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb").read, file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                self.site.storage.open(piecemap_inner_path, "wb").write(Msgpack.pack({file_name: piecemap_info}))

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        hash_id = self.site.content_manager.hashfield.getHashId(hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        self.site.storage.piecefields[hash].frombytes(b"\x01" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        piecemap = Msgpack.unpack(self.site.storage.open(piecemap_inner_path, "rb").read())[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap
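
    # Piecemap layout on disk, as written by handleBigfileUpload()/hashFile():
    # a msgpack map keyed by file name, e.g. (illustrative values)
    #
    #   {"video.mp4": {"sha512_pieces": [<32-byte digest>, <32-byte digest>, ...]}}
    #
    # "piece_size" is not stored in the piecemap itself; it is injected here
    # from the file's entry in content.json.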

    def verifyPiece(self, inner_path, pos, piece):
        try:
            piecemap = self.getPiecemap(inner_path)
        except Exception as err:
            raise VerifyError("Unable to download piecemap: %s" % Debug.formatException(err))

        piece_i = int(pos / piecemap["piece_size"])
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = int(pos_from / file_info["piece_size"])
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = b"\x01"

            # Only add to site size on first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            for key in list(self.site.bad_files.keys()):
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
            self.site.worker_manager.removeSolvedFileTasks()
        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
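

# Piece addressing convention used throughout this plugin: a byte range of a
# big file is referenced as "<inner_path>|<pos_from>-<pos_to>". A hedged sketch
# of the parsing done by the methods above (the path is illustrative):
#
#   inner_path, file_range = "data/video.mp4|1048576-2097152".split("|")
#   pos_from, pos_to = map(int, file_range.split("-"))  # -> 1048576, 2097152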
- @PluginManager.registerTo("SiteStorage")
- class SiteStoragePlugin(object):
- def __init__(self, *args, **kwargs):
- super(SiteStoragePlugin, self).__init__(*args, **kwargs)
- self.piecefields = collections.defaultdict(BigfilePiecefield)
- if "piecefields" in self.site.settings.get("cache", {}):
- for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
- if piecefield_packed:
- self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
- self.site.settings["cache"]["piecefields"] = {}

    def createSparseFile(self, inner_path, size, sha512=None):
        file_path = self.getPath(inner_path)

        self.ensureDir(os.path.dirname(inner_path))

        f = open(file_path, 'wb')
        f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        f.close()
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File does not exist, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to specific position by passing |{pos} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create dir if not exist
        self.ensureDir(os.path.dirname(inner_path))

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def checkBigfile(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or "piecemap" not in file_info:  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                if open(file_path, "rb").read(128) == b"\0" * 128:
                    piece_data = b"\x00"
                else:
                    piece_data = b"\x01"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].frombytes(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].frombytes(b"\x00" * piece_num)
            self.log.debug("Created bigfile: %s" % inner_path)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)


class BigFile(object):
    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()

    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = int(pos / self.piece_size)
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                return None

            # Request prebuffer
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = int(pos / self.piece_size)
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                        priority -= 1
                    pos += self.piece_size

            gevent.joinall(requests)
            self.read_bytes += buff

            # Increase buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative from file end
                pos = self.size + pos  # Use the real size instead of size on the disk
                whence = 0
            return self.f.seek(pos, whence)

    def seekable(self):
        return self.f.seekable()

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
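

# Usage sketch for BigFile (hedged; assumes a loaded `site` object and an
# illustrative path). openBigfile() returns False if the file has no piecemap:
#
#   big_file = site.storage.openBigfile("data/video.mp4", prebuffer=2 * 1024 * 1024)
#   if big_file:
#       with big_file:
#           big_file.seek(10 * 1024 * 1024)
#           chunk = big_file.read(64 * 1024)  # missing pieces are fetched on demand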
- @PluginManager.registerTo("WorkerManager")
- class WorkerManagerPlugin(object):
- def addTask(self, inner_path, *args, **kwargs):
- file_info = kwargs.get("file_info")
- if file_info and "piecemap" in file_info: # Bigfile
- self.site.settings["has_bigfile"] = True
- piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
- piecemap_task = None
- if not self.site.storage.isFile(piecemap_inner_path):
- # Start download piecemap
- piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
- autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
- if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
- gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all") # Download all pieces
- if "|" in inner_path:
- # Start download piece
- task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
- inner_path, file_range = inner_path.split("|")
- pos_from, pos_to = map(int, file_range.split("-"))
- task["piece_i"] = int(pos_from / file_info["piece_size"])
- task["sha512"] = file_info["sha512"]
- else:
- if inner_path in self.site.bad_files:
- del self.site.bad_files[inner_path]
- if piecemap_task:
- task = piecemap_task
- else:
- fake_evt = gevent.event.AsyncResult() # Don't download anything if no range specified
- fake_evt.set(True)
- task = {"evt": fake_evt}
- if not self.site.storage.isFile(inner_path):
- self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
- piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
- self.site.storage.piecefields[file_info["sha512"]].frombytes(b"\x00" * piece_num)
- else:
- task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
- return task

    def taskAddPeer(self, task, peer):
        if "piece_i" in task:
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
- @PluginManager.registerTo("FileRequest")
- class FileRequestPlugin(object):
- def isReadable(self, site, inner_path, file, pos):
- # Peek into file
- if file.read(10) == b"\0" * 10:
- # Looks empty, but makes sures we don't have that piece
- file_info = site.content_manager.getFileInfo(inner_path)
- if "piece_size" in file_info:
- piece_i = int(pos / file_info["piece_size"])
- if not site.storage.piecefields[file_info["sha512"]][piece_i]:
- return False
- # Seek back to position we want to read
- file.seek(pos)
- return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].items():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})
- @PluginManager.registerTo("Peer")
- class PeerPlugin(object):
- def __getattr__(self, key):
- if key == "piecefields":
- self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
- return self.piecefields
- elif key == "time_piecefields_updated":
- self.time_piecefields_updated = None
- return self.time_piecefields_updated
- else:
- return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].items():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
- @PluginManager.registerTo("Site")
- class SitePlugin(object):
- def isFileDownloadAllowed(self, inner_path, file_info):
- if "piecemap" in file_info:
- file_size_mb = file_info["size"] / 1024 / 1024
- if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
- self.log.debug(
- "Bigfile size %s too large: %sMB > %sMB, skipping..." %
- (inner_path, file_size_mb, config.bigfile_size_limit)
- )
- return False
- file_info = file_info.copy()
- file_info["size"] = file_info["piece_size"]
- return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(inner_path, *args, **kwargs):
                if inner_path not in self.bad_files:
                    self.log.debug("Cancelled piece, skipping %s" % inner_path)
                    return False
                return self.needFile(inner_path, *args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)

            # Use default function to download non-optional file
            if "piece_size" not in file_info:
                return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)

            file_size = file_info["size"]
            piece_size = file_info["piece_size"]

            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
                    self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
                    res = pooledNeedBigfile(inner_path_piece, blocking=False)
                    if res is not True and res is not False:
                        file_threads.append(res)

            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)
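
    # In needFile() above, "inner_path|all" expands a big file into one
    # needFile() call per missing piece; util.Pooled(20) caps this at 20
    # concurrent piece downloads, and each piece is registered in
    # self.bad_files first so a cancelled download can be detected and skipped
    # inside pooledNeedBigfile().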
- @PluginManager.registerTo("ConfigPlugin")
- class ConfigPlugin(object):
- def createArguments(self):
- group = self.parser.add_argument_group("Bigfile plugin")
- group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=10, metavar="MB", type=int)
- group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)
- return super(ConfigPlugin, self).createArguments()
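
# Example (hedged): raising the auto-download limit to 50MB from the command
# line, assuming ZeroNet's usual argparse-based startup script:
#
#   python zeronet.py --autodownload_bigfile_size_limit 50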