NewsfeedPlugin.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. import time
  2. import re
  3. from Plugin import PluginManager
  4. from Db.DbQuery import DbQuery
  5. from Debug import Debug
  6. from util import helper
  7. from util.Flag import flag
  8. @PluginManager.registerTo("UiWebsocket")
  9. class UiWebsocketPlugin(object):
  10. def formatSiteInfo(self, site, create_user=True):
  11. site_info = super(UiWebsocketPlugin, self).formatSiteInfo(site, create_user=create_user)
  12. feed_following = self.user.sites.get(site.address, {}).get("follow", None)
  13. if feed_following == None:
  14. site_info["feed_follow_num"] = None
  15. else:
  16. site_info["feed_follow_num"] = len(feed_following)
  17. return site_info
  18. def actionFeedFollow(self, to, feeds):
  19. self.user.setFeedFollow(self.site.address, feeds)
  20. self.user.save()
  21. self.response(to, "ok")
  22. def actionFeedListFollow(self, to):
  23. feeds = self.user.sites.get(self.site.address, {}).get("follow", {})
  24. self.response(to, feeds)
  25. @flag.admin
  26. def actionFeedQuery(self, to, limit=10, day_limit=3):
  27. from Site import SiteManager
  28. rows = []
  29. stats = []
  30. total_s = time.time()
  31. num_sites = 0
  32. for address, site_data in list(self.user.sites.items()):
  33. feeds = site_data.get("follow")
  34. if not feeds:
  35. continue
  36. if type(feeds) is not dict:
  37. self.log.debug("Invalid feed for site %s" % address)
  38. continue
  39. num_sites += 1
  40. for name, query_set in feeds.items():
  41. site = SiteManager.site_manager.get(address)
  42. if not site or not site.storage.has_db:
  43. continue
  44. s = time.time()
  45. try:
  46. query_raw, params = query_set
  47. query_parts = re.split(r"UNION(?:\s+ALL|)", query_raw)
  48. for i, query_part in enumerate(query_parts):
  49. db_query = DbQuery(query_part)
  50. if day_limit:
  51. where = " WHERE %s > strftime('%%s', 'now', '-%s day')" % (db_query.fields.get("date_added", "date_added"), day_limit)
  52. if "WHERE" in query_part:
  53. query_part = re.sub("WHERE (.*?)(?=$| GROUP BY)", where+" AND (\\1)", query_part)
  54. else:
  55. query_part += where
  56. query_parts[i] = query_part
  57. query = " UNION ".join(query_parts)
  58. if ":params" in query:
  59. query_params = map(helper.sqlquote, params)
  60. query = query.replace(":params", ",".join(query_params))
  61. res = site.storage.query(query + " ORDER BY date_added DESC LIMIT %s" % limit)
  62. except Exception as err: # Log error
  63. self.log.error("%s feed query %s error: %s" % (address, name, Debug.formatException(err)))
  64. stats.append({"site": site.address, "feed_name": name, "error": str(err)})
  65. continue
  66. for row in res:
  67. row = dict(row)
  68. if not isinstance(row["date_added"], (int, float, complex)):
  69. self.log.debug("Invalid date_added from site %s: %r" % (address, row["date_added"]))
  70. continue
  71. if row["date_added"] > 1000000000000: # Formatted as millseconds
  72. row["date_added"] = row["date_added"] / 1000
  73. if "date_added" not in row or row["date_added"] > time.time() + 120:
  74. self.log.debug("Newsfeed item from the future from from site %s" % address)
  75. continue # Feed item is in the future, skip it
  76. row["site"] = address
  77. row["feed_name"] = name
  78. rows.append(row)
  79. stats.append({"site": site.address, "feed_name": name, "taken": round(time.time() - s, 3)})
  80. time.sleep(0.001)
  81. return self.response(to, {"rows": rows, "stats": stats, "num": len(rows), "sites": num_sites, "taken": round(time.time() - total_s, 3)})
  82. def parseSearch(self, search):
  83. parts = re.split("(site|type):", search)
  84. if len(parts) > 1: # Found filter
  85. search_text = parts[0]
  86. parts = [part.strip() for part in parts]
  87. filters = dict(zip(parts[1::2], parts[2::2]))
  88. else:
  89. search_text = search
  90. filters = {}
  91. return [search_text, filters]
  92. def actionFeedSearch(self, to, search, limit=30, day_limit=30):
  93. if "ADMIN" not in self.site.settings["permissions"]:
  94. return self.response(to, "FeedSearch not allowed")
  95. from Site import SiteManager
  96. rows = []
  97. stats = []
  98. num_sites = 0
  99. total_s = time.time()
  100. search_text, filters = self.parseSearch(search)
  101. for address, site in SiteManager.site_manager.list().items():
  102. if not site.storage.has_db:
  103. continue
  104. if "site" in filters:
  105. if filters["site"].lower() not in [site.address, site.content_manager.contents["content.json"].get("title").lower()]:
  106. continue
  107. if site.storage.db: # Database loaded
  108. feeds = site.storage.db.schema.get("feeds")
  109. else:
  110. try:
  111. feeds = site.storage.loadJson("dbschema.json").get("feeds")
  112. except:
  113. continue
  114. if not feeds:
  115. continue
  116. num_sites += 1
  117. for name, query in feeds.items():
  118. s = time.time()
  119. try:
  120. db_query = DbQuery(query)
  121. params = []
  122. # Filters
  123. if search_text:
  124. db_query.wheres.append("(%s LIKE ? OR %s LIKE ?)" % (db_query.fields["body"], db_query.fields["title"]))
  125. search_like = "%" + search_text.replace(" ", "%") + "%"
  126. params.append(search_like)
  127. params.append(search_like)
  128. if filters.get("type") and filters["type"] not in query:
  129. continue
  130. if day_limit:
  131. db_query.wheres.append(
  132. "%s > strftime('%%s', 'now', '-%s day')" % (db_query.fields.get("date_added", "date_added"), day_limit)
  133. )
  134. # Order
  135. db_query.parts["ORDER BY"] = "date_added DESC"
  136. db_query.parts["LIMIT"] = str(limit)
  137. res = site.storage.query(str(db_query), params)
  138. except Exception as err:
  139. self.log.error("%s feed query %s error: %s" % (address, name, Debug.formatException(err)))
  140. stats.append({"site": site.address, "feed_name": name, "error": str(err), "query": query})
  141. continue
  142. for row in res:
  143. row = dict(row)
  144. if not row["date_added"] or row["date_added"] > time.time() + 120:
  145. continue # Feed item is in the future, skip it
  146. row["site"] = address
  147. row["feed_name"] = name
  148. rows.append(row)
  149. stats.append({"site": site.address, "feed_name": name, "taken": round(time.time() - s, 3)})
  150. return self.response(to, {"rows": rows, "num": len(rows), "sites": num_sites, "taken": round(time.time() - total_s, 3), "stats": stats})
  151. @PluginManager.registerTo("User")
  152. class UserPlugin(object):
  153. # Set queries that user follows
  154. def setFeedFollow(self, address, feeds):
  155. site_data = self.getSiteData(address)
  156. site_data["follow"] = feeds
  157. self.save()
  158. return site_data