models.py

# coding: utf-8
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from django.utils.timezone import utc, now
from django.db import models
from django.conf import settings
import logging
import projectb.models as pmodels
from backend import utils
import datetime
import time
import re
import os
import os.path
import email.utils
import json
import xapian

log = logging.getLogger(__name__)

MINECHANGELOGS_CACHEDIR = getattr(settings, "MINECHANGELOGS_CACHEDIR", "./data/mc_cache")
MINECHANGELOGS_INDEXDIR = getattr(settings, "MINECHANGELOGS_INDEXDIR", "./data/mc_index")
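# Both paths default to directories under ./data and can be overridden in the
# Django settings: the cache dir holds the indexing checkpoint, the index dir
# holds the Xapian database.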
def parse_changelog(fname):
    """
    Generate changelog entries reading from a file
    """
    log.info("%s: reading changelog entries", fname)
    re_pkg = re.compile(r"^(\S+) \(([^)]+)\)")
    re_ts_line = re.compile(r"^ --(.+>)\s+(\w+\s*,\s*\d+\s+\w+\s*\d+\s+\d+:\d+:\d+)")
    count = 0
    with open(fname) as infd:
        in_changelog = False
        tag = ""
        lines = []
        for line in infd:
            mo = re_pkg.match(line)
            if mo:
                in_changelog = True
                lines = []
                tag = "%s_%s" % (mo.group(1), mo.group(2))
            else:
                mo = re_ts_line.match(line)
                if mo:
                    in_changelog = False
                    count += 1
                    yield tag, mo.group(2), mo.group(1), "".join(lines)
            if in_changelog:
                lines.append(line)
    log.info("%s: %d changelog entries found", fname, count)
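# Example usage (a sketch; the path is hypothetical):
#
#   for tag, date, changedby, text in parse_changelog("debian/changelog"):
#       print(tag, date, changedby)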
class parse_projectb(object):
    """
    Extract changelog entries from projectb, checkpointing the last
    extracted entries to a JSON cache file.

    This works as a context manager:

        with parse_projectb() as changes:
            for changelog_entry in changes:
                process(changelog_entry)
    """
    def __init__(self, statefile=None):
        if statefile is None:
            statefile = os.path.join(MINECHANGELOGS_CACHEDIR, "index-checkpoint.json")
        self.statefile = statefile
    def load_state(self):
        """
        Read the last saved state
        """
        try:
            with open(self.statefile) as infd:
                state = json.load(infd)
        except IOError:
            state = {}
        if "old_seen" in state:
            dt = datetime.datetime.strptime(state["old_seen"], "%Y-%m-%d %H:%M:%S")
            state["old_seen"] = dt.replace(tzinfo=utc)
        self.state = state
    def save_state(self):
        """
        Atomically commit the state to disk
        """
        # Ensure the cache dir exists
        dirname = os.path.dirname(self.statefile)
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
        state = {}
        if "old_seen" in self.state:
            state["old_seen"] = self.state["old_seen"].astimezone(utc).strftime("%Y-%m-%d %H:%M:%S")
        if "old_seen_ids" in self.state:
            state["old_seen_ids"] = self.state["old_seen_ids"]
        with utils.atomic_writer(self.statefile) as outfd:
            json.dump(state, outfd)
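    # The checkpoint file is a small JSON document, for example (illustrative
    # values):
    #
    #   {"old_seen": "2013-05-01 12:00:00", "old_seen_ids": [12345, 12346]}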
    def get_changes(self):
        """
        Produce information about new uploads since the last run, as
        (tag, date, changedby, changelog) tuples
        """
        cur = pmodels.cursor()

        # Read last checkpoint state
        old_seen = self.state.get("old_seen", None)
        old_seen_ids = self.state.get("old_seen_ids", [])

        # Get the new changes, limited to the newest version
        q = """
        SELECT c.id, c.seen, c.source, c.version,
               c.date, c.changedby, ch.changelog
          FROM changes c
          JOIN changelogs ch ON ch.id=c.changelog_id
        """
        if old_seen is None:
            cur.execute(q + " ORDER BY seen")
            #cur.execute(q + " WHERE seen >= '2011-07-01 00:00:00' ORDER BY seen")
        else:
            cur.execute(q + " WHERE seen >= %s ORDER BY seen", (old_seen,))

        log.info("projectb: querying changelogs...")
        last_year = None
        last_year_count = 0
        for id, seen, source, version, date, changedby, changelog in cur:
            if last_year is None or last_year != seen.year:
                if last_year is None:
                    log.info("projectb: start of changelog stream.")
                else:
                    log.info("projectb:%d: %d entries read.", last_year, last_year_count)
                last_year = seen.year
                last_year_count = 0
            # Skip the rare cases of partially processed multiple sources on the same instant
            if id in old_seen_ids: continue
            if old_seen is None or seen > old_seen:
                old_seen = seen
                old_seen_ids = []
            old_seen_ids.append(id)
            # Pass on the info to be indexed
            yield "%s_%s" % (source, version), date, changedby, changelog
            last_year_count += 1
        log.info("projectb:%s: %d entries read. End of changelogs stream.", last_year, last_year_count)

        self.state["old_seen"] = old_seen
        self.state["old_seen_ids"] = old_seen_ids
    def __enter__(self):
        self.load_state()
        return self.get_changes()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.save_state()
        return False
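# A typical reindexing run feeds parse_projectb into the Indexer defined
# below; a minimal sketch, assuming a reachable projectb database:
#
#   indexer = Indexer()
#   with parse_projectb() as changes:
#       indexer.index(changes)
#   indexer.flush()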
class Indexer(object):
    def __init__(self):
        if not os.path.isdir(MINECHANGELOGS_INDEXDIR):
            os.makedirs(MINECHANGELOGS_INDEXDIR)
        self.xdb = xapian.WritableDatabase(MINECHANGELOGS_INDEXDIR, xapian.DB_CREATE_OR_OPEN)
        self.xdb.begin_transaction()
        self.re_split = re.compile(r"[^\w_@.-]+")
        self.re_ts = re.compile(r"(\w+\s*,\s*\d+\s+\w+\s*\d+\s+\d+:\d+:\d+)")
        self.max_ts = None

    def tokenise(self, s):
        return self.re_split.split(s)
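    # For example (illustrative), tokenise("Fix foo-bar 1.2.3 <joe@example.com>")
    # returns ['Fix', 'foo-bar', '1.2.3', 'joe@example.com', '']; empty tokens
    # are filtered out during indexing.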
    def index(self, entries):
        count = 0
        for tag, date, changedby, changelog in entries:
            count += 1
            #if count % 1000 == 0:
            #    print(date)
            xid = "XP" + tag
            document = xapian.Document()
            document.set_data(changelog + "\n" + " -- " + changedby + " " + date)
            #print(date)
            # Ignore timezones for our purposes: dealing with timezones in
            # Python means dealing with one of the most demented pieces of code
            # people have ever conceived, or else introducing piles of external
            # dependencies that may do the job. We can get away without
            # timezones here, and we take advantage of that stroke of luck.
            ts = 0
            mo = self.re_ts.match(date)
            if mo:
                #ts = time.mktime(time.strptime(mo.group(1), "%a, %d %b %Y %H:%M:%S"))
                parsed = email.utils.parsedate_tz(mo.group(1))
                if parsed is not None:
                    ts = time.mktime(parsed[:9])
                #parsed = dateutil.parser.parse(date)
                #parsed = email.utils.parsedate_tz(date)
                #ts = time.mktime(parsed[:9]) - parsed[9]
            # Value slot 0 holds the entry timestamp, used for sorting results
            document.add_value(0, xapian.sortable_serialise(ts))
            document.add_term(xid)
            pos = 0
            lines = changelog.split("\n")[1:]
            lines.append(changedby)
            for l in lines:
                for tok in self.tokenise(l):
                    tok = tok.strip(".-")
                    if not tok: continue
                    # Skip pathologically long tokens: see ircd (2.10.04+-1)
                    if len(tok) > 100: continue
                    if tok.isdigit(): continue
                    document.add_posting(tok, pos)
                    pos += 1
            self.xdb.replace_document(xid, document)
            if self.max_ts is None or ts > self.max_ts:
                self.max_ts = ts
    def flush(self):
        """
        Flush and save indexing information
        """
        if self.max_ts is None:
            self.xdb.set_metadata("max_ts", "0")
        else:
            self.xdb.set_metadata("max_ts", str(self.max_ts))
        self.xdb.set_metadata("last_indexed", str(time.time()))
        self.xdb.commit_transaction()
def info():
    """
    Get information about the state of the minechangelogs database
    """
    xdb = xapian.Database(MINECHANGELOGS_INDEXDIR)
    return dict(
        max_ts=float(xdb.get_metadata("max_ts")),
        last_indexed=float(xdb.get_metadata("last_indexed")),
    )
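# info() returns both values as Unix timestamps, for example (illustrative
# values): {"max_ts": 1367712000.0, "last_indexed": 1367798400.0}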
def query(keywords):
    """
    Get changelog entries matching the given keywords
    """
    xdb = xapian.Database(MINECHANGELOGS_INDEXDIR)

    # Build an OR query of all the keywords; multi-word keywords become
    # phrase queries
    q = None
    for a in keywords:
        a = a.strip()
        if not a: continue
        if ' ' in a:
            a = a.split()
            p = xapian.Query(xapian.Query.OP_PHRASE, a)
        else:
            p = xapian.Query(a)
        if q is None:
            q = p
        else:
            q = xapian.Query(xapian.Query.OP_OR, q, p)
    if q is None: return

    enquire = xapian.Enquire(xdb)
    enquire.set_query(q)
    # Sort by value slot 0 (the entry timestamp), newest first
    enquire.set_sort_by_value(0, True)

    # Iterate the matches in pages of 100
    first = 0
    while True:
        matches = enquire.get_mset(first, 100)
        count = matches.size()
        if count == 0: break
        for m in matches:
            yield m.document.get_data()
        first += 100
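# Example usage (a sketch; single terms are ORed together, while a multi-word
# keyword is matched as a phrase):
#
#   for entry in query(["apt", "bug fix"]):
#       print(entry)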