ChartCollector.py

import time
import sys
import collections
import itertools
import logging

import gevent

from util import helper
from Config import config

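# Samples global and per-site statistics on a timer and stores them in the
# chart database. The db object is assumed (judging by the calls below) to
# expose getCursor(), getTypeId() and getSiteId().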
class ChartCollector(object):
    def __init__(self, db):
        self.db = db
        if config.action == "main":
            gevent.spawn_later(60 * 3, self.collector)  # Start collecting 3 minutes after launch
        self.log = logging.getLogger("ChartCollector")
        self.last_values = collections.defaultdict(dict)

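    # Seed last_values from the totals persisted in site settings, so the first
    # "|change" delta measures traffic since startup, not the lifetime counter.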
    def setInitialLastValues(self, sites):
        # Recover the last bytes recv/sent counters of each site
        for site in sites:
            self.last_values["site:" + site.address]["site_bytes_recv"] = site.settings.get("bytes_recv", 0)
            self.last_values["site:" + site.address]["site_bytes_sent"] = site.settings.get("bytes_sent", 0)

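    # Build the global collectors: name -> lambda. Most take no arguments;
    # "peer*" collectors receive the unique peer set, and a "|change" suffix
    # marks counters that are stored as deltas (see collectDatas).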
    def getCollectors(self):
        collectors = {}
        import main
        file_server = main.file_server
        sites = file_server.sites
        if not sites:
            return collectors
        content_db = list(sites.values())[0].content_manager.contents.db

        # Connection stats
        collectors["connection"] = lambda: len(file_server.connections)
        collectors["connection_in"] = (
            lambda: len([1 for connection in file_server.connections if connection.type == "in"])
        )
        collectors["connection_onion"] = (
            lambda: len([1 for connection in file_server.connections if connection.ip.endswith(".onion")])
        )
        collectors["connection_ping_avg"] = (
            lambda: round(1000 * helper.avg(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        collectors["connection_ping_min"] = (
            lambda: round(1000 * min(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        collectors["connection_rev_avg"] = (
            lambda: helper.avg(
                [connection.handshake["rev"] for connection in file_server.connections if connection.handshake]
            )
        )

        # Request stats
        collectors["file_bytes_recv|change"] = lambda: file_server.bytes_recv
        collectors["file_bytes_sent|change"] = lambda: file_server.bytes_sent
        collectors["request_num_recv|change"] = lambda: file_server.num_recv
        collectors["request_num_sent|change"] = lambda: file_server.num_sent

        # Limit
        collectors["optional_limit"] = lambda: content_db.getOptionalLimitBytes()
        collectors["optional_used"] = lambda: content_db.getOptionalUsedBytes()
        collectors["optional_downloaded"] = lambda: sum([site.settings.get("optional_downloaded", 0) for site in sites.values()])

        # Peers
        collectors["peer"] = lambda peers: len(peers)
        collectors["peer_onion"] = lambda peers: len([True for peer in peers if ".onion" in peer])

        # Size
        collectors["size"] = lambda: sum([site.settings.get("size", 0) for site in sites.values()])
        collectors["size_optional"] = lambda: sum([site.settings.get("size_optional", 0) for site in sites.values()])
        collectors["content"] = lambda: sum([len(site.content_manager.contents) for site in sites.values()])

        return collectors

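    # Build the per-site collectors: name -> lambda taking a single site.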
    def getSiteCollectors(self):
        site_collectors = {}

        # Size
        site_collectors["site_size"] = lambda site: site.settings.get("size", 0)
        site_collectors["site_size_optional"] = lambda site: site.settings.get("size_optional", 0)
        site_collectors["site_optional_downloaded"] = lambda site: site.settings.get("optional_downloaded", 0)
        site_collectors["site_content"] = lambda site: len(site.content_manager.contents)

        # Data transfer
        site_collectors["site_bytes_recv|change"] = lambda site: site.settings.get("bytes_recv", 0)
        site_collectors["site_bytes_sent|change"] = lambda site: site.settings.get("bytes_sent", 0)

        # Peers
        site_collectors["site_peer"] = lambda site: len(site.peers)
        site_collectors["site_peer_onion"] = lambda site: len(
            [True for peer in site.peers.values() if peer.ip.endswith(".onion")]
        )
        site_collectors["site_peer_connected"] = lambda site: len([True for peer in site.peers.values() if peer.connection])

        return site_collectors

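    # Peer keys collected across all sites; the set removes peers that appear
    # in more than one site.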
    def getUniquePeers(self):
        import main
        sites = main.file_server.sites
        return set(itertools.chain.from_iterable(
            [site.peers.keys() for site in sites.values()]
        ))

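    # Run every collector and return {name: rounded value}. A failing collector
    # yields None (ValueError covers e.g. min() on an empty connection list).
    # For "|change" keys the delta against last_values is stored instead of the
    # raw counter.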
    def collectDatas(self, collectors, last_values, site=None):
        if site is None:
            peers = self.getUniquePeers()
        datas = {}
        for key, collector in collectors.items():
            try:
                if site:
                    value = collector(site)
                elif key.startswith("peer"):
                    value = collector(peers)
                else:
                    value = collector()
            except ValueError:
                value = None
            except Exception as err:
                self.log.info("Collector %s error: %s" % (key, err))
                value = None

            if "|change" in key:  # Store the change relative to the last value
                key = key.replace("|change", "")
                last_value = last_values.get(key) or 0
                if value is not None:  # Keep the old baseline if this round failed
                    last_values[key] = value
                    value = value - last_value

            if value is None:
                datas[key] = None
            else:
                datas[key] = round(value, 3)
        return datas

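    # Collect one round of global values and insert them as rows of
    # (type_id, value, date_added).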
    def collectGlobal(self, collectors, last_values):
        now = int(time.time())
        s = time.time()
        datas = self.collectDatas(collectors, last_values["global"])
        values = []
        for key, value in datas.items():
            values.append((self.db.getTypeId(key), value, now))
        self.log.debug("Global collectors done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.executemany("INSERT INTO data (type_id, value, date_added) VALUES (?, ?, ?)", values)
        self.log.debug("Global collectors inserted in %.3fs" % (time.time() - s))

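    # Collect one round of per-site values and insert them as rows of
    # (type_id, site_id, value, date_added), briefly pausing between sites.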
    def collectSites(self, sites, collectors, last_values):
        now = int(time.time())
        s = time.time()
        values = []
        for address, site in list(sites.items()):
            site_datas = self.collectDatas(collectors, last_values["site:%s" % address], site)
            for key, value in site_datas.items():
                values.append((self.db.getTypeId(key), self.db.getSiteId(address), value, now))
            time.sleep(0.001)  # Short pause so other tasks can run between sites
        self.log.debug("Site collections done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.executemany("INSERT INTO data (type_id, site_id, value, date_added) VALUES (?, ?, ?, ?)", values)
        self.log.debug("Site collectors inserted in %.3fs" % (time.time() - s))

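    # Main loop, spawned from __init__: global stats every 5 minutes,
    # site stats every 12th round (hourly).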
    def collector(self):
        collectors = self.getCollectors()
        site_collectors = self.getSiteCollectors()
        import main
        sites = main.file_server.sites
        i = 0
        while True:
            self.collectGlobal(collectors, self.last_values)
            if i % 12 == 0:  # Only collect sites data every hour (12 x 5 minutes)
                self.collectSites(sites, site_collectors, self.last_values)
            time.sleep(60 * 5)
            i += 1
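
# Usage sketch, assuming a db object with the getCursor/getTypeId/getSiteId
# interface used above (presumably wired up by the surrounding Chart plugin):
#
#   collector = ChartCollector(chart_db)
#   collector.setInitialLastValues(main.file_server.sites.values())
#   # With config.action == "main", __init__ has already scheduled collector().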