AnnounceSharePlugin.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import time
  2. import os
  3. import logging
  4. import json
  5. import atexit
  6. import gevent
  7. from Config import config
  8. from Plugin import PluginManager
  9. from util import helper
  10. class TrackerStorage(object):
  11. def __init__(self):
  12. self.log = logging.getLogger("TrackerStorage")
  13. self.file_path = "%s/trackers.json" % config.data_dir
  14. self.load()
  15. self.time_discover = 0.0
  16. atexit.register(self.save)
  17. def getDefaultFile(self):
  18. return {"shared": {}}
  19. def onTrackerFound(self, tracker_address, type="shared", my=False):
  20. if not tracker_address.startswith("zero://"):
  21. return False
  22. trackers = self.getTrackers()
  23. added = False
  24. if tracker_address not in trackers:
  25. trackers[tracker_address] = {
  26. "time_added": time.time(),
  27. "time_success": 0,
  28. "latency": 99.0,
  29. "num_error": 0,
  30. "my": False
  31. }
  32. self.log.debug("New tracker found: %s" % tracker_address)
  33. added = True
  34. trackers[tracker_address]["time_found"] = time.time()
  35. trackers[tracker_address]["my"] = my
  36. return added
  37. def onTrackerSuccess(self, tracker_address, latency):
  38. trackers = self.getTrackers()
  39. if tracker_address not in trackers:
  40. return False
  41. trackers[tracker_address]["latency"] = latency
  42. trackers[tracker_address]["time_success"] = time.time()
  43. trackers[tracker_address]["num_error"] = 0
  44. def onTrackerError(self, tracker_address):
  45. trackers = self.getTrackers()
  46. if tracker_address not in trackers:
  47. return False
  48. trackers[tracker_address]["time_error"] = time.time()
  49. trackers[tracker_address]["num_error"] += 1
  50. if len(self.getWorkingTrackers()) >= config.working_shared_trackers_limit:
  51. error_limit = 5
  52. else:
  53. error_limit = 30
  54. error_limit
  55. if trackers[tracker_address]["num_error"] > error_limit and trackers[tracker_address]["time_success"] < time.time() - 60 * 60:
  56. self.log.debug("Tracker %s looks down, removing." % tracker_address)
  57. del trackers[tracker_address]
  58. def getTrackers(self, type="shared"):
  59. return self.file_content.setdefault(type, {})
  60. def getWorkingTrackers(self, type="shared"):
  61. trackers = {
  62. key: tracker for key, tracker in self.getTrackers(type).items()
  63. if tracker["time_success"] > time.time() - 60 * 60
  64. }
  65. return trackers
  66. def getFileContent(self):
  67. if not os.path.isfile(self.file_path):
  68. open(self.file_path, "w").write("{}")
  69. return self.getDefaultFile()
  70. try:
  71. return json.load(open(self.file_path))
  72. except Exception as err:
  73. self.log.error("Error loading trackers list: %s" % err)
  74. return self.getDefaultFile()
  75. def load(self):
  76. self.file_content = self.getFileContent()
  77. trackers = self.getTrackers()
  78. self.log.debug("Loaded %s shared trackers" % len(trackers))
  79. for address, tracker in list(trackers.items()):
  80. tracker["num_error"] = 0
  81. if not address.startswith("zero://"):
  82. del trackers[address]
  83. def save(self):
  84. s = time.time()
  85. helper.atomicWrite(self.file_path, json.dumps(self.file_content, indent=2, sort_keys=True).encode("utf8"))
  86. self.log.debug("Saved in %.3fs" % (time.time() - s))
  87. def discoverTrackers(self, peers):
  88. if len(self.getWorkingTrackers()) > config.working_shared_trackers_limit:
  89. return False
  90. s = time.time()
  91. num_success = 0
  92. for peer in peers:
  93. if peer.connection and peer.connection.handshake.get("rev", 0) < 3560:
  94. continue # Not supported
  95. res = peer.request("getTrackers")
  96. if not res or "error" in res:
  97. continue
  98. num_success += 1
  99. for tracker_address in res["trackers"]:
  100. if type(tracker_address) is bytes: # Backward compatibilitys
  101. tracker_address = tracker_address.decode("utf8")
  102. added = self.onTrackerFound(tracker_address)
  103. if added: # Only add one tracker from one source
  104. break
  105. if not num_success and len(peers) < 20:
  106. self.time_discover = 0.0
  107. if num_success:
  108. self.save()
  109. self.log.debug("Trackers discovered from %s/%s peers in %.3fs" % (num_success, len(peers), time.time() - s))
# Create the shared TrackerStorage instance only when this namespace does not
# hold one yet, so executing the module body again keeps the existing object
# (presumably to survive plugin reloads -- TODO confirm).
if "tracker_storage" not in locals():
    tracker_storage = TrackerStorage()
  112. @PluginManager.registerTo("SiteAnnouncer")
  113. class SiteAnnouncerPlugin(object):
  114. def getTrackers(self):
  115. if tracker_storage.time_discover < time.time() - 5 * 60:
  116. tracker_storage.time_discover = time.time()
  117. gevent.spawn(tracker_storage.discoverTrackers, self.site.getConnectedPeers())
  118. trackers = super(SiteAnnouncerPlugin, self).getTrackers()
  119. shared_trackers = list(tracker_storage.getTrackers("shared").keys())
  120. if shared_trackers:
  121. return trackers + shared_trackers
  122. else:
  123. return trackers
  124. def announceTracker(self, tracker, *args, **kwargs):
  125. res = super(SiteAnnouncerPlugin, self).announceTracker(tracker, *args, **kwargs)
  126. if res:
  127. latency = res
  128. tracker_storage.onTrackerSuccess(tracker, latency)
  129. elif res is False:
  130. tracker_storage.onTrackerError(tracker)
  131. return res
  132. @PluginManager.registerTo("FileRequest")
  133. class FileRequestPlugin(object):
  134. def actionGetTrackers(self, params):
  135. shared_trackers = list(tracker_storage.getWorkingTrackers("shared").keys())
  136. self.response({"trackers": shared_trackers})
  137. @PluginManager.registerTo("FileServer")
  138. class FileServerPlugin(object):
  139. def portCheck(self, *args, **kwargs):
  140. res = super(FileServerPlugin, self).portCheck(*args, **kwargs)
  141. if res and not config.tor == "always" and "Bootstrapper" in PluginManager.plugin_manager.plugin_names:
  142. for ip in self.ip_external_list:
  143. my_tracker_address = "zero://%s:%s" % (ip, config.fileserver_port)
  144. tracker_storage.onTrackerFound(my_tracker_address, my=True)
  145. return res
  146. @PluginManager.registerTo("ConfigPlugin")
  147. class ConfigPlugin(object):
  148. def createArguments(self):
  149. group = self.parser.add_argument_group("AnnounceShare plugin")
  150. group.add_argument('--working-shared-trackers-limit', help='Stop discovering new shared trackers after this number of shared trackers reached', default=5, type=int, metavar='limit')
  151. return super(ConfigPlugin, self).createArguments()