PeerDbPlugin.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import time
  2. import sqlite3
  3. import random
  4. import atexit
  5. import gevent
  6. from Plugin import PluginManager
  7. @PluginManager.registerTo("ContentDb")
  8. class ContentDbPlugin(object):
  9. def __init__(self, *args, **kwargs):
  10. atexit.register(self.saveAllPeers)
  11. super(ContentDbPlugin, self).__init__(*args, **kwargs)
  12. def getSchema(self):
  13. schema = super(ContentDbPlugin, self).getSchema()
  14. schema["tables"]["peer"] = {
  15. "cols": [
  16. ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
  17. ["address", "TEXT NOT NULL"],
  18. ["port", "INTEGER NOT NULL"],
  19. ["hashfield", "BLOB"],
  20. ["time_added", "INTEGER NOT NULL"]
  21. ],
  22. "indexes": [
  23. "CREATE UNIQUE INDEX peer_key ON peer (site_id, address, port)"
  24. ],
  25. "schema_changed": 1
  26. }
  27. return schema
  28. def loadPeers(self, site):
  29. s = time.time()
  30. site_id = self.site_ids.get(site.address)
  31. res = self.execute("SELECT * FROM peer WHERE site_id = :site_id", {"site_id": site_id})
  32. num = 0
  33. num_hashfield = 0
  34. for row in res:
  35. peer = site.addPeer(row["address"], row["port"])
  36. if not peer: # Already exist
  37. continue
  38. if row["hashfield"]:
  39. peer.hashfield.replaceFromString(row["hashfield"])
  40. num_hashfield += 1
  41. peer.time_added = row["time_added"]
  42. num += 1
  43. site.log.debug("%s peers (%s with hashfield) loaded in %.3fs" % (num, num_hashfield, time.time() - s))
  44. def iteratePeers(self, site):
  45. site_id = self.site_ids.get(site.address)
  46. for key, peer in site.peers.iteritems():
  47. address, port = key.split(":")
  48. if peer.has_hashfield:
  49. hashfield = sqlite3.Binary(peer.hashfield.tostring())
  50. else:
  51. hashfield = ""
  52. yield (site_id, address, port, hashfield, int(peer.time_added))
  53. def savePeers(self, site, spawn=False):
  54. if spawn:
  55. # Save peers every hour (+random some secs to not update very site at same time)
  56. gevent.spawn_later(60 * 60 + random.randint(0, 60), self.savePeers, site, spawn=True)
  57. s = time.time()
  58. site_id = self.site_ids.get(site.address)
  59. cur = self.getCursor()
  60. cur.execute("BEGIN")
  61. self.execute("DELETE FROM peer WHERE site_id = :site_id", {"site_id": site_id})
  62. self.cur.cursor.executemany(
  63. "INSERT INTO peer (site_id, address, port, hashfield, time_added) VALUES (?, ?, ?, ?, ?)",
  64. self.iteratePeers(site)
  65. )
  66. cur.execute("END")
  67. site.log.debug("Peers saved in %.3fs" % (time.time() - s))
  68. def initSite(self, site):
  69. super(ContentDbPlugin, self).initSite(site)
  70. gevent.spawn_later(0.5, self.loadPeers, site)
  71. gevent.spawn_later(60*60, self.savePeers, site, spawn=True)
  72. def saveAllPeers(self):
  73. for site in self.sites.values():
  74. try:
  75. self.savePeers(site)
  76. except Exception, err:
  77. site.log.error("Save peer error: %s" % err)