123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545 |
- # -*- coding: utf-8; -*-
- #
- # test/helper.py
- # Part of ‘dput’, a Debian package upload toolkit.
- #
- # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
- #
- # This is free software: you may copy, modify, and/or distribute this work
- # under the terms of the GNU General Public License as published by the
- # Free Software Foundation; version 3 of that license or any later version.
- # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
- """ Helper functionality for Dput test suite. """
- from __future__ import (absolute_import, unicode_literals)
- import sys
- if sys.version_info >= (3, 3):
- import builtins
- import unittest
- import unittest.mock as mock
- from io import StringIO as StringIO
- import configparser
- import collections.abc as collections_abc
- elif sys.version_info >= (3, 0):
- raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
- elif sys.version_info >= (2, 7):
- # Python 2 standard library.
- import __builtin__ as builtins
- # Third-party backport of Python 3 unittest improvements.
- import unittest2 as unittest
- # Third-party mock library.
- import mock
- # Python 2 standard library.
- from StringIO import StringIO as BaseStringIO
- import ConfigParser as configparser
- import collections as collections_abc
- else:
- raise RuntimeError("Python earlier than 2.7 is not supported.")
- import os
- import os.path
- import shutil
- import tempfile
- import pwd
- import errno
- import time
- import signal
- import subprocess
- import functools
- import itertools
- import base64
- import collections
- import weakref
- import shlex
- __package__ = str("test")
- __import__(__package__)
- __metaclass__ = type
- def make_unique_slug(testcase):
- """ Make a unique slug for the test case. """
- text = base64.b64encode(
- testcase.getUniqueString().encode('utf-8')
- ).decode('utf-8')
- result = text[-30:]
- return result
- try:
- StringIO
- except NameError:
- # We don't yet have the StringIO we want. Create it.
- class StringIO(BaseStringIO, object):
- """ StringIO with a context manager. """
- def __enter__(self):
- return self
- def __exit__(self, *args):
- self.close()
- return False
- def patch_stdout(testcase):
- """ Patch `sys.stdout` for the specified test case. """
- patcher = mock.patch.object(
- sys, 'stdout', wraps=StringIO())
- patcher.start()
- testcase.addCleanup(patcher.stop)
- def patch_stderr(testcase):
- """ Patch `sys.stderr` for the specified test case. """
- patcher = mock.patch.object(
- sys, 'stderr', wraps=StringIO())
- patcher.start()
- testcase.addCleanup(patcher.stop)
- def patch_signal_signal(testcase):
- """ Patch `signal.signal` for the specified test case. """
- func_patcher = mock.patch.object(signal, 'signal')
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- class FakeSystemExit(Exception):
- """ Fake double for `SystemExit` exception. """
- EXIT_STATUS_SUCCESS = 0
- EXIT_STATUS_FAILURE = 1
- EXIT_STATUS_COMMAND_NOT_FOUND = 127
- def patch_sys_exit(testcase):
- """ Patch `sys.exit` for the specified test case. """
- func_patcher = mock.patch.object(
- sys, 'exit',
- side_effect=FakeSystemExit())
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_sys_argv(testcase):
- """ Patch the `sys.argv` sequence for the test case. """
- if not hasattr(testcase, 'progname'):
- testcase.progname = make_unique_slug(testcase)
- if not hasattr(testcase, 'sys_argv'):
- testcase.sys_argv = [testcase.progname]
- patcher = mock.patch.object(sys, "argv", new=list(testcase.sys_argv))
- patcher.start()
- testcase.addCleanup(patcher.stop)
- def patch_system_interfaces(testcase):
- """ Patch system interfaces that are disruptive to the test runner. """
- patch_stdout(testcase)
- patch_stderr(testcase)
- patch_sys_exit(testcase)
- patch_sys_argv(testcase)
- def patch_time_time(testcase, values=None):
- """ Patch the `time.time` function for the specified test case.
- :param testcase: The `TestCase` instance for binding to the patch.
- :param values: An iterable to provide return values.
- :return: None.
- """
- if values is None:
- values = itertools.count()
- def generator_fake_time():
- while True:
- yield next(values)
- func_patcher = mock.patch.object(time, "time")
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- time.time.side_effect = generator_fake_time()
- def patch_os_environ(testcase):
- """ Patch the `os.environ` mapping. """
- if not hasattr(testcase, 'os_environ'):
- testcase.os_environ = {}
- patcher = mock.patch.object(os, "environ", new=testcase.os_environ)
- patcher.start()
- testcase.addCleanup(patcher.stop)
- def patch_os_getpid(testcase):
- """ Patch `os.getpid` for the specified test case. """
- func_patcher = mock.patch.object(os, 'getpid')
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_getuid(testcase):
- """ Patch the `os.getuid` function. """
- if not hasattr(testcase, 'os_getuid_return_value'):
- testcase.os_getuid_return_value = testcase.getUniqueInteger()
- func_patcher = mock.patch.object(
- os, "getuid", return_value=testcase.os_getuid_return_value)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- PasswdEntry = collections.namedtuple(
- "PasswdEntry",
- "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
- def patch_pwd_getpwuid(testcase):
- """ Patch the `pwd.getpwuid` function. """
- if not hasattr(testcase, 'pwd_getpwuid_return_value'):
- testcase.pwd_getpwuid_return_value = PasswdEntry(
- pw_name=make_unique_slug(testcase),
- pw_passwd=make_unique_slug(testcase),
- pw_uid=testcase.getUniqueInteger(),
- pw_gid=testcase.getUniqueInteger(),
- pw_gecos=testcase.getUniqueString(),
- pw_dir=tempfile.mktemp(),
- pw_shell=tempfile.mktemp())
- if not isinstance(testcase.pwd_getpwuid_return_value, pwd.struct_passwd):
- pwent = pwd.struct_passwd(testcase.pwd_getpwuid_return_value)
- else:
- pwent = testcase.pwd_getpwuid_return_value
- func_patcher = mock.patch.object(pwd, "getpwuid", return_value=pwent)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_path_exists(testcase):
- """ Patch `os.path.exists` behaviour for this test case.
- When the patched function is called, the registry of
- `FileDouble` instances for this test case will be used to get
- the instance for the path specified.
- """
- orig_os_path_exists = os.path.exists
- def fake_os_path_exists(path):
- registry = FileDouble.get_registry_for_testcase(testcase)
- if path in registry:
- file_double = registry[path]
- result = file_double.os_path_exists_scenario.call_hook()
- else:
- result = orig_os_path_exists(path)
- return result
- func_patcher = mock.patch.object(
- os.path, 'exists', side_effect=fake_os_path_exists)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_access(testcase):
- """ Patch `os.access` behaviour for this test case.
- When the patched function is called, the registry of
- `FileDouble` instances for this test case will be used to get
- the instance for the path specified.
- """
- orig_os_access = os.access
- def fake_os_access(path, mode):
- registry = FileDouble.get_registry_for_testcase(testcase)
- if path in registry:
- file_double = registry[path]
- result = file_double.os_access_scenario.call_hook(mode)
- else:
- result = orig_os_access(path, mode)
- return result
- func_patcher = mock.patch.object(
- os, 'access', side_effect=fake_os_access)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- StatResult = collections.namedtuple(
- 'StatResult', [
- 'st_mode',
- 'st_ino', 'st_dev', 'st_nlink',
- 'st_uid', 'st_gid',
- 'st_size',
- 'st_atime', 'st_mtime', 'st_ctime',
- ])
- def patch_os_stat(testcase):
- """ Patch `os.stat` behaviour for this test case.
- When the patched function is called, the registry of
- `FileDouble` instances for this test case will be used to get
- the instance for the path specified.
- """
- orig_os_stat = os.stat
- def fake_os_stat(path):
- registry = FileDouble.get_registry_for_testcase(testcase)
- if path in registry:
- file_double = registry[path]
- result = file_double.os_stat_scenario.call_hook()
- else:
- result = orig_os_stat(path)
- return result
- func_patcher = mock.patch.object(
- os, 'stat', side_effect=fake_os_stat)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_lstat(testcase):
- """ Patch `os.lstat` behaviour for this test case.
- When the patched function is called, the registry of
- `FileDouble` instances for this test case will be used to get
- the instance for the path specified.
- """
- orig_os_lstat = os.lstat
- def fake_os_lstat(path):
- registry = FileDouble.get_registry_for_testcase(testcase)
- if path in registry:
- file_double = registry[path]
- result = file_double.os_lstat_scenario.call_hook()
- else:
- result = orig_os_lstat(path)
- return result
- func_patcher = mock.patch.object(
- os, 'lstat', side_effect=fake_os_lstat)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_unlink(testcase):
- """ Patch `os.unlink` behaviour for this test case.
- When the patched function is called, the registry of
- `FileDouble` instances for this test case will be used to get
- the instance for the path specified.
- """
- orig_os_unlink = os.unlink
- def fake_os_unlink(path):
- registry = FileDouble.get_registry_for_testcase(testcase)
- if path in registry:
- file_double = registry[path]
- result = file_double.os_unlink_scenario.call_hook()
- else:
- result = orig_os_unlink(path)
- return result
- func_patcher = mock.patch.object(
- os, 'unlink', side_effect=fake_os_unlink)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_rmdir(testcase):
- """ Patch `os.rmdir` behaviour for this test case.
- When the patched function is called, the registry of
- `FileDouble` instances for this test case will be used to get
- the instance for the path specified.
- """
- orig_os_rmdir = os.rmdir
- def fake_os_rmdir(path):
- registry = FileDouble.get_registry_for_testcase(testcase)
- if path in registry:
- file_double = registry[path]
- result = file_double.os_rmdir_scenario.call_hook()
- else:
- result = orig_os_rmdir(path)
- return result
- func_patcher = mock.patch.object(
- os, 'rmdir', side_effect=fake_os_rmdir)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_shutil_rmtree(testcase):
- """ Patch `shutil.rmtree` behaviour for this test case.
- When the patched function is called, the registry of
- `FileDouble` instances for this test case will be used to get
- the instance for the path specified.
- """
- orig_shutil_rmtree = os.rmdir
- def fake_shutil_rmtree(path, ignore_errors=False, onerror=None):
- registry = FileDouble.get_registry_for_testcase(testcase)
- if path in registry:
- file_double = registry[path]
- result = file_double.shutil_rmtree_scenario.call_hook()
- else:
- result = orig_shutil_rmtree(path)
- return result
- func_patcher = mock.patch.object(
- shutil, 'rmtree', side_effect=fake_shutil_rmtree)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_tempfile_mkdtemp(testcase):
- """ Patch the `tempfile.mkdtemp` function for this test case. """
- if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
- testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())
- double = testcase.tempfile_mkdtemp_file_double
- double.set_os_unlink_scenario('okay')
- double.set_os_rmdir_scenario('okay')
- double.register_for_testcase(testcase)
- func_patcher = mock.patch.object(tempfile, "mkdtemp")
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- tempfile.mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path
- try:
- FileNotFoundError
- FileExistsError
- PermissionError
- except NameError:
- # Python 2 uses IOError.
- def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
- result_kwargs = init_kwargs
- result_errno = errno_value
- result_strerror = os.strerror(errno_value)
- result_filename = None
- if len(init_args) >= 3:
- result_errno = init_args[0]
- result_filename = init_args[2]
- if 'errno' in init_kwargs:
- result_errno = init_kwargs['errno']
- del result_kwargs['errno']
- if 'filename' in init_kwargs:
- result_filename = init_kwargs['filename']
- del result_kwargs['filename']
- if len(init_args) >= 2:
- result_strerror = init_args[1]
- if 'strerror' in init_kwargs:
- result_strerror = init_kwargs['strerror']
- del result_kwargs['strerror']
- result_args = (result_errno, result_strerror, result_filename)
- return (result_args, result_kwargs)
- class FileNotFoundError(IOError):
- def __init__(self, *args, **kwargs):
- (args, kwargs) = _ensure_ioerror_args(
- args, kwargs, errno_value=errno.ENOENT)
- super(FileNotFoundError, self).__init__(*args, **kwargs)
- class FileExistsError(IOError):
- def __init__(self, *args, **kwargs):
- (args, kwargs) = _ensure_ioerror_args(
- args, kwargs, errno_value=errno.EEXIST)
- super(FileExistsError, self).__init__(*args, **kwargs)
- class PermissionError(IOError):
- def __init__(self, *args, **kwargs):
- (args, kwargs) = _ensure_ioerror_args(
- args, kwargs, errno_value=errno.EPERM)
- super(PermissionError, self).__init__(*args, **kwargs)
- def make_fake_file_scenarios(path=None):
- """ Make a collection of scenarios for testing with fake files.
- :path: The filesystem path of the fake file. If not specified,
- a valid random path will be generated.
- :return: A collection of scenarios for tests involving input files.
- The collection is a mapping from scenario name to a dictionary of
- scenario attributes.
- """
- if path is None:
- file_path = tempfile.mktemp()
- else:
- file_path = path
- fake_file_empty = StringIO()
- fake_file_minimal = StringIO("Lorem ipsum.")
- fake_file_large = StringIO("\n".join(
- "ABCDEFGH" * 100
- for __ in range(1000)))
- default_scenario_params = {
- 'open_scenario_name': 'okay',
- 'file_double_params': dict(
- path=file_path, fake_file=fake_file_minimal),
- }
- scenarios = {
- 'default': {},
- 'error-not-exist': {
- 'open_scenario_name': 'nonexist',
- },
- 'error-exist': {
- 'open_scenario_name': 'exist_error',
- },
- 'error-read-denied': {
- 'open_scenario_name': 'read_denied',
- },
- 'not-found': {
- 'file_double_params': dict(
- path=file_path, fake_file=fake_file_empty),
- },
- 'exist-empty': {
- 'file_double_params': dict(
- path=file_path, fake_file=fake_file_empty),
- },
- 'exist-minimal': {
- 'file_double_params': dict(
- path=file_path, fake_file=fake_file_minimal),
- },
- 'exist-large': {
- 'file_double_params': dict(
- path=file_path, fake_file=fake_file_large),
- },
- }
- for (name, scenario) in scenarios.items():
- params = default_scenario_params.copy()
- params.update(scenario)
- scenario.update(params)
- scenario['file_double'] = FileDouble(**scenario['file_double_params'])
- scenario['file_double'].set_open_scenario(params['open_scenario_name'])
- scenario['fake_file_scenario_name'] = name
- return scenarios
- def get_file_doubles_from_fake_file_scenarios(scenarios):
- """ Get the `FileDouble` instances from fake file scenarios.
- :param scenarios: Collection of fake file scenarios.
- :return: Collection of `FileDouble` instances.
- """
- doubles = set(
- scenario['file_double']
- for scenario in scenarios
- if scenario['file_double'] is not None)
- return doubles
- def setup_file_double_behaviour(testcase, doubles=None):
- """ Set up file double instances and behaviour.
- :param testcase: The `TestCase` instance to modify.
- :param doubles: Collection of `FileDouble` instances.
- :return: None.
- If `doubles` is ``None``, a default collection will be made
- from the result of `make_fake_file_scenarios` result.
- """
- if doubles is None:
- scenarios = make_fake_file_scenarios()
- doubles = get_file_doubles_from_fake_file_scenarios(
- scenarios.values())
- for file_double in doubles:
- file_double.register_for_testcase(testcase)
- orig_open = builtins.open
- def fake_open(path, mode='rt', buffering=-1):
- registry = FileDouble.get_registry_for_testcase(testcase)
- if path in registry:
- file_double = registry[path]
- result = file_double.builtins_open_scenario.call_hook(
- mode, buffering)
- else:
- result = orig_open(path, mode, buffering)
- return result
- mock_open = mock.mock_open()
- mock_open.side_effect = fake_open
- func_patcher = mock.patch.object(
- builtins, "open",
- new=mock_open)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def setup_fake_file_fixtures(testcase):
- """ Set up fixtures for fake file doubles.
- :param testcase: The `TestCase` instance to modify.
- :return: None.
- """
- scenarios = make_fake_file_scenarios()
- testcase.fake_file_scenarios = scenarios
- file_doubles = get_file_doubles_from_fake_file_scenarios(
- scenarios.values())
- setup_file_double_behaviour(testcase, file_doubles)
- def set_fake_file_scenario(testcase, name):
- """ Set the named fake file scenario for the test case. """
- scenario = testcase.fake_file_scenarios[name]
- testcase.fake_file_scenario = scenario
- testcase.file_double = scenario['file_double']
- testcase.file_double.register_for_testcase(testcase)
- class TestDoubleFunctionScenario:
- """ Scenario for fake behaviour of a specific function. """
- def __init__(self, scenario_name, double):
- self.scenario_name = scenario_name
- self.double = double
- self.call_hook = getattr(
- self, "_hook_{name}".format(name=self.scenario_name))
- def __repr__(self):
- text = (
- "<{class_name} instance: {id}"
- " name: {name!r},"
- " call_hook name: {hook_name!r}"
- " double: {double!r}"
- ">").format(
- class_name=self.__class__.__name__, id=id(self),
- name=self.scenario_name, double=self.double,
- hook_name=self.call_hook.__name__)
- return text
- def __eq__(self, other):
- result = True
- if not self.scenario_name == other.scenario_name:
- result = False
- if not self.double == other.double:
- result = False
- if not self.call_hook.__name__ == other.call_hook.__name__:
- result = False
- return result
- def __ne__(self, other):
- result = not self.__eq__(other)
- return result
- class os_path_exists_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.path.exists` behaviour. """
- def _hook_exist(self):
- return True
- def _hook_not_exist(self):
- return False
- class os_access_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.access` behaviour. """
- def _hook_okay(self, mode):
- return True
- def _hook_not_exist(self, mode):
- return False
- def _hook_read_only(self, mode):
- if mode & (os.W_OK | os.X_OK):
- result = False
- else:
- result = True
- return result
- def _hook_denied(self, mode):
- if mode & (os.R_OK | os.W_OK | os.X_OK):
- result = False
- else:
- result = True
- return result
- class os_stat_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.stat` behaviour. """
- def _hook_okay(self):
- return self.double.stat_result
- def _hook_notfound_error(self):
- raise FileNotFoundError(
- self.double.path,
- "No such file or directory: {path!r}".format(
- path=self.double.path))
- def _hook_denied_error(self):
- raise PermissionError(
- self.double.path,
- "Permission denied")
- class os_lstat_scenario(os_stat_scenario):
- """ Scenario for `os.lstat` behaviour. """
- class os_unlink_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.unlink` behaviour. """
- def _hook_okay(self):
- return None
- def _hook_nonexist(self):
- error = FileNotFoundError(
- self.double.path,
- "No such file or directory: {path!r}".format(
- path=self.double.path))
- raise error
- def _hook_denied(self):
- error = PermissionError(
- self.double.path,
- "Permission denied")
- raise error
- class os_rmdir_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.rmdir` behaviour. """
- def _hook_okay(self):
- return None
- def _hook_nonexist(self):
- error = FileNotFoundError(
- self.double.path,
- "No such file or directory: {path!r}".format(
- path=self.double.path))
- raise error
- def _hook_denied(self):
- error = PermissionError(
- self.double.path,
- "Permission denied")
- raise error
- class shutil_rmtree_scenario(TestDoubleFunctionScenario):
- """ Scenario for `shutil.rmtree` behaviour. """
- def _hook_okay(self):
- return None
- def _hook_nonexist(self):
- error = FileNotFoundError(
- self.double.path,
- "No such file or directory: {path!r}".format(
- path=self.double.path))
- raise error
- def _hook_denied(self):
- error = PermissionError(
- self.double.path,
- "Permission denied")
- raise error
- class builtins_open_scenario(TestDoubleFunctionScenario):
- """ Scenario for `builtins.open` behaviour. """
- def _hook_okay(self, mode, buffering):
- result = self.double.fake_file
- return result
- def _hook_nonexist(self, mode, buffering):
- if mode.startswith('r'):
- error = FileNotFoundError(
- self.double.path,
- "No such file or directory: {path!r}".format(
- path=self.double.path))
- raise error
- result = self.double.fake_file
- return result
- def _hook_exist_error(self, mode, buffering):
- if mode.startswith('w') or mode.startswith('a'):
- error = FileExistsError(
- self.double.path,
- "File already exists: {path!r}".format(
- path=self.double.path))
- raise error
- result = self.double.fake_file
- return result
- def _hook_read_denied(self, mode, buffering):
- if mode.startswith('r'):
- error = PermissionError(
- self.double.path,
- "Read denied on {path!r}".format(
- path=self.double.path))
- raise error
- result = self.double.fake_file
- return result
- def _hook_write_denied(self, mode, buffering):
- if mode.startswith('w') or mode.startswith('a'):
- error = PermissionError(
- self.double.path,
- "Write denied on {path!r}".format(
- path=self.double.path))
- raise error
- result = self.double.fake_file
- return result
- class TestDoubleWithRegistry:
- """ Abstract base class for a test double with a test case registry. """
- registry_class = NotImplemented
- registries = NotImplemented
- function_scenario_params_by_class = NotImplemented
- def __new__(cls, *args, **kwargs):
- superclass = super(TestDoubleWithRegistry, cls)
- if superclass.__new__ is object.__new__:
- # The ‘object’ implementation complains about extra arguments.
- instance = superclass.__new__(cls)
- else:
- instance = superclass.__new__(cls, *args, **kwargs)
- instance.make_set_scenario_methods()
- return instance
- def __init__(self, *args, **kwargs):
- super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
- self._set_method_per_scenario()
- def _make_set_scenario_method(self, scenario_class, params):
- def method(self, name):
- scenario = scenario_class(name, double=self)
- setattr(self, scenario_class.__name__, scenario)
- method.__doc__ = (
- """ Set the scenario for `{name}` behaviour. """
- ).format(name=scenario_class.__name__)
- method.__name__ = str(params['set_scenario_method_name'])
- return method
- def make_set_scenario_methods(self):
- """ Make `set_<scenario_class_name>` methods on this class. """
- for (function_scenario_class, function_scenario_params) in (
- self.function_scenario_params_by_class.items()):
- method = self._make_set_scenario_method(
- function_scenario_class, function_scenario_params)
- setattr(self.__class__, method.__name__, method)
- function_scenario_params['set_scenario_method'] = method
- def _set_method_per_scenario(self):
- """ Set the method to be called for each scenario. """
- for function_scenario_params in (
- self.function_scenario_params_by_class.values()):
- function_scenario_params['set_scenario_method'](
- self, function_scenario_params['default_scenario_name'])
- @classmethod
- def get_registry_for_testcase(cls, testcase):
- """ Get the FileDouble registry for the specified test case. """
- # Key in a dict must be hashable.
- key = (testcase.__class__, id(testcase))
- registry = cls.registries.setdefault(key, cls.registry_class())
- return registry
- def get_registry_key(self):
- """ Get the registry key for this double. """
- raise NotImplementedError
- def register_for_testcase(self, testcase):
- """ Add this instance to registry for the specified testcase. """
- registry = self.get_registry_for_testcase(testcase)
- key = self.get_registry_key()
- registry[key] = self
- unregister_func = functools.partial(
- self.unregister_for_testcase, testcase)
- testcase.addCleanup(unregister_func)
- def unregister_for_testcase(self, testcase):
- """ Remove this instance from registry for the specified testcase. """
- registry = self.get_registry_for_testcase(testcase)
- key = self.get_registry_key()
- if key in registry:
- registry.pop(key)
- def copy_fake_file(fake_file):
- """ Make a copy of the StringIO instance. """
- fake_file_type = StringIO
- content = ""
- if fake_file is not None:
- fake_file_type = type(fake_file)
- content = fake_file.getvalue()
- assert issubclass(fake_file_type, object)
- result = fake_file_type(content)
- return result
- class FileDouble(TestDoubleWithRegistry):
- """ A testing double for a file. """
- registry_class = dict
- registries = {}
- function_scenario_params_by_class = {
- os_path_exists_scenario: {
- 'default_scenario_name': 'not_exist',
- 'set_scenario_method_name': 'set_os_path_exists_scenario',
- },
- os_access_scenario: {
- 'default_scenario_name': 'okay',
- 'set_scenario_method_name': 'set_os_access_scenario',
- },
- os_stat_scenario: {
- 'default_scenario_name': 'okay',
- 'set_scenario_method_name': 'set_os_stat_scenario',
- },
- os_lstat_scenario: {
- 'default_scenario_name': 'okay',
- 'set_scenario_method_name': 'set_os_lstat_scenario',
- },
- builtins_open_scenario: {
- 'default_scenario_name': 'okay',
- 'set_scenario_method_name': 'set_open_scenario',
- },
- os_unlink_scenario: {
- 'default_scenario_name': 'okay',
- 'set_scenario_method_name': 'set_os_unlink_scenario',
- },
- os_rmdir_scenario: {
- 'default_scenario_name': 'okay',
- 'set_scenario_method_name': 'set_os_rmdir_scenario',
- },
- shutil_rmtree_scenario: {
- 'default_scenario_name': 'okay',
- 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
- },
- }
- def __init__(self, path=None, fake_file=None, *args, **kwargs):
- self.path = path
- self.fake_file = copy_fake_file(fake_file)
- self.fake_file.name = path
- self._set_stat_result()
- super(FileDouble, self).__init__(*args, **kwargs)
- def _set_stat_result(self):
- """ Set the `os.stat` result for this file. """
- size = len(self.fake_file.getvalue())
- self.stat_result = StatResult(
- st_mode=0,
- st_ino=None, st_dev=None, st_nlink=None,
- st_uid=0, st_gid=0,
- st_size=size,
- st_atime=None, st_mtime=None, st_ctime=None,
- )
- def __repr__(self):
- text = "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
- path=self.path, fake_file=self.fake_file)
- return text
- def get_registry_key(self):
- """ Get the registry key for this double. """
- result = self.path
- return result
- class os_popen_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.popen` behaviour. """
- stream_name_by_mode = {
- 'w': 'stdin',
- 'r': 'stdout',
- }
- def _hook_success(self, cmd, mode, buffering):
- stream_name = self.stream_name_by_mode[mode]
- stream_double = getattr(
- self.double, stream_name + '_double')
- result = stream_double.fake_file
- return result
- def _hook_failure(self, cmd, mode, buffering):
- result = StringIO()
- return result
- def _hook_not_found(self, cmd, mode, buffering):
- result = StringIO()
- return result
- class os_waitpid_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.waitpid` behaviour. """
- def _hook_success(self, pid, options):
- result = (pid, EXIT_STATUS_SUCCESS)
- return result
- def _hook_failure(self, pid, options):
- result = (pid, EXIT_STATUS_FAILURE)
- return result
- def _hook_not_found(self, pid, options):
- error = OSError(errno.ECHILD)
- raise error
- class os_system_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.system` behaviour. """
- def _hook_success(self, command):
- result = EXIT_STATUS_SUCCESS
- return result
- def _hook_failure(self, command):
- result = EXIT_STATUS_FAILURE
- return result
- def _hook_not_found(self, command):
- result = EXIT_STATUS_COMMAND_NOT_FOUND
- return result
- class os_spawnv_scenario(TestDoubleFunctionScenario):
- """ Scenario for `os.spawnv` behaviour. """
- def _hook_success(self, mode, file, args):
- result = EXIT_STATUS_SUCCESS
- return result
- def _hook_failure(self, mode, file, args):
- result = EXIT_STATUS_FAILURE
- return result
- def _hook_not_found(self, mode, file, args):
- result = EXIT_STATUS_COMMAND_NOT_FOUND
- return result
- ARG_ANY = object()
- ARG_MORE = object()
- class PopenDouble:
- """ A testing double for `subprocess.Popen`. """
- def __init__(self, args, *posargs, **kwargs):
- self.stdin = None
- self.stdout = None
- self.stderr = None
- self.pid = None
- self.returncode = None
- def set_streams(self, subprocess_double, popen_kwargs):
- """ Set the streams on the `PopenDouble`.
- :param subprocess_double: The `SubprocessDouble` from
- which to get existing stream doubles.
- :param popen_kwargs: The keyword arguments to the
- `subprocess.Popen` call.
- :return: ``None``.
- """
- for stream_name in (
- name for name in ['stdin', 'stdout', 'stderr']
- if name in popen_kwargs):
- stream_spec = popen_kwargs[stream_name]
- if stream_spec is subprocess.PIPE:
- stream_double = getattr(
- subprocess_double,
- "{name}_double".format(name=stream_name))
- stream_file = stream_double.fake_file
- elif stream_spec is subprocess.STDOUT:
- stream_file = subprocess_double.stdout_double.fake_file
- else:
- stream_file = stream_spec
- setattr(self, stream_name, stream_file)
- class subprocess_popen_scenario(TestDoubleFunctionScenario):
- """ Scenario for `subprocess.Popen` behaviour. """
- def _hook_success(self, testcase, args, *posargs, **kwargs):
- double = self.double.popen_double
- double.set_streams(self.double, kwargs)
- return double
- def patch_subprocess_popen(testcase):
- """ Patch `subprocess.Popen` constructor for this test case.
- :param testcase: The `TestCase` instance to modify.
- :return: None.
- When the patched function is called, the registry of
- `SubprocessDouble` instances for this test case will be used
- to get the instance for the program path specified.
- """
- orig_subprocess_popen = subprocess.Popen
- def fake_subprocess_popen(args, *posargs, **kwargs):
- if kwargs.get('shell', False):
- argv = shlex.split(args)
- else:
- argv = args
- registry = SubprocessDouble.get_registry_for_testcase(testcase)
- if argv in registry:
- subprocess_double = registry[argv]
- result = subprocess_double.subprocess_popen_scenario.call_hook(
- testcase, args, *posargs, **kwargs)
- else:
- result = orig_subprocess_popen(args, *posargs, **kwargs)
- return result
- func_patcher = mock.patch.object(
- subprocess, "Popen", side_effect=fake_subprocess_popen)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- class SubprocessDoubleRegistry(collections_abc.MutableMapping):
- """ Registry of `SubprocessDouble` instances by `argv`. """
- def __init__(self, *args, **kwargs):
- items = []
- if args:
- if isinstance(args[0], collections_abc.Mapping):
- items = args[0].items()
- if isinstance(args[0], collections_abc.Iterable):
- items = args[0]
- self._mapping = dict(items)
- def __repr__(self):
- text = "<{class_name} object: {mapping}>".format(
- class_name=self.__class__.__name__, mapping=self._mapping)
- return text
- def _match_argv(self, argv):
- """ Match the specified `argv` with our registered keys. """
- match = None
- if not isinstance(argv, collections_abc.Sequence):
- return match
- candidates = iter(self._mapping)
- while match is None:
- try:
- candidate = next(candidates)
- except StopIteration:
- break
- found = None
- if candidate == argv:
- # An exact match.
- found = True
- word_iter = enumerate(candidate)
- while found is None:
- try:
- (word_index, candidate_word) = next(word_iter)
- except StopIteration:
- break
- if candidate_word is ARG_MORE:
- # Candiate matches any remaining words. We have a match.
- found = True
- elif word_index > len(argv):
- # Candidate is too long for the specified argv.
- found = False
- elif candidate_word is ARG_ANY:
- # Candidate matches any word at this position.
- continue
- elif candidate_word == argv[word_index]:
- # Candidate matches the word at this position.
- continue
- else:
- # This candidate does not match.
- found = False
- if found is None:
- # Reached the end of the candidate without a mismatch.
- found = True
- if found:
- match = candidate
- return match
- def __getitem__(self, key):
- match = self._match_argv(key)
- if match is None:
- raise KeyError(key)
- result = self._mapping[match]
- return result
- def __setitem__(self, key, value):
- if key in self:
- del self[key]
- self._mapping[key] = value
- def __delitem__(self, key):
- match = self._match_argv(key)
- if match is not None:
- del self._mapping[match]
- def __iter__(self):
- return self._mapping.__iter__()
- def __len__(self):
- return self._mapping.__len__()
- class SubprocessDouble(TestDoubleWithRegistry):
- """ A testing double for a subprocess. """
- registry_class = SubprocessDoubleRegistry
- registries = {}
- double_by_pid = weakref.WeakValueDictionary()
- function_scenario_params_by_class = {
- subprocess_popen_scenario: {
- 'default_scenario_name': 'success',
- 'set_scenario_method_name': 'set_subprocess_popen_scenario',
- },
- os_popen_scenario: {
- 'default_scenario_name': 'success',
- 'set_scenario_method_name': 'set_os_popen_scenario',
- },
- os_waitpid_scenario: {
- 'default_scenario_name': 'success',
- 'set_scenario_method_name': 'set_os_waitpid_scenario',
- },
- os_system_scenario: {
- 'default_scenario_name': 'success',
- 'set_scenario_method_name': 'set_os_system_scenario',
- },
- os_spawnv_scenario: {
- 'default_scenario_name': 'success',
- 'set_scenario_method_name': 'set_os_spawnv_scenario',
- },
- }
- def __init__(self, path=None, argv=None, *args, **kwargs):
- if path is None:
- path = tempfile.mktemp()
- self.path = path
- if argv is None:
- command_name = os.path.basename(path)
- argv = [command_name]
- self.argv = argv
- self.pid = self._make_pid()
- self._register_by_pid()
- self.set_popen_double()
- self.stdin_double = FileDouble()
- self.stdout_double = FileDouble()
- self.stderr_double = FileDouble()
- super(SubprocessDouble, self).__init__(*args, **kwargs)
- def set_popen_double(self):
- """ Set the `PopenDouble` for this instance. """
- double = PopenDouble(self.argv)
- double.pid = self.pid
- self.popen_double = double
- def __repr__(self):
- text = (
- "<SubprocessDouble instance: {id}"
- " path: {path!r},"
- " argv: {argv!r}"
- " stdin_double: {stdin_double!r}"
- " stdout_double: {stdout_double!r}"
- " stderr_double: {stderr_double!r}"
- ">").format(
- id=id(self),
- path=self.path, argv=self.argv,
- stdin_double=self.stdin_double,
- stdout_double=self.stdout_double,
- stderr_double=self.stderr_double)
- return text
- @classmethod
- def _make_pid(cls):
- """ Make a unique PID for a subprocess. """
- for pid in itertools.count(1):
- yield pid
- def _register_by_pid(self):
- """ Register this subprocess by its PID. """
- self.__class__.double_by_pid[self.pid] = self
- def get_registry_key(self):
- """ Get the registry key for this double. """
- result = tuple(self.argv)
- return result
- def set_stdin_content(self, text):
- """ Set the content of the `stdin` stream for this double. """
- self.stdin_double.fake_file = StringIO(text)
- def set_stdout_content(self, text):
- """ Set the content of the `stdout` stream for this double. """
- self.stdout_double.fake_file = StringIO(text)
- def set_stderr_content(self, text):
- """ Set the content of the `stderr` stream for this double. """
- self.stderr_double.fake_file = StringIO(text)
- def make_fake_subprocess_scenarios(path=None):
- """ Make a collection of scenarios for testing with fake files.
- :path: The filesystem path of the fake program. If not specified,
- a valid random path will be generated.
- :return: A collection of scenarios for tests involving subprocesses.
- The collection is a mapping from scenario name to a dictionary of
- scenario attributes.
- """
- if path is None:
- file_path = tempfile.mktemp()
- else:
- file_path = path
- default_scenario_params = {
- 'return_value': EXIT_STATUS_SUCCESS,
- 'program_path': file_path,
- 'argv_after_command_name': [],
- }
- scenarios = {
- 'default': {},
- 'not-found': {
- 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
- },
- }
- for (name, scenario) in scenarios.items():
- params = default_scenario_params.copy()
- params.update(scenario)
- scenario.update(params)
- program_path = params['program_path']
- program_name = os.path.basename(params['program_path'])
- argv = [program_name]
- argv.extend(params['argv_after_command_name'])
- subprocess_double_params = dict(
- path=program_path,
- argv=argv,
- )
- subprocess_double = SubprocessDouble(**subprocess_double_params)
- scenario['subprocess_double'] = subprocess_double
- scenario['fake_file_scenario_name'] = name
- return scenarios
- def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
- """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
- :param scenarios: Collection of fake subprocess scenarios.
- :return: Collection of `SubprocessDouble` instances.
- """
- doubles = set(
- scenario['subprocess_double']
- for scenario in scenarios
- if scenario['subprocess_double'] is not None)
- return doubles
- def setup_subprocess_double_behaviour(testcase, doubles=None):
- """ Set up subprocess double instances and behaviour.
- :param testcase: The `TestCase` instance to modify.
- :param doubles: Collection of `SubprocessDouble` instances.
- :return: None.
- If `doubles` is ``None``, a default collection will be made
- from the return value of `make_fake_subprocess_scenarios`.
- """
- if doubles is None:
- scenarios = make_fake_subprocess_scenarios()
- doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
- scenarios.values())
- for double in doubles:
- double.register_for_testcase(testcase)
- def setup_fake_subprocess_fixtures(testcase):
- """ Set up fixtures for fake subprocess doubles.
- :param testcase: The `TestCase` instance to modify.
- :return: None.
- """
- scenarios = make_fake_subprocess_scenarios()
- testcase.fake_subprocess_scenarios = scenarios
- doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
- scenarios.values())
- setup_subprocess_double_behaviour(testcase, doubles)
- def patch_os_popen(testcase):
- """ Patch `os.popen` behaviour for this test case.
- :param testcase: The `TestCase` instance to modify.
- :return: None.
- When the patched function is called, the registry of
- `SubprocessDouble` instances for this test case will be used
- to get the instance for the program path specified.
- """
- orig_os_popen = os.popen
- def fake_os_popen(cmd, mode='r', buffering=-1):
- registry = SubprocessDouble.get_registry_for_testcase(testcase)
- command_argv = shlex.split(cmd)
- if command_argv in registry:
- subprocess_double = registry[command_argv]
- result = subprocess_double.os_popen_scenario.call_hook(
- cmd, mode, buffering)
- else:
- result = orig_os_popen(cmd, mode, buffering)
- return result
- func_patcher = mock.patch.object(
- os, "popen", side_effect=fake_os_popen)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_waitpid(testcase):
- """ Patch `os.waitpid` behaviour for this test case.
- :param testcase: The `TestCase` instance to modify.
- :return: None.
- When the patched function is called, the registry of
- `SubprocessDouble` instances for this test case will be used
- to get the instance for the program path specified.
- """
- orig_os_waitpid = os.waitpid
- def fake_os_waitpid(pid, options):
- registry = SubprocessDouble.double_by_pid
- if pid in registry:
- subprocess_double = registry[pid]
- result = subprocess_double.os_waitpid_scenario.call_hook(
- pid, options)
- else:
- result = orig_os_waitpid(pid, options)
- return result
- func_patcher = mock.patch.object(
- os, "waitpid", side_effect=fake_os_waitpid)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_system(testcase):
- """ Patch `os.system` behaviour for this test case.
- :param testcase: The `TestCase` instance to modify.
- :return: None.
- When the patched function is called, the registry of
- `SubprocessDouble` instances for this test case will be used
- to get the instance for the program path specified.
- """
- orig_os_system = os.system
- def fake_os_system(command):
- registry = SubprocessDouble.get_registry_for_testcase(testcase)
- command_argv = shlex.split(command)
- if command_argv in registry:
- subprocess_double = registry[command_argv]
- result = subprocess_double.os_system_scenario.call_hook(
- command)
- else:
- result = orig_os_system(command)
- return result
- func_patcher = mock.patch.object(
- os, "system", side_effect=fake_os_system)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- def patch_os_spawnv(testcase):
- """ Patch `os.spawnv` behaviour for this test case.
- :param testcase: The `TestCase` instance to modify.
- :return: None.
- When the patched function is called, the registry of
- `SubprocessDouble` instances for this test case will be used
- to get the instance for the program path specified.
- """
- orig_os_spawnv = os.spawnv
- def fake_os_spawnv(mode, file, args):
- registry = SubprocessDouble.get_registry_for_testcase(testcase)
- registry_key = tuple(args)
- if registry_key in registry:
- subprocess_double = registry[registry_key]
- result = subprocess_double.os_spawnv_scenario.call_hook(
- mode, file, args)
- else:
- result = orig_os_spawnv(mode, file, args)
- return result
- func_patcher = mock.patch.object(
- os, "spawnv", side_effect=fake_os_spawnv)
- func_patcher.start()
- testcase.addCleanup(func_patcher.stop)
- # Local variables:
- # coding: utf-8
- # mode: python
- # End:
- # vim: fileencoding=utf-8 filetype=python :
|