AnnounceZeroPlugin.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. import time
  2. import itertools
  3. from Plugin import PluginManager
  4. from util import helper
  5. from Crypt import CryptRsa
  6. allow_reload = False # No source reload supported in this plugin
  7. time_full_announced = {} # Tracker address: Last announced all site to tracker
  8. connection_pool = {} # Tracker address: Peer object
  9. # We can only import plugin host clases after the plugins are loaded
  10. @PluginManager.afterLoad
  11. def importHostClasses():
  12. global Peer, AnnounceError
  13. from Peer import Peer
  14. from Site.SiteAnnouncer import AnnounceError
  15. # Process result got back from tracker
  16. def processPeerRes(tracker_address, site, peers):
  17. added = 0
  18. # Onion
  19. found_onion = 0
  20. for packed_address in peers["onion"]:
  21. found_onion += 1
  22. peer_onion, peer_port = helper.unpackOnionAddress(packed_address)
  23. if site.addPeer(peer_onion, peer_port, source="tracker"):
  24. added += 1
  25. # Ip4
  26. found_ipv4 = 0
  27. peers_normal = itertools.chain(peers.get("ip4", []), peers.get("ipv4", []), peers.get("ipv6", []))
  28. for packed_address in peers_normal:
  29. found_ipv4 += 1
  30. peer_ip, peer_port = helper.unpackAddress(packed_address)
  31. if site.addPeer(peer_ip, peer_port, source="tracker"):
  32. added += 1
  33. if added:
  34. site.worker_manager.onPeers()
  35. site.updateWebsocket(peers_added=added)
  36. return added
  37. @PluginManager.registerTo("SiteAnnouncer")
  38. class SiteAnnouncerPlugin(object):
  39. def getTrackerHandler(self, protocol):
  40. if protocol == "zero":
  41. return self.announceTrackerZero
  42. else:
  43. return super(SiteAnnouncerPlugin, self).getTrackerHandler(protocol)
  44. def announceTrackerZero(self, tracker_address, mode="start", num_want=10):
  45. global time_full_announced
  46. s = time.time()
  47. need_types = ["ip4"] # ip4 for backward compatibility reasons
  48. need_types += self.site.connection_server.supported_ip_types
  49. if self.site.connection_server.tor_manager.enabled:
  50. need_types.append("onion")
  51. if mode == "start" or mode == "more": # Single: Announce only this site
  52. sites = [self.site]
  53. full_announce = False
  54. else: # Multi: Announce all currently serving site
  55. full_announce = True
  56. if time.time() - time_full_announced.get(tracker_address, 0) < 60 * 15: # No reannounce all sites within short time
  57. return None
  58. time_full_announced[tracker_address] = time.time()
  59. from Site import SiteManager
  60. sites = [site for site in SiteManager.site_manager.sites.values() if site.isServing()]
  61. # Create request
  62. add_types = self.getOpenedServiceTypes()
  63. request = {
  64. "hashes": [], "onions": [], "port": self.fileserver_port, "need_types": need_types, "need_num": 20, "add": add_types
  65. }
  66. for site in sites:
  67. if "onion" in add_types:
  68. onion = self.site.connection_server.tor_manager.getOnion(site.address)
  69. request["onions"].append(onion)
  70. request["hashes"].append(site.address_hash)
  71. # Tracker can remove sites that we don't announce
  72. if full_announce:
  73. request["delete"] = True
  74. # Sent request to tracker
  75. tracker_peer = connection_pool.get(tracker_address) # Re-use tracker connection if possible
  76. if not tracker_peer:
  77. tracker_ip, tracker_port = tracker_address.rsplit(":", 1)
  78. tracker_peer = Peer(str(tracker_ip), int(tracker_port), connection_server=self.site.connection_server)
  79. tracker_peer.is_tracker_connection = True
  80. connection_pool[tracker_address] = tracker_peer
  81. res = tracker_peer.request("announce", request)
  82. if not res or "peers" not in res:
  83. if full_announce:
  84. time_full_announced[tracker_address] = 0
  85. raise AnnounceError("Invalid response: %s" % res)
  86. # Add peers from response to site
  87. site_index = 0
  88. peers_added = 0
  89. for site_res in res["peers"]:
  90. site = sites[site_index]
  91. peers_added += processPeerRes(tracker_address, site, site_res)
  92. site_index += 1
  93. # Check if we need to sign prove the onion addresses
  94. if "onion_sign_this" in res:
  95. self.site.log.debug("Signing %s for %s to add %s onions" % (res["onion_sign_this"], tracker_address, len(sites)))
  96. request["onion_signs"] = {}
  97. request["onion_sign_this"] = res["onion_sign_this"]
  98. request["need_num"] = 0
  99. for site in sites:
  100. onion = self.site.connection_server.tor_manager.getOnion(site.address)
  101. publickey = self.site.connection_server.tor_manager.getPublickey(onion)
  102. if publickey not in request["onion_signs"]:
  103. sign = CryptRsa.sign(res["onion_sign_this"].encode("utf8"), self.site.connection_server.tor_manager.getPrivatekey(onion))
  104. request["onion_signs"][publickey] = sign
  105. res = tracker_peer.request("announce", request)
  106. if not res or "onion_sign_this" in res:
  107. if full_announce:
  108. time_full_announced[tracker_address] = 0
  109. raise AnnounceError("Announce onion address to failed: %s" % res)
  110. if full_announce:
  111. tracker_peer.remove() # Close connection, we don't need it in next 5 minute
  112. self.site.log.debug(
  113. "Tracker announce result: zero://%s (sites: %s, new peers: %s, add: %s, mode: %s) in %.3fs" %
  114. (tracker_address, site_index, peers_added, add_types, mode, time.time() - s)
  115. )
  116. return True