AnnounceLocalPlugin.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import time
  2. import gevent
  3. from Plugin import PluginManager
  4. from Config import config
  5. from . import BroadcastServer
  6. @PluginManager.registerTo("SiteAnnouncer")
  7. class SiteAnnouncerPlugin(object):
  8. def announce(self, force=False, *args, **kwargs):
  9. local_announcer = self.site.connection_server.local_announcer
  10. thread = None
  11. if local_announcer and (force or time.time() - local_announcer.last_discover > 5 * 60):
  12. thread = gevent.spawn(local_announcer.discover, force=force)
  13. back = super(SiteAnnouncerPlugin, self).announce(force=force, *args, **kwargs)
  14. if thread:
  15. thread.join()
  16. return back
  17. class LocalAnnouncer(BroadcastServer.BroadcastServer):
  18. def __init__(self, server, listen_port):
  19. super(LocalAnnouncer, self).__init__("zeronet", listen_port=listen_port)
  20. self.server = server
  21. self.sender_info["peer_id"] = self.server.peer_id
  22. self.sender_info["port"] = self.server.port
  23. self.sender_info["broadcast_port"] = listen_port
  24. self.sender_info["rev"] = config.rev
  25. self.known_peers = {}
  26. self.last_discover = 0
  27. def discover(self, force=False):
  28. self.log.debug("Sending discover request (force: %s)" % force)
  29. self.last_discover = time.time()
  30. if force: # Probably new site added, clean cache
  31. self.known_peers = {}
  32. for peer_id, known_peer in list(self.known_peers.items()):
  33. if time.time() - known_peer["found"] > 20 * 60:
  34. del(self.known_peers[peer_id])
  35. self.log.debug("Timeout, removing from known_peers: %s" % peer_id)
  36. self.broadcast({"cmd": "discoverRequest", "params": {}}, port=self.listen_port)
  37. def actionDiscoverRequest(self, sender, params):
  38. back = {
  39. "cmd": "discoverResponse",
  40. "params": {
  41. "sites_changed": self.server.site_manager.sites_changed
  42. }
  43. }
  44. if sender["peer_id"] not in self.known_peers:
  45. self.known_peers[sender["peer_id"]] = {"added": time.time(), "sites_changed": 0, "updated": 0, "found": time.time()}
  46. self.log.debug("Got discover request from unknown peer %s (%s), time to refresh known peers" % (sender["ip"], sender["peer_id"]))
  47. gevent.spawn_later(1.0, self.discover) # Let the response arrive first to the requester
  48. return back
  49. def actionDiscoverResponse(self, sender, params):
  50. if sender["peer_id"] in self.known_peers:
  51. self.known_peers[sender["peer_id"]]["found"] = time.time()
  52. if params["sites_changed"] != self.known_peers.get(sender["peer_id"], {}).get("sites_changed"):
  53. # Peer's site list changed, request the list of new sites
  54. return {"cmd": "siteListRequest"}
  55. else:
  56. # Peer's site list is the same
  57. for site in self.server.sites.values():
  58. peer = site.peers.get("%s:%s" % (sender["ip"], sender["port"]))
  59. if peer:
  60. peer.found("local")
  61. def actionSiteListRequest(self, sender, params):
  62. back = []
  63. sites = list(self.server.sites.values())
  64. # Split adresses to group of 100 to avoid UDP size limit
  65. site_groups = [sites[i:i + 100] for i in range(0, len(sites), 100)]
  66. for site_group in site_groups:
  67. res = {}
  68. res["sites_changed"] = self.server.site_manager.sites_changed
  69. res["sites"] = [site.address_hash for site in site_group]
  70. back.append({"cmd": "siteListResponse", "params": res})
  71. return back
  72. def actionSiteListResponse(self, sender, params):
  73. s = time.time()
  74. peer_sites = set(params["sites"])
  75. num_found = 0
  76. added_sites = []
  77. for site in self.server.sites.values():
  78. if site.address_hash in peer_sites:
  79. added = site.addPeer(sender["ip"], sender["port"], source="local")
  80. num_found += 1
  81. if added:
  82. site.worker_manager.onPeers()
  83. site.updateWebsocket(peers_added=1)
  84. added_sites.append(site)
  85. # Save sites changed value to avoid unnecessary site list download
  86. if sender["peer_id"] not in self.known_peers:
  87. self.known_peers[sender["peer_id"]] = {"added": time.time()}
  88. self.known_peers[sender["peer_id"]]["sites_changed"] = params["sites_changed"]
  89. self.known_peers[sender["peer_id"]]["updated"] = time.time()
  90. self.known_peers[sender["peer_id"]]["found"] = time.time()
  91. self.log.debug(
  92. "Tracker result: Discover from %s response parsed in %.3fs, found: %s added: %s of %s" %
  93. (sender["ip"], time.time() - s, num_found, added_sites, len(peer_sites))
  94. )
  95. @PluginManager.registerTo("FileServer")
  96. class FileServerPlugin(object):
  97. def __init__(self, *args, **kwargs):
  98. super(FileServerPlugin, self).__init__(*args, **kwargs)
  99. if config.broadcast_port and config.tor != "always" and not config.disable_udp:
  100. self.local_announcer = LocalAnnouncer(self, config.broadcast_port)
  101. else:
  102. self.local_announcer = None
  103. def start(self, *args, **kwargs):
  104. if self.local_announcer:
  105. gevent.spawn(self.local_announcer.start)
  106. return super(FileServerPlugin, self).start(*args, **kwargs)
  107. def stop(self):
  108. if self.local_announcer:
  109. self.local_announcer.stop()
  110. res = super(FileServerPlugin, self).stop()
  111. return res
  112. @PluginManager.registerTo("ConfigPlugin")
  113. class ConfigPlugin(object):
  114. def createArguments(self):
  115. group = self.parser.add_argument_group("AnnounceLocal plugin")
  116. group.add_argument('--broadcast_port', help='UDP broadcasting port for local peer discovery', default=1544, type=int, metavar='port')
  117. return super(ConfigPlugin, self).createArguments()