123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
- from __future__ import print_function
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import unicode_literals
- import backend.models as bmodels
- from backend.models import Person, Process, AM, Fingerprint
- from backend import const
- from django.utils.timezone import now
- from django.test import Client
- from collections import defaultdict
- import datetime
- import os
- import io
- import re
- import six
class NamedObjects(dict):
    """
    Container for fixture model objects.

    Maps fixture names to model instances created through ``model.objects``;
    every entry is also reachable as an attribute of the container.
    """

    def __init__(self, model, **defaults):
        """
        Create an empty repository for instances of `model`.

        `defaults` are keyword arguments that create() fills in whenever the
        caller does not provide them explicitly.
        """
        super(NamedObjects, self).__init__()
        self._model = model
        self._defaults = defaults

    def __getitem__(self, key):
        """
        Dict that only looks things up if they are strings, otherwise just
        return key.

        This allows to use __getitem__ with already resolved objects, just to
        have functions that can take either objects or their fixture names.
        """
        # six.string_types is basestring on Python 2 and str on Python 3, so
        # the check works on both (a bare `basestring` is a NameError on 3).
        if not isinstance(key, six.string_types):
            return key
        return super(NamedObjects, self).__getitem__(key)

    def __getattr__(self, key):
        """
        Make dict elements also appear as class members.

        Raises AttributeError if there is no entry called `key`, or if the
        entry is None.
        """
        res = self.get(key, None)
        if res is not None:
            return res
        raise AttributeError("member {} not found".format(key))

    def _update_kwargs_with_defaults(self, _name, kw):
        """
        Update the kw dict with defaults from self._defaults.

        Values already present in kw are never overwritten.

        If self._defaults for an argument is a string, then calls .format() on
        it passing _name and self._defaults as format arguments.

        If it is a callable, then it is called with _name as the only
        positional argument and self._defaults as keyword arguments.

        Any other value is used as is.
        """
        for k, v in self._defaults.items():
            if isinstance(v, six.string_types):
                kw.setdefault(k, v.format(_name=_name, **self._defaults))
            elif callable(v):
                kw.setdefault(k, v(_name, **self._defaults))
            else:
                kw.setdefault(k, v)

    def create(self, _name, **kw):
        """
        Create a model object from kw (completed with the defaults), store it
        under `_name` and return it.
        """
        self._update_kwargs_with_defaults(_name, kw)
        self[_name] = o = self._model.objects.create(**kw)
        return o

    def refresh(self):
        """
        Reload all the objects from the database.

        This is needed because although Django's TestCase rolls back the
        database after a test, the data stored in memory in the objects stored
        in NamedObjects repositories is not automatically refreshed.

        Entries whose object no longer exists in the database are dropped.
        """
        # FIXME: when we get Django 1.8, we can just do
        #     for o in self.values(): o.refresh_from_db()
        # Iterate over a snapshot, since entries may be deleted along the way
        for name, o in list(self.items()):
            try:
                self[name] = self._model.objects.get(pk=o.pk)
            except self._model.DoesNotExist:
                del self[name]

    def delete_all(self):
        """
        Call delete() on all model objects registered in this dict.

        This can be used in methods like tearDownClass to remove objects common
        to all tests. The entries themselves are left in the dict.
        """
        for o in self.values():
            o.delete()
class TestPersons(NamedObjects):
    """
    Repository of Person fixtures.

    Unless overridden, ``cn`` defaults to the capitalized fixture name and
    both ``email`` and ``email_ldap`` to ``<name>@example.org``.
    """

    def __init__(self, **defaults):
        fallbacks = (
            ("cn", lambda name, **kw: name.capitalize()),
            ("email", "{_name}@example.org"),
            ("email_ldap", "{_name}@example.org"),
        )
        # Caller-provided defaults win over the built-in ones
        for field, value in fallbacks:
            defaults.setdefault(field, value)
        super(TestPersons, self).__init__(Person, **defaults)

    def create(self, _name, alioth=False, **kw):
        """
        Create a person through the model manager's create_user(), store it
        under `_name` and return it.

        The default username is built from `_name` plus an alioth guest suffix
        when `alioth` is true, and a debian.org suffix otherwise; the default
        uid is `_name` itself.
        """
        suffix = "-guest@users.alioth.debian.org" if alioth else "@debian.org"
        kw.setdefault("username", _name + suffix)
        kw.setdefault("uid", _name)
        self._update_kwargs_with_defaults(_name, kw)
        person = self._model.objects.create_user(audit_skip=True, **kw)
        self[_name] = person
        return person
class TestKeys(NamedObjects):
    """
    Repository of keyring Key fixtures, indexed by the name given to create().
    """

    def __init__(self, **defaults):
        # Imported when the repository is instantiated rather than at module
        # load time
        from keyring.models import Key
        super(TestKeys, self).__init__(Key, **defaults)

    def create(self, _name, **kw):
        """
        Preload the key called `_name` via the manager's test_preload(), then
        fetch it with get_or_download(), store it under `_name` and return it.
        """
        self._update_kwargs_with_defaults(_name, kw)
        self._model.objects.test_preload(_name)
        key = self._model.objects.get_or_download(_name, **kw)
        self[_name] = key
        return key
class TestMeta(type):
    """
    Metaclass giving classes a chance to add members right after creation.

    When the freshly built class exposes an ``__add_extra_tests__`` attribute
    (defined on itself or inherited), it is called with no arguments as soon
    as the class object exists.
    """

    def __new__(cls, name, bases, attrs):
        new_class = super(TestMeta, cls).__new__(cls, name, bases, attrs)
        try:
            hook = new_class.__add_extra_tests__
        except AttributeError:
            # No hook anywhere in the hierarchy: nothing else to do
            return new_class
        hook()
        return new_class
@six.add_metaclass(TestMeta)
class TestBase(object):
    """
    Base mixin with helpers and extra assertions for view tests.

    Thanks to the TestMeta metaclass, subclasses may define a
    ``__add_extra_tests__`` classmethod that calls _add_method() to generate
    test methods when the class is created.

    The helpers rely on ``self.fail`` and ``self.persons`` being provided by
    the class this is mixed with.
    """

    @classmethod
    def _add_method(cls, meth, *args, **kw):
        """
        Add a test method, made of the given method called with the given args
        and kwargs.

        The method name and args are used to built the test method name, the
        kwargs are not: make sure you use the args to make the test case
        unique, and the kwargs for things you do not want to appear in the test
        name, like the expected test results for those args.
        """
        name = re.sub(
            r"[^0-9A-Za-z_]", "_",
            "{}_{}".format(meth.__name__.lstrip("_"), "_".join(str(x) for x in args)))

        def test_method(self):
            return meth(self, *args, **kw)

        # Give the generated function its real name, so that it does not show
        # up as an anonymous function in tracebacks
        test_method.__name__ = str(name)
        setattr(cls, name, test_method)

    def make_test_client(self, person, sso_username=None, **kw):
        """
        Instantiate a test client, logging in the given person.

        `person` can be a Person object or its fixture name.

        If person is None, visit anonymously. If person is None but
        sso_username is not None, authenticate as the given sso_username even
        if a Person record does not exist.

        The resolved person (or None) is stored as ``client.visitor``.
        """
        person = self.persons[person]
        if person is not None:
            kw["SSL_CLIENT_S_DN_CN"] = person.username
        elif sso_username is not None:
            kw["SSL_CLIENT_S_DN_CN"] = sso_username
        client = Client(**kw)
        client.visitor = person
        return client

    def assertPermissionDenied(self, response):
        """
        Fail unless the response status code is 403.
        """
        if response.status_code != 403:
            self.fail("response has status code {} instead of a 403 Forbidden".format(response.status_code))

    def assertRedirectMatches(self, response, target):
        """
        Fail unless the response is a 302 whose Location matches the regular
        expression `target`. A false `target` skips the Location check.
        """
        if response.status_code != 302:
            self.fail("response has status code {} instead of a Redirect".format(response.status_code))
        if target and not re.search(target, response["Location"]):
            self.fail("response redirects to {} which does not match {}".format(response["Location"], target))

    def assertFormErrorMatches(self, response, form_name, field_name, regex):
        """
        Fail unless at least one error of field `field_name`, in the form
        found as `form_name` in the response context, matches `regex`.
        """
        form = response.context[form_name]
        errors = form.errors
        if not errors:
            self.fail("Form {} has no errors".format(form_name))
        if field_name not in errors:
            self.fail("Form {} has no errors in field {}".format(form_name, field_name))
        match = re.compile(regex)
        for errmsg in errors[field_name]:
            if match.search(errmsg):
                return
        self.fail("{} did not match any in {}".format(regex, repr(errors)))

    def assertContainsElements(self, response, elements, *names):
        """
        Check that the response contains only the elements in `names` from PageElements `elements`

        Raises RuntimeError if any of `names` is not a key of `elements`.
        """
        want = set(names)
        extras = want - set(elements.keys())
        if extras:
            raise RuntimeError("Wanted elements not found in the list of possible ones: {}".format(", ".join(extras)))

        should_have = []
        should_not_have = []
        content = response.content.decode("utf-8")
        for name, regex in elements.items():
            if name in want:
                if not regex.search(content):
                    should_have.append(name)
            else:
                if regex.search(content):
                    should_not_have.append(name)

        if should_have or should_not_have:
            msg = []
            if should_have:
                msg.append("should have element(s) {}".format(", ".join(should_have)))
            if should_not_have:
                msg.append("should not have element(s) {}".format(", ".join(should_not_have)))
            self.fail("page " + " and ".join(msg))
class BaseFixtureMixin(TestBase):
    """
    Set up the class-wide fixture repositories: persons, ams, fingerprints and
    keys.

    The repositories are created once per test class, reloaded from the
    database before every test, and their objects deleted when the class is
    torn down.
    """

    @classmethod
    def get_persons_defaults(cls):
        """
        Get default arguments for test persons
        """
        return {}

    @classmethod
    def setUpClass(cls):
        super(BaseFixtureMixin, cls).setUpClass()
        cls.persons = TestPersons(**cls.get_persons_defaults())
        cls.ams = NamedObjects(AM)
        cls.fingerprints = NamedObjects(Fingerprint)
        cls.keys = TestKeys()

        # Preload keys
        preloaded = (
            "66B4DFB68CB24EBBD8650BC4F4B4B0CC797EBFAB",
            "1793D6AB75663E6BF104953A634F4BD1E7AD5568",
            "0EED77DC41D760FDE44035FF5556A34E04A3610B",
        )
        for fpr in preloaded:
            cls.keys.create(fpr)

    @classmethod
    def tearDownClass(cls):
        # Persons are deleted last, after everything else
        for repo in ("keys", "ams", "fingerprints", "persons"):
            getattr(cls, repo).delete_all()
        super(BaseFixtureMixin, cls).tearDownClass()

    def setUp(self):
        super(BaseFixtureMixin, self).setUp()
        # Bring the in-memory fixture objects back in sync with the database
        for repo in ("persons", "ams", "fingerprints", "keys"):
            getattr(self, repo).refresh()
class PersonFixtureMixin(BaseFixtureMixin):
    """
    Pre-create some persons
    """

    @classmethod
    def setUpClass(cls):
        super(PersonFixtureMixin, cls).setUpClass()

        # Persons without an AM record, as (fixture name, create() arguments)
        plain_persons = (
            # pending account
            ("pending", dict(status=const.STATUS_DC, expires=now() + datetime.timedelta(days=1),
                             pending="12345", alioth=True)),
            # debian contributor
            ("dc", dict(status=const.STATUS_DC, alioth=True)),
            # debian contributor with guest account
            ("dc_ga", dict(status=const.STATUS_DC_GA, alioth=True)),
            # dm
            ("dm", dict(status=const.STATUS_DM, alioth=True)),
            # dm with guest account
            ("dm_ga", dict(status=const.STATUS_DM_GA, alioth=True)),
            # dd, nonuploading
            ("dd_nu", dict(status=const.STATUS_DD_NU)),
            # dd, uploading
            ("dd_u", dict(status=const.STATUS_DD_U)),
            # dd, emeritus
            ("dd_e", dict(status=const.STATUS_EMERITUS_DD)),
            # dd, removed
            ("dd_r", dict(status=const.STATUS_REMOVED_DD)),
        )
        for name, attrs in plain_persons:
            cls.persons.create(name, **attrs)

        # Persons with an AM record stored under the same fixture name, as
        # (fixture name, person status, extra AM arguments)
        am_persons = (
            # unrelated active am
            ("activeam", const.STATUS_DD_NU, {}),
            # fd
            ("fd", const.STATUS_DD_NU, {"is_fd": True}),
            # dam
            ("dam", const.STATUS_DD_U, {"is_fd": True, "is_dam": True}),
        )
        for name, status, am_flags in am_persons:
            person = cls.persons.create(name, status=status)
            cls.ams.create(name, person=person, **am_flags)
class TestSet(set):
    """
    Set of strings that can be initialized from space-separated strings, and
    changed with simple text patches.
    """

    def __init__(self, initial=""):
        """
        Initialize the set with the whitespace-separated words of `initial`.
        """
        super(TestSet, self).__init__()
        if initial:
            self.update(initial.split())

    def set(self, vals):
        """
        Replace the contents with the whitespace-separated words of `vals`.
        """
        self.clear()
        self.update(vals.split())

    def patch(self, diff):
        """
        Apply a whitespace-separated list of changes: ``+word`` adds word,
        ``-word`` removes it if present.

        Raises RuntimeError on a change that starts with neither + nor -;
        changes listed before the invalid one have already been applied.
        """
        for change in diff.split():
            if change[0] == "+":
                self.add(change[1:])
            elif change[0] == "-":
                self.discard(change[1:])
            else:
                raise RuntimeError("Changes {} contain {} that is neither an add nor a remove".format(repr(diff), repr(change)))

    def clone(self):
        """
        Return a new TestSet with the same elements.
        """
        res = TestSet()
        res.update(self)
        return res
class PatchExact(object):
    """
    Patch that replaces the current value with a fixed set of items.
    """

    def __init__(self, text):
        # A false text (None, "") stands for "no items at all"
        self.items = set(text.split()) if text else set()

    def apply(self, cur):
        """
        Return a fresh copy of the stored items regardless of `cur`, or None
        when there are no items.
        """
        if not self.items:
            return None
        return set(self.items)
class PatchDiff(object):
    """
    Patch that adds elements to, and removes elements from, the current set.
    """

    def __init__(self, text):
        """
        Parse a whitespace-separated list of changes: ``+word`` is recorded in
        self.added, ``-word`` in self.removed.

        Raises RuntimeError on a change that starts with neither + nor -.
        """
        self.added = set()
        self.removed = set()
        for change in text.split():
            if change[0] == "+":
                self.added.add(change[1:])
            elif change[0] == "-":
                self.removed.add(change[1:])
            else:
                raise RuntimeError("Changes {} contain {} that is neither an add nor a remove".format(text, change))

    def apply(self, cur):
        """
        Return the patched version of `cur` as a new set, or None if the
        result is empty. `cur` can be None, and is not modified.
        """
        if cur is None:
            cur = set(self.added)
        else:
            # Removals are applied first, so an element both removed and added
            # ends up in the result
            cur = (cur - self.removed) | self.added
        if not cur:
            return None
        return cur
class ExpectedSets(defaultdict):
    """
    Store the permissions expected out of a *VisitorPermissions object

    Maps visitor names to TestSet instances; looking up a missing visitor
    creates an empty TestSet for it. Failures are reported via
    ``testcase.fail()``, composing the message from `action_msg` (formatted
    with ``visitor``) and `issue_msg` (formatted with ``problem`` and
    ``mismatch``).
    """

    def __init__(self, testcase, action_msg="{visitor}", issue_msg="{problem} {mismatch}"):
        super(ExpectedSets, self).__init__(TestSet)
        self.testcase = testcase
        self.action_msg = action_msg
        self.issue_msg = issue_msg

    @property
    def visitors(self):
        """
        Names of the visitors that have an entry.
        """
        return self.keys()

    def set(self, visitors, text):
        """
        Replace the set of each whitespace-separated visitor in `visitors`
        with the words in `text`.
        """
        for name in visitors.split():
            self[name].set(text)

    def patch(self, visitors, text):
        """
        Apply the +word/-word changes in `text` to the set of each
        whitespace-separated visitor in `visitors`.
        """
        for name in visitors.split():
            self[name].patch(text)

    def select_others(self, persons):
        """
        Return the keys of `persons`, plus None, that have no entry here.
        """
        return (set(persons.keys()) | {None}) - set(self.keys())

    def combine(self, other):
        """
        Return a new ExpectedSets holding, for each visitor, the union of the
        sets in self and in `other`. Neither operand is modified.
        """
        merged = ExpectedSets(self.testcase, action_msg=self.action_msg, issue_msg=self.issue_msg)
        for name, expected in self.items():
            merged[name] = expected.clone()
        for name, expected in other.items():
            merged[name].update(expected)
        return merged

    def assertEqual(self, visitor, got):
        """
        Fail the test case unless `got` has exactly the elements expected for
        `visitor` (none at all, if the visitor has no entry).
        """
        got = set(got)
        wanted = self.get(visitor, set())
        if got == wanted:
            return
        problems = []
        for problem, names in (("misses", wanted - got), ("has extra", got - wanted)):
            if names:
                problems.append(self.issue_msg.format(problem=problem, mismatch=", ".join(sorted(names))))
        self.testcase.fail(self.action_msg.format(visitor=visitor) + " " + " and ".join(problems))

    def assertEmpty(self, visitor, got):
        """
        Fail the test case if `got` contains any element.
        """
        unexpected = set(got)
        if unexpected:
            issue = self.issue_msg.format(problem="has", mismatch=", ".join(sorted(unexpected)))
            self.testcase.fail(self.action_msg.format(visitor=visitor) + " " + issue)

    def assertMatches(self, visited):
        """
        Check ``visited.permissions_of()`` for every known visitor against its
        expected set, and check that it is empty for everyone else in
        ``testcase.persons`` as well as for the None visitor.
        """
        persons = self.testcase.persons
        for visitor in self.visitors:
            self.assertEqual(visitor, visited.permissions_of(persons[visitor]))
        for visitor in self.select_others(persons):
            person = persons[visitor] if visitor else None
            self.assertEmpty(visitor, visited.permissions_of(person))
class ExpectedPerms(object):
    """
    Store the permissions expected out of a *VisitorPermissions object

    self.perms maps each visitor name to the set of permissions expected for
    it; visitors without expected permissions have no entry.
    """

    def __init__(self, perms=None):
        """
        `perms` maps whitespace-separated visitor names to whitespace-separated
        permission names; every visitor listed in a key gets its own set.
        """
        # None rather than {} as default: a mutable default would be shared
        # by all calls
        if perms is None:
            perms = {}
        self.perms = {}
        for visitors, expected_perms in perms.items():
            for visitor in visitors.split():
                self.perms[visitor] = set(expected_perms.split())

    def _apply_diff(self, d, diff):
        """
        Update dict `d` in place with `diff`, which maps whitespace-separated
        visitor names to objects with an ``apply(cur)`` method.

        apply() is given the current value for the visitor (None if missing);
        an empty or None result removes the visitor from `d`.
        """
        for visitors, change in diff.items():
            for visitor in visitors.split():
                cur = change.apply(d.get(visitor, None))
                if not cur:
                    d.pop(visitor, None)
                else:
                    d[visitor] = cur

    def update_perms(self, diff):
        """
        Apply `diff` (see _apply_diff) to the expected permissions.
        """
        self._apply_diff(self.perms, diff)

    def set_perms(self, visitors, text):
        """
        Set the permissions of the given visitors to exactly the words in
        `text`.
        """
        self.update_perms({visitors: PatchExact(text)})

    def patch_perms(self, visitors, text):
        """
        Apply the +word/-word changes in `text` to the permissions of the
        given visitors.
        """
        self.update_perms({visitors: PatchDiff(text)})
class PageElements(dict):
    """
    List of all page elements possibly expected in the results of a view.

    dict matching name used to refer to the element with regexp matching the
    element.
    """

    def add_id(self, id):
        """
        Register, under the name `id`, a regexp matching an ``id=`` attribute
        whose quoted value is exactly `id`.
        """
        escaped = re.escape(id)
        pattern = r"""id\s*=\s*["']{}["']""".format(escaped)
        self[id] = re.compile(pattern)

    def add_class(self, cls):
        """
        Register, under the name `cls`, a regexp matching a ``class=``
        attribute whose quoted value is exactly `cls`.
        """
        escaped = re.escape(cls)
        pattern = r"""class\s*=\s*["']{}["']""".format(escaped)
        self[cls] = re.compile(pattern)

    def add_href(self, name, url):
        """
        Register, under `name`, a regexp matching an ``href=`` attribute whose
        quoted value is exactly `url`.
        """
        escaped = re.escape(url)
        pattern = r"""href\s*=\s*["']{}["']""".format(escaped)
        self[name] = re.compile(pattern)

    def add_string(self, name, term):
        """
        Register, under `name`, a regexp matching the literal text `term`.
        """
        escaped = re.escape(term)
        pattern = r"""{}""".format(escaped)
        self[name] = re.compile(pattern)

    def clone(self):
        """
        Return a new PageElements with the same entries.
        """
        copy = PageElements()
        for name, regex in self.items():
            copy[name] = regex
        return copy
class TestOldProcesses(NamedObjects):
    """
    Repository of bmodels.Process fixtures.

    Unless overridden, ``progress`` defaults to const.PROGRESS_APP_NEW.
    """

    def __init__(self, **defaults):
        # The default must be set before calling the parent constructor: the
        # parent receives a copy of the keyword arguments, so changes made to
        # `defaults` afterwards would never reach it.
        defaults.setdefault("progress", const.PROGRESS_APP_NEW)
        super(TestOldProcesses, self).__init__(bmodels.Process, **defaults)

    def create(self, _name, advocates=(), **kw):
        """
        Create a process, store it under `_name` and return it.

        `advocates` is an iterable of objects to add to the ``advocates``
        relation of the new process.

        ``is_active`` defaults to False for processes whose progress is
        const.PROGRESS_DONE or const.PROGRESS_CANCELLED, and to True otherwise.

        If ``manager`` is given, its ``.am`` is used in its place; an AM record
        is created for it if it has none.
        """
        self._update_kwargs_with_defaults(_name, kw)

        # NOTE(review): this used to look only at the "process" key, while the
        # value is compared with PROGRESS_* constants and the repository
        # default is called "progress"; "progress" is assumed to be the
        # intended key. "process" is still honoured as a fallback.
        progress = kw.get("progress", kw.get("process"))
        kw.setdefault("is_active", progress not in (const.PROGRESS_DONE, const.PROGRESS_CANCELLED))

        if "manager" in kw:
            try:
                am = kw["manager"].am
            except bmodels.AM.DoesNotExist:
                am = bmodels.AM.objects.create(person=kw["manager"])
            kw["manager"] = am

        self[_name] = o = self._model.objects.create(**kw)
        for a in advocates:
            o.advocates.add(a)
        return o
class OldProcessFixtureMixin(PersonFixtureMixin):
    """
    Add a ``processes`` fixture repository on top of the pre-created persons.

    The repository starts empty for each test class; it is reloaded from the
    database before every test and its objects are deleted on class teardown.
    """

    @classmethod
    def get_processes_defaults(cls):
        """
        Get default arguments for test processes
        """
        return {}

    @classmethod
    def setUpClass(cls):
        super(OldProcessFixtureMixin, cls).setUpClass()
        process_defaults = cls.get_processes_defaults()
        cls.processes = TestOldProcesses(**process_defaults)

    @classmethod
    def tearDownClass(cls):
        # Processes go away before the parent class removes the persons
        cls.processes.delete_all()
        super(OldProcessFixtureMixin, cls).tearDownClass()

    def setUp(self):
        super(OldProcessFixtureMixin, self).setUp()
        # Bring the in-memory process objects back in sync with the database
        self.processes.refresh()
|