12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766 |
- # -*- coding: utf-8; -*-
- #
- # test/test_dputhelper.py
- # Part of ‘dput’, a Debian package upload toolkit.
- #
- # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
- #
- # This is free software: you may copy, modify, and/or distribute this work
- # under the terms of the GNU General Public License as published by the
- # Free Software Foundation; version 3 of that license or any later version.
- # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
- """ Unit tests for ‘dput.helper.dputhelper’ module. """
- from __future__ import (absolute_import, unicode_literals)
- import sys
- import os
- import collections
- import itertools
- import tempfile
- import doctest
- import textwrap
- import testtools
- import testtools.matchers
- import testscenarios
- import pkg_resources
- __package__ = str("test")
- __import__(__package__)
- sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
- from dput.helper import dputhelper
- from .helper import (
- mock,
- StringIO,
- patch_sys_argv,
- patch_system_interfaces,
- patch_time_time,
- patch_os_spawnv,
- SubprocessDouble,
- )
class spawnv_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for `spawnv` function.

        Each scenario supplies the arguments passed to `spawnv`, the
        name of the `os.spawnv` scenario to set on the subprocess
        double, and (optionally) the output text expected afterward.
        """

    # Positional arguments for `spawnv`, kept in call order.
    default_args = collections.OrderedDict([
        ('mode', object()),
        ('file', tempfile.mktemp()),
        ('args', ["arg-{}".format(n) for n in range(5)]),
    ])

    # NOTE(review): the relative indentation inside the expected output
    # literals below should be confirmed against the messages actually
    # emitted by `dputhelper.spawnv`.
    scenarios = [
        ('success', {
            'test_args': default_args.copy(),
            'os_spawnv_scenario_name': 'success',
        }),
        ('failure', {
            'test_args': default_args.copy(),
            'os_spawnv_scenario_name': 'failure',
            'expected_output': textwrap.dedent("""\
                Warning: The execution of '...' as
                '...'
                returned a nonzero exit code.
                """)
        }),
        ('not-found', {
            'test_args': default_args.copy(),
            'os_spawnv_scenario_name': 'not_found',
            'expected_output': textwrap.dedent("""\
                Error: Failed to execute '...'.
                The file may not exist or not be executable.
                """)
        }),
    ]

    def setUp(self):
        """ Set up test fixtures. """
        super(spawnv_TestCase, self).setUp()
        patch_system_interfaces(self)
        patch_os_spawnv(self)
        self.set_subprocess_double()

    def set_subprocess_double(self):
        """ Create and register the subprocess double for this case. """
        subprocess_double = SubprocessDouble(
            self.test_args['file'],
            self.test_args['args'])
        subprocess_double.register_for_testcase(self)
        subprocess_double.set_os_spawnv_scenario(
            self.os_spawnv_scenario_name)
        self.subprocess_double = subprocess_double

    def test_calls_os_spawnv_with_specified_args(self):
        """ Should pass the specified arguments through to `os.spawnv`. """
        call_args = list(self.test_args.values())
        dputhelper.spawnv(*call_args)
        os.spawnv.assert_called_with(*call_args)

    def test_emits_expected_output(self):
        """ Should write the expected messages to the output stream. """
        # Scenarios without `expected_output` expect no output at all.
        self.expected_output = getattr(self, 'expected_output', "")
        dputhelper.spawnv(*self.test_args.values())
        output_matcher = testtools.matchers.DocTestMatches(
            self.expected_output, flags=doctest.ELLIPSIS)
        self.assertThat(sys.stdout.getvalue(), output_matcher)
class TimestampFile_TestCase(testtools.TestCase):
    """ Base for test cases for the `TimestampFile` class.

        Provides `test_file` (an in-memory stream) and `instance` (a
        `TimestampFile` wrapping that stream) to each test.
        """

    # Placeholder; this base class defines no scenarios of its own.
    scenarios = NotImplemented

    def setUp(self):
        """ Set up test fixtures. """
        super(TimestampFile_TestCase, self).setUp()
        # Feed `patch_time_time` the successive values 1, 2, 3, ….
        patch_time_time(self, itertools.count(1))
        wrapped_stream = StringIO()
        self.test_file = wrapped_stream
        self.instance = dputhelper.TimestampFile(wrapped_stream)
class TimestampFile_InstanceTestCase(
        testscenarios.WithScenarios,
        TimestampFile_TestCase):
    """ Test cases for `TimestampFile` instance creation. """

    scenarios = [
        ('default', {}),
    ]

    def test_has_specified_file(self):
        """ Should expose the wrapped file object as attribute `f`. """
        self.assertIs(self.test_file, self.instance.f)

    def test_has_attributes_from_component_file(self):
        """ Should expose the same attribute values as the wrapped file. """
        # 'b0gUs' covers a name absent from both objects: each lookup
        # then falls back to `None`.
        checked_names = (
            'b0gUs',
            'mode', 'name', 'encoding',
            'readable', 'seekable', 'writable',
            'read', 'seek', 'tell',
        )
        for name in checked_names:
            value_on_file = getattr(self.test_file, name, None)
            value_on_instance = getattr(self.instance, name, None)
            self.expectThat(
                value_on_instance,
                testtools.matchers.Equals(value_on_file))
class TimestampFile_write_TestCase(
        testscenarios.WithScenarios,
        TimestampFile_TestCase):
    """ Test cases for `TimestampFile.write` method.

        Each scenario gives the text to write (`test_output`) and the
        lines expected from that text (`expected_lines`). A final empty
        string in `expected_lines` stands for a terminating newline.
        """

    scenarios = [
        ('empty', {
            'test_output': "",
            'expected_lines': [],
        }),
        ('lines-one', {
            'test_output': textwrap.dedent("""\
                Lorem ipsum, dolor sit amet.
                """),
            'expected_lines': [
                "1: Lorem ipsum, dolor sit amet.",
                "",
            ],
        }),
        ('lines-three', {
            'test_output': textwrap.dedent("""\
                Lorem ipsum, dolor sit amet,
                consectetur adipiscing elit.
                Integer non pulvinar risus, sed malesuada diam.
                """),
            'expected_lines': [
                "1: Lorem ipsum, dolor sit amet,",
                "2: consectetur adipiscing elit.",
                "3: Integer non pulvinar risus, sed malesuada diam.",
                "",
            ],
        }),
        ('lines-two-with-trail', {
            'test_output': textwrap.dedent("""\
                Lorem ipsum, dolor sit amet,
                consectetur adipiscing elit.
                Integer non pulvinar risus"""),
            'expected_lines': [
                "1: Lorem ipsum, dolor sit amet,",
                "2: consectetur adipiscing elit.",
                "3: Integer non pulvinar risus",
            ],
        }),
    ]

    def test_has_expected_content_for_output(self):
        """ Should have expected content for specified `write` output. """
        self.instance.write(self.test_output)
        # Work on a copy so the shared scenario data is never modified.
        lines_in_file = list(self.expected_lines)
        if lines_in_file and lines_in_file[-1]:
            # An unterminated final line is not expected in the file
            # content after `write`; only the newline before it is.
            lines_in_file[-1] = ""
        self.assertEqual(
            "\n".join(lines_in_file), self.instance.f.getvalue())
class TimestampFile_close_TestCase(
        testscenarios.WithScenarios,
        TimestampFile_TestCase):
    """ Test cases for `TimestampFile.close` method.

        Re-uses the scenarios of `TimestampFile_write_TestCase`: each
        supplies `test_output` (text to write) and `expected_lines`.
        """

    scenarios = TimestampFile_write_TestCase.scenarios

    @testtools.skip("TimestampFile.close method is broken")
    def test_has_expected_final_line(self):
        """ Should have expected final line. """
        self.instance.write(self.test_output)
        self.instance.f.seek(0)
        self.instance.close()
        # The 'empty' scenario has no expected lines at all; indexing
        # `[-1]` there would raise `IndexError` instead of testing.
        expected_content = (
            self.expected_lines[-1] if self.expected_lines else "")
        self.assertEqual(expected_content, self.instance.f.getvalue())
class FileWithProgress_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Base for test cases for the `FileWithProgress` class.

        A scenario may set `content` (text of the wrapped file) and any
        of `test_ptype`, `test_progressf`, `test_size`, `test_step` to
        override the corresponding constructor argument.
        """

    # Values the tests expect when a constructor argument is omitted.
    default_args = {
        'ptype': 0,
        'progressf': sys.__stdout__,
        'size': -1,
        'step': 1024,
    }

    def setUp(self):
        """ Set up test fixtures. """
        super(FileWithProgress_TestCase, self).setUp()
        patch_system_interfaces(self)
        file_content = getattr(self, 'content', "")
        self.test_file = StringIO(file_content)
        self.set_test_args()
        self.make_instance()

    def set_test_args(self):
        """ Collect the constructor arguments for the test instance. """
        constructor_args = {'f': self.test_file}
        # Pass only the optional arguments this test case specifies.
        for arg_name in ('ptype', 'progressf', 'size', 'step'):
            attr_name = 'test_' + arg_name
            if hasattr(self, attr_name):
                constructor_args[arg_name] = getattr(self, attr_name)
        self.test_args = constructor_args

    def make_instance(self):
        """ Create the `FileWithProgress` instance under test. """
        self.instance = dputhelper.FileWithProgress(**self.test_args)
class FileWithProgress_ArgsTestCase(FileWithProgress_TestCase):
    """ Test cases for constructor arguments for `FileWithProgress` class. """

    scenarios = [
        ('simple', {}),
        ('all args', {
            'test_ptype': 1,
            'test_progressf': StringIO(),
            'test_size': 10,
            'test_step': 2,
        }),
    ]

    def _expected_arg_value(self, arg_name):
        """ Get the value expected on the instance for `arg_name`.

            This is the scenario's `test_<arg_name>` value when given,
            otherwise the entry in `default_args`.
            """
        return getattr(
            self, 'test_' + arg_name, self.default_args[arg_name])

    def test_has_specified_file(self):
        """ Should have specified file object as `f` attribute. """
        self.assertIs(self.test_file, self.instance.f)

    def test_has_specified_ptype(self):
        """ Should have specified progress type value as `ptype` attribute. """
        self.assertEqual(
            self._expected_arg_value('ptype'), self.instance.ptype)

    def test_has_specified_progressf(self):
        """ Should have specified progress file as `progressf` attribute. """
        self.assertEqual(
            self._expected_arg_value('progressf'), self.instance.progressf)

    def test_has_specified_size(self):
        """ Should have specified size value as `size` attribute. """
        self.assertEqual(
            self._expected_arg_value('size'), self.instance.size)

    def test_has_specified_step(self):
        """ Should have specified step value as `step` attribute. """
        self.assertEqual(
            self._expected_arg_value('step'), self.instance.step)

    def test_has_attributes_from_component_file(self):
        """ Should expose the same attribute values as the wrapped file. """
        # 'b0gUs' covers a name absent from both objects: each lookup
        # then falls back to `None`.
        checked_names = (
            'b0gUs',
            'mode', 'name', 'encoding',
            'readable', 'seekable', 'writable',
            'seek', 'tell', 'write',
        )
        for name in checked_names:
            value_on_file = getattr(self.test_file, name, None)
            value_on_instance = getattr(self.instance, name, None)
            self.expectThat(
                value_on_instance,
                testtools.matchers.Equals(value_on_file))
class FileWithProgress_OutputTestCase(FileWithProgress_TestCase):
    """ Test cases for progress output for `FileWithProgress` class.

        The scenarios are the product of every file content, progress
        type, and step size listed below.
        """

    # NOTE(review): each repeated unit "0123456789\n" is 11 characters,
    # so the content lengths are 11 000 and 11 000 000; the scenario
    # names are kept unchanged because they form the test identifiers.
    content_scenarios = [
        ('empty', {
            'content': "",
        }),
        ('10 000 chars', {
            'content': "0123456789\n" * 1000,
        }),
        ('10 000 000 chars', {
            'content': "0123456789\n" * 1000000,
        }),
    ]

    ptype_scenarios = [
        ('default', {}),
        ('ptype 0', {'test_ptype': 0}),
        ('ptype 1', {'test_ptype': 1}),
        ('ptype 2', {'test_ptype': 2}),
    ]

    step_scenarios = [
        ('default', {}),
        ('step 5', {'test_step': 5}),
        ('step 500', {'test_step': 500}),
        ('step 50 000', {'test_step': 50000}),
    ]

    scenarios = testscenarios.multiply_scenarios(
        content_scenarios, ptype_scenarios, step_scenarios)

    def setUp(self):
        """ Set up test fixtures. """
        super(FileWithProgress_OutputTestCase, self).setUp()
        # Rebuild the instance so that it reports progress for the whole
        # content to a stream this test case can inspect.
        content = self.content
        self.test_file = StringIO(content)
        self.test_size = len(content)
        self.test_progressf = StringIO()
        self.set_test_args()
        self.make_instance()
        self.set_expected_output()

    def _counter_progress_text(self):
        """ Build the expected "<hunks>k/<steps>k" progress text. """
        step = getattr(self, 'test_step', self.default_args['step'])
        total_bytes = len(self.content)
        whole_hunks = total_bytes // step
        # Round up, to count a final partial step as well.
        all_steps = (total_bytes + step - 1) // step
        return "{hunks}/{steps}".format(
            hunks="{size}k".format(size=whole_hunks),
            steps="{size}k".format(size=all_steps))

    def set_expected_output(self):
        """ Set the expected output for this test case. """
        ptype = getattr(self, 'test_ptype', self.default_args['ptype'])
        if ptype == 2:
            output = self._counter_progress_text()
        elif ptype == 1:
            output = "/"
        else:
            # `ptype == 0` specifies no progress output.
            output = ""
        if not self.content:
            # No progress output for an empty file.
            output = ""
        self.expected_output = output

    def test_emits_expected_output_for_content(self):
        """ Should emit expected output for file content. """
        self.instance.read()
        self.assertEqual(
            self.expected_output, self.test_progressf.getvalue())

    def test_clears_output_on_close(self):
        """ Should clear progress output when closed. """
        self.instance.read()
        self.instance.close()
        # Clearing is expected as: back up over the text, overwrite it
        # with spaces, then back up again.
        width = len(self.expected_output)
        backspaces = width * "\b"
        expected_output = "".join([
            self.expected_output,
            backspaces,
            width * " ",
            backspaces,
        ])
        self.assertEqual(expected_output, self.test_progressf.getvalue())
def patch_filewithprogress(testcase):
    """ Patch the `FileWithProgress` class for the test case.

        While patched, calling `dputhelper.FileWithProgress` returns
        `testcase.fake_filewithprogress` (created here if the test case
        has none), with the call arguments recorded as its attributes.
        The patch is removed when the test case cleans up.
        """
    if not hasattr(testcase, 'fake_filewithprogress'):
        testcase.fake_filewithprogress = mock.MagicMock(
            spec=dputhelper.FileWithProgress, name="FileWithProgress")

    def fake_filewithprogress_factory(
            f, ptype=0, progressf=sys.stdout, size=-1, step=1024):
        """ Record the constructor arguments on the fake instance. """
        fake_instance = testcase.fake_filewithprogress
        fake_instance.f = f
        fake_instance.ptype = ptype
        fake_instance.progressf = progressf
        fake_instance.size = size
        fake_instance.step = step
        return fake_instance

    class_patcher = mock.patch.object(
        dputhelper, "FileWithProgress",
        side_effect=fake_filewithprogress_factory)
    class_patcher.start()
    testcase.addCleanup(class_patcher.stop)
# Shape of the value the `getopt` tests expect: the parsed
# (option, value) pairs, then the remaining non-option arguments.
GetoptResult = collections.namedtuple('GetoptResult', ['optlist', 'args'])
class getopt_SuccessTestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Success test cases for `getopt` function.

        Each scenario gives a full command line (`test_argv`, whose
        first item is a placeholder for the program name), optional
        `test_shortopts` and `test_longopts` specifications, and the
        `expected_result` of parsing.
        """

    scenarios = [
        ('empty', {
            'test_argv': [object()],
            'expected_result': GetoptResult(
                optlist=[], args=[]),
        }),
        ('no opts', {
            'test_argv': [object(), "foo", "bar", "baz"],
            'expected_result': GetoptResult(
                optlist=[], args=["foo", "bar", "baz"]),
        }),
        ('only short opts', {
            'test_argv': [object(), "-a", "-b", "-c"],
            'test_shortopts': "axbycz",
            'expected_result': GetoptResult(
                optlist=[
                    ('-a', ""),
                    ('-b', ""),
                    ('-c', ""),
                ],
                args=[]),
        }),
        ('only long opts', {
            'test_argv': [object(), "--alpha", "--beta", "--gamma"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta", "wubble", "gamma",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', ""),
                    ('--gamma', ""),
                ],
                args=[]),
        }),
        ('long opt prefix', {
            'test_argv': [object(), "--al", "--be", "--ga"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta", "wubble", "gamma",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', ""),
                    ('--gamma', ""),
                ],
                args=[]),
        }),
        ('short opt cluster', {
            'test_argv': [object(), "-abc"],
            'test_shortopts': "abc",
            'expected_result': GetoptResult(
                optlist=[
                    ('-a', ""),
                    ('-b', ""),
                    ('-c', ""),
                ],
                args=[]),
        }),
        ('short with args', {
            'test_argv': [object(), "-a", "-b", "eggs", "-cbeans"],
            'test_shortopts': "ab:c:",
            'expected_result': GetoptResult(
                optlist=[
                    ('-a', ""),
                    ('-b', "eggs"),
                    ('-c', "beans"),
                ],
                args=[]),
        }),
        ('long with args', {
            'test_argv': [
                object(),
                "--alpha",
                "--beta=eggs",
                "--gamma", "beans"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta=", "wubble", "gamma=",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', "eggs"),
                    ('--gamma', "beans"),
                ],
                args=[]),
        }),
        ('long with optional args', {
            'test_argv': [
                object(),
                "--alpha",
                "--beta=eggs",
                "--gamma"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta==", "wubble", "gamma==",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', "eggs"),
                    ('--gamma', ""),
                ],
                args=[]),
        }),
        ('single hyphen arg', {
            'test_argv': [object(), "-a", "-b", "-c", "-"],
            'test_shortopts': "axbycz",
            'expected_result': GetoptResult(
                optlist=[
                    ('-a', ""),
                    ('-b', ""),
                    ('-c', ""),
                ],
                args=["-"]),
        }),
        ('explicit end of opts', {
            'test_argv': [
                object(),
                "--alpha",
                "--beta",
                "--",
                "--spam"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta", "wubble", "gamma",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', ""),
                ],
                args=["--spam"]),
        }),
    ]

    def test_returns_expected_result_for_argv(self):
        """ Should return expected result for specified argv. """
        # Scenarios omit whichever option specification they do not use.
        short_spec = getattr(self, 'test_shortopts', "")
        long_spec = getattr(self, 'test_longopts', "")
        # Only the arguments after the program name are parsed.
        args_to_parse = self.test_argv[1:]
        parsed = dputhelper.getopt(args_to_parse, short_spec, long_spec)
        self.assertEqual(self.expected_result, parsed)
class getopt_ErrorTestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Error test cases for `getopt` function.

        Each scenario gives a full command line (`test_argv`, whose
        first item is a placeholder for the program name), an option
        specification, and the exception type expected from parsing.
        """

    scenarios = [
        ('short opt unknown', {
            'test_argv': [object(), "-a", "-b", "-z", "-c"],
            'test_shortopts': "abc",
            'expected_error': dputhelper.DputException,
        }),
        ('short missing arg', {
            'test_argv': [object(), "-a", "-b", "-c"],
            'test_shortopts': "abc:",
            'expected_error': dputhelper.DputException,
        }),
        ('long opt unknown', {
            'test_argv': [
                object(), "--alpha", "--beta", "--zeta", "--gamma"],
            'test_longopts': [
                "alpha", "beta", "gamma"],
            'expected_error': dputhelper.DputException,
        }),
        ('long ambiguous prefix', {
            'test_argv': [
                object(), "--alpha", "--be", "--gamma"],
            'test_longopts': [
                "alpha", "beta", "bettong", "bertha", "gamma"],
            'expected_error': dputhelper.DputException,
        }),
        ('long missing arg', {
            'test_argv': [object(), "--alpha", "--beta", "--gamma"],
            'test_longopts': [
                "alpha", "beta", "gamma="],
            'expected_error': dputhelper.DputException,
        }),
        ('long unexpected arg', {
            'test_argv': [
                object(), "--alpha", "--beta=beans", "--gamma"],
            'test_longopts': [
                "alpha", "beta", "gamma"],
            'expected_error': dputhelper.DputException,
        }),
    ]

    def test_raises_expected_error_for_argv(self):
        """ Should raise expected error for specified argv. """
        # Scenarios omit whichever option specification they do not use.
        short_spec = getattr(self, 'test_shortopts', "")
        long_spec = getattr(self, 'test_longopts', "")
        # Only the arguments after the program name are parsed.
        args_to_parse = self.test_argv[1:]
        with testtools.ExpectedException(self.expected_error):
            dputhelper.getopt(args_to_parse, short_spec, long_spec)
def patch_getopt(testcase):
    """ Patch the `getopt` function for the specified test case.

        While patched, `dputhelper.getopt` ignores its arguments and
        returns the pair (`testcase.getopt_opts`,
        `testcase.getopt_args`), read at the time of each call. The
        patch is removed when the test case cleans up.
        """

    def fake_getopt(args, shortopts, longopts):
        """ Return the parse result configured on the test case. """
        return (testcase.getopt_opts, testcase.getopt_args)

    getopt_patcher = mock.patch.object(
        dputhelper, "getopt", side_effect=fake_getopt)
    getopt_patcher.start()
    testcase.addCleanup(getopt_patcher.stop)
class get_progname_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for `get_progname` function.

        The scenarios are the product of every command name and every
        list of subsequent arguments given below.
        """

    command_name_scenarios = [
        ('command-simple', {
            'argv_zero': "amet",
            'expected_progname': "amet",
        }),
        ('command-relative', {
            'argv_zero': "lorem/ipsum/dolor/sit/amet",
            'expected_progname': "amet",
        }),
        ('command-absolute', {
            'argv_zero': "/lorem/ipsum/dolor/sit/amet",
            'expected_progname': "amet",
        }),
    ]

    subsequent_args_scenarios = [
        ('args-empty', {
            'argv_remain': [],
        }),
        ('args-one-word', {
            'argv_remain': ["spam"],
        }),
        ('args-three-words', {
            'argv_remain': ["spam", "beans", "eggs"],
        }),
        ('args-one-option', {
            'argv_remain': ["--spam"],
        }),
    ]

    scenarios = testscenarios.multiply_scenarios(
        command_name_scenarios, subsequent_args_scenarios)

    def setUp(self):
        """ Set up test fixtures. """
        super(get_progname_TestCase, self).setUp()
        # Assemble the full command line: the command, then its arguments.
        command_line = [self.argv_zero]
        command_line.extend(self.argv_remain)
        self.test_argv = command_line

    def test_returns_expected_progname(self):
        """ Should return expected progname value for command line. """
        progname = dputhelper.get_progname(self.test_argv)
        self.assertEqual(self.expected_progname, progname)

    def test_queries_sys_argv_if_argv_unspecified(self):
        """ Should query `sys.argv` if no `argv` specified. """
        # `patch_sys_argv` is given the command line via `self.sys_argv`.
        self.sys_argv = self.test_argv
        patch_sys_argv(self)
        progname = dputhelper.get_progname()
        self.assertEqual(self.expected_progname, progname)
def patch_pkg_resources_get_distribution(testcase):
    """ Patch `pkg_resources.get_distribution` for the test case.

        While patched, the function returns
        `testcase.fake_distribution`; a mock of
        `pkg_resources.Distribution` is created for that attribute if
        the test case has none. The patch is removed when the test case
        cleans up.
        """
    if not hasattr(testcase, 'fake_distribution'):
        testcase.fake_distribution = mock.MagicMock(
            pkg_resources.Distribution)
    distribution_patcher = mock.patch.object(
        pkg_resources, 'get_distribution',
        return_value=testcase.fake_distribution)
    distribution_patcher.start()
    testcase.addCleanup(distribution_patcher.stop)
class get_distribution_version_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for `get_distribution_version` function. """

    scenarios = [
        ('simple', {
            'fake_distribution': mock.MagicMock(
                project_name="lorem", version="42.23"),
        }),
    ]

    def setUp(self):
        """ Set up test fixtures. """
        super(get_distribution_version_TestCase, self).setUp()
        # Make `pkg_resources.get_distribution` return the scenario's
        # fake distribution.
        patch_pkg_resources_get_distribution(self)

    def test_returns_expected_result(self):
        """ Should return the version of the fake distribution. """
        version = dputhelper.get_distribution_version()
        self.assertEqual(self.fake_distribution.version, version)
- # Local variables:
- # coding: utf-8
- # mode: python
- # End:
- # vim: fileencoding=utf-8 filetype=python :
|