unittest.py 17 KB


  1. from __future__ import print_function
  2. from __future__ import absolute_import
  3. from __future__ import division
  4. from __future__ import unicode_literals
  5. import backend.models as bmodels
  6. from backend.models import Person, Process, AM, Fingerprint
  7. from backend import const
  8. from django.utils.timezone import now
  9. from django.test import Client
  10. from collections import defaultdict
  11. import datetime
  12. import os
  13. import io
  14. import re
  15. import six
  16. class NamedObjects(dict):
  17. """
  18. Container for fixture model objects.
  19. """
  20. def __init__(self, model, **defaults):
  21. super(NamedObjects, self).__init__()
  22. self._model = model
  23. self._defaults = defaults
  24. def __getitem__(self, key):
  25. """
  26. Dict that only looks things up if they are strings, otherwise just return key.
  27. This allows to use __getitem__ with already resolved objects, just to have
  28. functions that can take either objects or their fixture names.
  29. """
  30. if not isinstance(key, basestring): return key
  31. return super(NamedObjects, self).__getitem__(key)
  32. def __getattr__(self, key):
  33. """
  34. Make dict elements also appear as class members
  35. """
  36. res = self.get(key, None)
  37. if res is not None: return res
  38. raise AttributeError("member {} not found".format(key))
  39. def _update_kwargs_with_defaults(self, _name, kw):
  40. """
  41. Update the kw dict with defaults from self._defaults.
  42. If self._defaults for an argument is a string, then calls .format() on
  43. it passing _name and self._defaults as format arguments.
  44. """
  45. for k, v in self._defaults.items():
  46. if isinstance(v, six.string_types):
  47. kw.setdefault(k, v.format(_name=_name, **self._defaults))
  48. elif hasattr(v, "__call__"):
  49. kw.setdefault(k, v(_name, **self._defaults))
  50. else:
  51. kw.setdefault(k, v)
  52. def create(self, _name, **kw):
  53. self._update_kwargs_with_defaults(_name, kw)
  54. self[_name] = o = self._model.objects.create(**kw)
  55. return o
  56. def refresh(self):
  57. """
  58. Reload all the objects from the database.
  59. This is needed because although Django's TestCase rolls back the
  60. database after a test, the data stored in memory in the objects stored
  61. in NamedObjects repositories is not automatically refreshed.
  62. """
  63. # FIXME: when we get Django 1.8, we can just do
  64. # for o in self.values(): o.refresh_from_db()
  65. for name, o in list(self.items()):
  66. try:
  67. self[name] = self._model.objects.get(pk=o.pk)
  68. except self._model.DoesNotExist:
  69. del self[name]
  70. def delete_all(self):
  71. """
  72. Call delete() on all model objects registered in this dict.
  73. This can be used in methods like tearDownClass to remove objects common
  74. to all tests.
  75. """
  76. for o in self.values():
  77. o.delete()
  78. class TestPersons(NamedObjects):
  79. def __init__(self, **defaults):
  80. defaults.setdefault("cn", lambda name, **kw: name.capitalize())
  81. defaults.setdefault("email", "{_name}@example.org")
  82. defaults.setdefault("email_ldap", "{_name}@example.org")
  83. super(TestPersons, self).__init__(Person, **defaults)
  84. def create(self, _name, alioth=False, **kw):
  85. if alioth:
  86. kw.setdefault("username", _name + "-guest@users.alioth.debian.org")
  87. else:
  88. kw.setdefault("username", _name + "@debian.org")
  89. kw.setdefault("uid", _name)
  90. self._update_kwargs_with_defaults(_name, kw)
  91. self[_name] = o = self._model.objects.create_user(audit_skip=True, **kw)
  92. return o
  93. class TestKeys(NamedObjects):
  94. def __init__(self, **defaults):
  95. from keyring.models import Key
  96. super(TestKeys, self).__init__(Key, **defaults)
  97. def create(self, _name, **kw):
  98. self._update_kwargs_with_defaults(_name, kw)
  99. self._model.objects.test_preload(_name)
  100. self[_name] = o = self._model.objects.get_or_download(_name, **kw)
  101. return o
  102. class TestMeta(type):
  103. def __new__(cls, name, bases, attrs):
  104. res = super(TestMeta, cls).__new__(cls, name, bases, attrs)
  105. if hasattr(res, "__add_extra_tests__"):
  106. res.__add_extra_tests__()
  107. return res
  108. @six.add_metaclass(TestMeta)
  109. class TestBase(object):
  110. @classmethod
  111. def _add_method(cls, meth, *args, **kw):
  112. """
  113. Add a test method, made of the given method called with the given args
  114. and kwargs.
  115. The method name and args are used to built the test method name, the
  116. kwargs are not: make sure you use the args to make the test case
  117. unique, and the kwargs for things you do not want to appear in the test
  118. name, like the expected test results for those args.
  119. """
  120. name = re.sub(r"[^0-9A-Za-z_]", "_", "{}_{}".format(meth.__name__.lstrip("_"), "_".join(str(x) for x in args)))
  121. setattr(cls, name, lambda self: meth(self, *args, **kw))
  122. def make_test_client(self, person, sso_username=None, **kw):
  123. """
  124. Instantiate a test client, logging in the given person.
  125. If person is None, visit anonymously. If person is None but
  126. sso_username is not None, authenticate as the given sso_username even
  127. if a Person record does not exist.
  128. """
  129. person = self.persons[person]
  130. if person is not None:
  131. kw["SSL_CLIENT_S_DN_CN"] = person.username
  132. elif sso_username is not None:
  133. kw["SSL_CLIENT_S_DN_CN"] = sso_username
  134. client = Client(**kw)
  135. client.visitor = person
  136. return client
  137. def assertPermissionDenied(self, response):
  138. if response.status_code == 403:
  139. pass
  140. else:
  141. self.fail("response has status code {} instead of a 403 Forbidden".format(response.status_code))
  142. def assertRedirectMatches(self, response, target):
  143. if response.status_code != 302:
  144. self.fail("response has status code {} instead of a Redirect".format(response.status_code))
  145. if target and not re.search(target, response["Location"]):
  146. self.fail("response redirects to {} which does not match {}".format(response["Location"], target))
  147. def assertFormErrorMatches(self, response, form_name, field_name, regex):
  148. form = response.context[form_name]
  149. errors = form.errors
  150. if not errors: self.fail("Form {} has no errors".format(form_name))
  151. if field_name not in errors: self.fail("Form {} has no errors in field {}".format(form_name, field_name))
  152. match = re.compile(regex)
  153. for errmsg in errors[field_name]:
  154. if match.search(errmsg): return
  155. self.fail("{} dit not match any in {}".format(regex, repr(errors)))
  156. def assertContainsElements(self, response, elements, *names):
  157. """
  158. Check that the response contains only the elements in `names` from PageElements `elements`
  159. """
  160. want = set(names)
  161. extras = want - set(elements.keys())
  162. if extras: raise RuntimeError("Wanted elements not found in the list of possible ones: {}".format(", ".join(extras)))
  163. should_have = []
  164. should_not_have = []
  165. content = response.content.decode("utf-8")
  166. for name, regex in elements.items():
  167. if name in want:
  168. if not regex.search(content):
  169. should_have.append(name)
  170. else:
  171. if regex.search(content):
  172. should_not_have.append(name)
  173. if should_have or should_not_have:
  174. msg = []
  175. if should_have: msg.append("should have element(s) {}".format(", ".join(should_have)))
  176. if should_not_have: msg.append("should not have element(s) {}".format(", ".join(should_not_have)))
  177. self.fail("page " + " and ".join(msg))
  178. class BaseFixtureMixin(TestBase):
  179. @classmethod
  180. def get_persons_defaults(cls):
  181. """
  182. Get default arguments for test persons
  183. """
  184. return {}
  185. @classmethod
  186. def setUpClass(cls):
  187. super(BaseFixtureMixin, cls).setUpClass()
  188. cls.persons = TestPersons(**cls.get_persons_defaults())
  189. cls.ams = NamedObjects(AM)
  190. cls.fingerprints = NamedObjects(Fingerprint)
  191. cls.keys = TestKeys()
  192. # Preload keys
  193. cls.keys.create("66B4DFB68CB24EBBD8650BC4F4B4B0CC797EBFAB")
  194. cls.keys.create("1793D6AB75663E6BF104953A634F4BD1E7AD5568")
  195. cls.keys.create("0EED77DC41D760FDE44035FF5556A34E04A3610B")
  196. @classmethod
  197. def tearDownClass(cls):
  198. cls.keys.delete_all()
  199. cls.ams.delete_all()
  200. cls.fingerprints.delete_all()
  201. cls.persons.delete_all()
  202. super(BaseFixtureMixin, cls).tearDownClass()
  203. def setUp(self):
  204. super(BaseFixtureMixin, self).setUp()
  205. self.persons.refresh();
  206. self.ams.refresh();
  207. self.fingerprints.refresh();
  208. self.keys.refresh();
  209. class PersonFixtureMixin(BaseFixtureMixin):
  210. """
  211. Pre-create some persons
  212. """
  213. @classmethod
  214. def setUpClass(cls):
  215. super(PersonFixtureMixin, cls).setUpClass()
  216. # pending account
  217. cls.persons.create("pending", status=const.STATUS_DC, expires=now() + datetime.timedelta(days=1), pending="12345", alioth=True)
  218. # debian contributor
  219. cls.persons.create("dc", status=const.STATUS_DC, alioth=True)
  220. # debian contributor with guest account
  221. cls.persons.create("dc_ga", status=const.STATUS_DC_GA, alioth=True)
  222. # dm
  223. cls.persons.create("dm", status=const.STATUS_DM, alioth=True)
  224. # dm with guest account
  225. cls.persons.create("dm_ga", status=const.STATUS_DM_GA, alioth=True)
  226. # dd, nonuploading
  227. cls.persons.create("dd_nu", status=const.STATUS_DD_NU)
  228. # dd, uploading
  229. cls.persons.create("dd_u", status=const.STATUS_DD_U)
  230. # dd, emeritus
  231. cls.persons.create("dd_e", status=const.STATUS_EMERITUS_DD)
  232. # dd, removed
  233. cls.persons.create("dd_r", status=const.STATUS_REMOVED_DD)
  234. # unrelated active am
  235. fd = cls.persons.create("activeam", status=const.STATUS_DD_NU)
  236. cls.ams.create("activeam", person=fd)
  237. # fd
  238. fd = cls.persons.create("fd", status=const.STATUS_DD_NU)
  239. cls.ams.create("fd", person=fd, is_fd=True)
  240. # dam
  241. dam = cls.persons.create("dam", status=const.STATUS_DD_U)
  242. cls.ams.create("dam", person=dam, is_fd=True, is_dam=True)
  243. class TestSet(set):
  244. """
  245. Set of strings that can be initialized from space-separated strings, and
  246. changed with simple text patches.
  247. """
  248. def __init__(self, initial=""):
  249. if initial: self.update(initial.split())
  250. def set(self, vals):
  251. self.clear()
  252. self.update(vals.split())
  253. def patch(self, diff):
  254. for change in diff.split():
  255. if change[0] == "+":
  256. self.add(change[1:])
  257. elif change[0] == "-":
  258. self.discard(change[1:])
  259. else:
  260. raise RuntimeError("Changes {} contain {} that is nether an add nor a remove".format(repr(text), repr(change)))
  261. def clone(self):
  262. res = TestSet()
  263. res.update(self)
  264. return res
  265. class PatchExact(object):
  266. def __init__(self, text):
  267. if text:
  268. self.items = set(text.split())
  269. else:
  270. self.items = set()
  271. def apply(self, cur):
  272. if self.items: return set(self.items)
  273. return None
  274. class PatchDiff(object):
  275. def __init__(self, text):
  276. self.added = set()
  277. self.removed = set()
  278. for change in text.split():
  279. if change[0] == "+":
  280. self.added.add(change[1:])
  281. elif change[0] == "-":
  282. self.removed.add(change[1:])
  283. else:
  284. raise RuntimeError("Changes {} contain {} that is nether an add nor a remove".format(text, change))
  285. def apply(self, cur):
  286. if cur is None:
  287. cur = set(self.added)
  288. else:
  289. cur = (cur - self.removed) | self.added
  290. if not cur: return None
  291. return cur
  292. class ExpectedSets(defaultdict):
  293. """
  294. Store the permissions expected out of a *VisitorPermissions object
  295. """
  296. def __init__(self, testcase, action_msg="{visitor}", issue_msg="{problem} {mismatch}"):
  297. super(ExpectedSets, self).__init__(TestSet)
  298. self.testcase = testcase
  299. self.action_msg = action_msg
  300. self.issue_msg = issue_msg
  301. @property
  302. def visitors(self):
  303. return self.keys()
  304. def set(self, visitors, text):
  305. for v in visitors.split():
  306. self[v].set(text)
  307. def patch(self, visitors, text):
  308. for v in visitors.split():
  309. self[v].patch(text)
  310. def select_others(self, persons):
  311. other_visitors = set(persons.keys())
  312. other_visitors.add(None)
  313. other_visitors -= set(self.keys())
  314. return other_visitors
  315. def combine(self, other):
  316. res = ExpectedSets(self.testcase, action_msg=self.action_msg, issue_msg=self.issue_msg)
  317. for k, v in self.items():
  318. res[k] = v.clone()
  319. for k, v in other.items():
  320. res[k].update(v)
  321. return res
  322. def assertEqual(self, visitor, got):
  323. got = set(got)
  324. wanted = self.get(visitor, set())
  325. if got == wanted: return
  326. extra = got - wanted
  327. missing = wanted - got
  328. msgs = []
  329. if missing: msgs.append(self.issue_msg.format(problem="misses", mismatch=", ".join(sorted(missing))))
  330. if extra: msgs.append(self.issue_msg.format(problem="has extra", mismatch=", ".join(sorted(extra))))
  331. self.testcase.fail(self.action_msg.format(visitor=visitor) + " " + " and ".join(msgs))
  332. def assertEmpty(self, visitor, got):
  333. extra = set(got)
  334. if not extra: return
  335. self.testcase.fail(self.action_msg.format(visitor=visitor) + " " + self.issue_msg.format(problem="has", mismatch=", ".join(sorted(extra))))
  336. def assertMatches(self, visited):
  337. for visitor in self.visitors:
  338. visit_perms = visited.permissions_of(self.testcase.persons[visitor])
  339. self.assertEqual(visitor, visit_perms)
  340. for visitor in self.select_others(self.testcase.persons):
  341. visit_perms = visited.permissions_of(self.testcase.persons[visitor] if visitor else None)
  342. self.assertEmpty(visitor, visit_perms)
  343. class ExpectedPerms(object):
  344. """
  345. Store the permissions expected out of a *VisitorPermissions object
  346. """
  347. def __init__(self, perms={}):
  348. self.perms = {}
  349. for visitors, expected_perms in perms.items():
  350. for visitor in visitors.split():
  351. self.perms[visitor] = set(expected_perms.split())
  352. def _apply_diff(self, d, diff):
  353. for visitors, change in diff.items():
  354. for visitor in visitors.split():
  355. cur = change.apply(d.get(visitor, None))
  356. if not cur:
  357. d.pop(visitor, None)
  358. else:
  359. d[visitor] = cur
  360. def update_perms(self, diff):
  361. self._apply_diff(self.perms, diff)
  362. def set_perms(self, visitors, text):
  363. self.update_perms({ visitors: PatchExact(text) })
  364. def patch_perms(self, visitors, text):
  365. self.update_perms({ visitors: PatchDiff(text) })
  366. class PageElements(dict):
  367. """
  368. List of all page elements possibly expected in the results of a view.
  369. dict matching name used to refer to the element with regexp matching the
  370. element.
  371. """
  372. def add_id(self, id):
  373. self[id] = re.compile(r"""id\s*=\s*["']{}["']""".format(re.escape(id)))
  374. def add_class(self, cls):
  375. self[cls] = re.compile(r"""class\s*=\s*["']{}["']""".format(re.escape(cls)))
  376. def add_href(self, name, url):
  377. self[name] = re.compile(r"""href\s*=\s*["']{}["']""".format(re.escape(url)))
  378. def add_string(self, name, term):
  379. self[name] = re.compile(r"""{}""".format(re.escape(term)))
  380. def clone(self):
  381. res = PageElements()
  382. res.update(self.items())
  383. return res
  384. class TestOldProcesses(NamedObjects):
  385. def __init__(self, **defaults):
  386. super(TestOldProcesses, self).__init__(bmodels.Process, **defaults)
  387. defaults.setdefault("progress", const.PROGRESS_APP_NEW)
  388. def create(self, _name, advocates=[], **kw):
  389. self._update_kwargs_with_defaults(_name, kw)
  390. if "process" in kw:
  391. kw.setdefault("is_active", kw["process"] not in (const.PROGRESS_DONE, const.PROGRESS_CANCELLED))
  392. else:
  393. kw.setdefault("is_active", True)
  394. if "manager" in kw:
  395. try:
  396. am = kw["manager"].am
  397. except bmodels.AM.DoesNotExist:
  398. am = bmodels.AM.objects.create(person=kw["manager"])
  399. kw["manager"] = am
  400. self[_name] = o = self._model.objects.create(**kw)
  401. for a in advocates:
  402. o.advocates.add(a)
  403. return o
  404. class OldProcessFixtureMixin(PersonFixtureMixin):
  405. @classmethod
  406. def get_processes_defaults(cls):
  407. """
  408. Get default arguments for test processes
  409. """
  410. return {}
  411. @classmethod
  412. def setUpClass(cls):
  413. super(OldProcessFixtureMixin, cls).setUpClass()
  414. cls.processes = TestOldProcesses(**cls.get_processes_defaults())
  415. @classmethod
  416. def tearDownClass(cls):
  417. cls.processes.delete_all()
  418. super(OldProcessFixtureMixin, cls).tearDownClass()
  419. def setUp(self):
  420. super(OldProcessFixtureMixin, self).setUp()
  421. self.processes.refresh();