test_dcut.py 70 KB


  1. # -*- coding: utf-8; -*-
  2. #
  3. # test/test_dcut.py
  4. # Part of ‘dput’, a Debian package upload toolkit.
  5. #
  6. # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
  7. #
  8. # This is free software: you may copy, modify, and/or distribute this work
  9. # under the terms of the GNU General Public License as published by the
  10. # Free Software Foundation; version 3 of that license or any later version.
  11. # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
  12. """ Unit tests for ‘dcut’ module. """
  13. from __future__ import (absolute_import, unicode_literals)
  14. import sys
  15. import os
  16. import shutil
  17. import subprocess
  18. import tempfile
  19. import itertools
  20. import textwrap
  21. import doctest
  22. import testtools
  23. import testscenarios
  24. import pkg_resources
  25. __package__ = str("test")
  26. __import__(__package__)
  27. sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
  28. import dput.dcut
  29. from dput.helper import dputhelper
  30. from .helper import (
  31. StringIO,
  32. mock,
  33. FakeSystemExit,
  34. EXIT_STATUS_SUCCESS, EXIT_STATUS_FAILURE,
  35. patch_sys_argv,
  36. patch_system_interfaces,
  37. patch_time_time,
  38. patch_os_environ,
  39. patch_os_getpid,
  40. patch_os_getuid,
  41. PasswdEntry,
  42. patch_pwd_getpwuid,
  43. patch_os_unlink,
  44. patch_os_rmdir,
  45. patch_shutil_rmtree,
  46. patch_tempfile_mkdtemp,
  47. FileDouble,
  48. setup_file_double_behaviour,
  49. ARG_ANY, ARG_MORE,
  50. SubprocessDouble,
  51. patch_subprocess_popen,
  52. patch_os_system,
  53. patch_os_popen,
  54. patch_os_waitpid,
  55. setup_subprocess_double_behaviour,
  56. )
  57. from .test_configfile import (
  58. set_config,
  59. )
  60. from .test_dputhelper import (
  61. patch_pkg_resources_get_distribution,
  62. patch_getopt,
  63. )
  64. from .test_changesfile import (
  65. make_changes_file_scenarios,
  66. set_changes_file_scenario,
  67. )
  68. from . import test_dput_main
  69. dummy_pwent = PasswdEntry(
  70. pw_name="lorem",
  71. pw_passwd="!",
  72. pw_uid=1,
  73. pw_gid=1,
  74. pw_gecos="Lorem Ipsum,spam,eggs,beans",
  75. pw_dir=tempfile.mktemp(),
  76. pw_shell=tempfile.mktemp())
  77. def patch_getoptions(testcase):
  78. """ Patch the `getoptions` function for this test case. """
  79. default_options = {
  80. 'debug': False,
  81. 'simulate': False,
  82. 'config': None,
  83. 'host': "foo",
  84. 'passive': False,
  85. 'changes': None,
  86. 'filetoupload': None,
  87. 'filetocreate': None,
  88. }
  89. if not hasattr(testcase, 'getoptions_opts'):
  90. testcase.getoptions_opts = {}
  91. if not hasattr(testcase, 'getoptions_args'):
  92. testcase.getoptions_args = []
  93. def fake_getoptions():
  94. options = dict(default_options)
  95. options.update(testcase.getoptions_opts)
  96. arguments = list(testcase.getoptions_args)
  97. result = (options, arguments)
  98. return result
  99. func_patcher = mock.patch.object(
  100. dput.dcut, "getoptions", side_effect=fake_getoptions)
  101. func_patcher.start()
  102. testcase.addCleanup(func_patcher.stop)
  103. def get_upload_method_func(testcase):
  104. """ Get the specified upload method. """
  105. host = testcase.test_host
  106. method_name = testcase.runtime_config_parser.get(host, 'method')
  107. method_func = dput.dput.upload_methods[method_name]
  108. return method_func
  109. class make_usage_message_TestCase(testtools.TestCase):
  110. """ Test cases for `make_usage_message` function. """
  111. def setUp(self):
  112. """ Set up test fixtures. """
  113. super(make_usage_message_TestCase, self).setUp()
  114. patch_sys_argv(self)
  115. def test_returns_text_with_program_name(self):
  116. """ Should return text with expected program name. """
  117. result = dput.dcut.make_usage_message()
  118. expected_result = textwrap.dedent("""\
  119. Usage: {progname} ...
  120. ...
  121. """).format(progname=self.progname)
  122. self.expectThat(
  123. result,
  124. testtools.matchers.DocTestMatches(
  125. expected_result, flags=doctest.ELLIPSIS))
  126. class getoptions_TestCase(testtools.TestCase):
  127. """ Base for test cases for `getoptions` function. """
  128. default_options = NotImplemented
  129. scenarios = NotImplemented
  130. def setUp(self):
  131. """ Set up test fixtures. """
  132. super(getoptions_TestCase, self).setUp()
  133. patch_system_interfaces(self)
  134. patch_os_environ(self)
  135. patch_os_getuid(self)
  136. patch_pwd_getpwuid(self)
  137. patch_sys_argv(self)
  138. self.patch_etc_mailname()
  139. setup_file_double_behaviour(
  140. self, [self.mailname_file_double])
  141. self.set_hostname_subprocess_double()
  142. patch_os_popen(self)
  143. self.patch_getopt()
  144. if hasattr(self, 'expected_options'):
  145. self.set_expected_result()
  146. self.patch_distribution()
  147. self.patch_make_usage_message()
  148. def patch_etc_mailname(self):
  149. """ Patch the ‘/etc/mailname’ file. """
  150. path = "/etc/mailname"
  151. if hasattr(self, 'mailname_fake_file'):
  152. double = FileDouble(path, self.mailname_fake_file)
  153. else:
  154. double = FileDouble(path, StringIO())
  155. if hasattr(self, 'mailname_file_open_scenario_name'):
  156. double.set_open_scenario(self.mailname_file_open_scenario_name)
  157. self.mailname_file_double = double
  158. def set_hostname_subprocess_double(self):
  159. """ Set the test double for the ‘hostname’ subprocess. """
  160. path = "/bin/hostname"
  161. argv = (path, "--fqdn")
  162. double = SubprocessDouble(path, argv=argv)
  163. double.register_for_testcase(self)
  164. double.set_os_popen_scenario('success')
  165. double.set_stdout_content(getattr(self, 'hostname_stdout_content', ""))
  166. self.hostname_subprocess_double = double
  167. def patch_getopt(self):
  168. """ Patch the `dputhelper.getopt` function. """
  169. if not hasattr(self, 'getopt_opts'):
  170. self.getopt_opts = []
  171. else:
  172. self.getopt_opts = list(self.getopt_opts)
  173. if not hasattr(self, 'getopt_args'):
  174. self.getopt_args = []
  175. else:
  176. self.getopt_args = list(self.getopt_args)
  177. patch_getopt(self)
  178. def set_expected_result(self):
  179. """ Set the expected result value. """
  180. if not hasattr(self, 'expected_arguments'):
  181. self.expected_arguments = []
  182. expected_options = self.default_options.copy()
  183. expected_options.update(self.expected_options)
  184. self.expected_result = (expected_options, self.expected_arguments)
  185. def patch_distribution(self):
  186. """ Patch the Python distribution for this test case. """
  187. self.fake_distribution = mock.MagicMock(pkg_resources.Distribution)
  188. if hasattr(self, 'dcut_version'):
  189. self.fake_distribution.version = self.dcut_version
  190. patch_pkg_resources_get_distribution(self)
  191. def patch_make_usage_message(self):
  192. """ Patch the `make_usage_message` function. """
  193. if hasattr(self, 'dcut_usage_message'):
  194. text = self.dcut_usage_message
  195. else:
  196. text = self.getUniqueString()
  197. func_patcher = mock.patch.object(
  198. dput.dcut, 'make_usage_message', return_value=text)
  199. func_patcher.start()
  200. self.addCleanup(func_patcher.stop)
  201. class getoptions_UploaderTestCase(
  202. testscenarios.WithScenarios,
  203. getoptions_TestCase):
  204. """ Test cases for `getoptions` function, determining uploader. """
  205. environ_scenarios = [
  206. ('environ-none', {
  207. 'os_environ': {},
  208. }),
  209. ('environ-email-not-delimited', {
  210. 'os_environ': {'EMAIL': "quux@example.org"},
  211. 'expected_environ_uploader': "<quux@example.org>",
  212. }),
  213. ('environ-email-delimited', {
  214. 'os_environ': {'EMAIL': "<quux@example.org>"},
  215. 'expected_environ_uploader': "<quux@example.org>",
  216. }),
  217. ('environ-debemail-not-delimited', {
  218. 'os_environ': {'DEBEMAIL': "flup@example.org"},
  219. 'expected_environ_uploader': "<flup@example.org>",
  220. }),
  221. ('environ-debemail-delimited', {
  222. 'os_environ': {'DEBEMAIL': "<flup@example.org>"},
  223. 'expected_environ_uploader': "<flup@example.org>",
  224. }),
  225. ('environ-both-email-and-debfullname', {
  226. 'os_environ': {
  227. 'EMAIL': "quux@example.org",
  228. 'DEBFULLNAME': "Lorem Ipsum",
  229. },
  230. 'expected_environ_uploader': "Lorem Ipsum <quux@example.org>",
  231. }),
  232. ('environ-both-debemail-and-debfullname', {
  233. 'os_environ': {
  234. 'DEBEMAIL': "flup@example.org",
  235. 'DEBFULLNAME': "Lorem Ipsum",
  236. },
  237. 'expected_environ_uploader': "Lorem Ipsum <flup@example.org>",
  238. }),
  239. ('environ-both-email-and-debemail', {
  240. 'os_environ': {
  241. 'EMAIL': "quux@example.org",
  242. 'DEBEMAIL': "flup@example.org",
  243. },
  244. 'expected_environ_uploader': "<flup@example.org>",
  245. }),
  246. ('environ-both-email-and-debemail-and-debfullname', {
  247. 'os_environ': {
  248. 'EMAIL': "quux@example.org",
  249. 'DEBEMAIL': "flup@example.org",
  250. 'DEBFULLNAME': "Lorem Ipsum",
  251. },
  252. 'expected_environ_uploader': "Lorem Ipsum <flup@example.org>",
  253. }),
  254. ]
  255. system_scenarios = [
  256. ('domain-from-mailname-file', {
  257. 'mailname_fake_file': StringIO("consecteur.example.org"),
  258. 'pwd_getpwuid_return_value': dummy_pwent._replace(
  259. pw_name="grue",
  260. pw_gecos="Dolor Sit Amet,spam,beans,eggs"),
  261. 'expected_debug_chatter': textwrap.dedent("""\
  262. D: Guessing uploader
  263. """),
  264. 'expected_system_uploader':
  265. "Dolor Sit Amet <grue@consecteur.example.org>",
  266. }),
  267. ('domain-from-hostname-command', {
  268. 'mailname_file_open_scenario_name': "read_denied",
  269. 'hostname_stdout_content': "consecteur.example.org\n",
  270. 'pwd_getpwuid_return_value': dummy_pwent._replace(
  271. pw_name="grue",
  272. pw_gecos="Dolor Sit Amet,spam,beans,eggs"),
  273. 'expected_debug_chatter': textwrap.dedent("""\
  274. D: Guessing uploader
  275. D: Guessing uploader: /etc/mailname was a failure
  276. """),
  277. 'expected_system_uploader':
  278. "Dolor Sit Amet <grue@consecteur.example.org>",
  279. }),
  280. ('domain-failure', {
  281. 'mailname_file_open_scenario_name': "read_denied",
  282. 'hostname_stdout_content': "",
  283. 'pwd_getpwuid_return_value': dummy_pwent._replace(
  284. pw_name="grue",
  285. pw_gecos="Dolor Sit Amet,spam,beans,eggs"),
  286. 'expected_debug_chatter': textwrap.dedent("""\
  287. D: Guessing uploader
  288. D: Guessing uploader: /etc/mailname was a failure
  289. D: Couldn't guess uploader
  290. """),
  291. }),
  292. ]
  293. scenarios = testscenarios.multiply_scenarios(
  294. environ_scenarios, system_scenarios)
  295. def setUp(self, *args, **kwargs):
  296. """ Set up test fixtures. """
  297. super(getoptions_UploaderTestCase, self).setUp(*args, **kwargs)
  298. self.set_expected_uploader()
  299. def set_expected_uploader(self):
  300. """ Set the expected uploader value for this test case. """
  301. for attrib_name in [
  302. 'expected_command_line_uploader',
  303. 'expected_environ_uploader',
  304. 'expected_system_uploader']:
  305. if hasattr(self, attrib_name):
  306. self.expected_uploader = getattr(self, attrib_name)
  307. break
  308. def test_emits_debug_message_for_uploader_discovery(self):
  309. """ Should emit debug message for uploader discovery. """
  310. sys.argv.insert(1, "--debug")
  311. dput.dcut.getoptions()
  312. expected_output_lines = [
  313. "D: trying to get maintainer email from environment"]
  314. if hasattr(self, 'expected_environ_uploader'):
  315. guess_line_template = "D: Uploader from env: {uploader}"
  316. else:
  317. expected_output_lines.extend(
  318. self.expected_debug_chatter.split("\n")[:-1])
  319. if hasattr(self, 'expected_system_uploader'):
  320. guess_line_template = "D: Guessed uploader: {uploader}"
  321. if hasattr(self, 'expected_uploader'):
  322. expected_output_lines.append(guess_line_template.format(
  323. uploader=self.expected_uploader))
  324. expected_output = "\n".join(expected_output_lines)
  325. self.assertIn(expected_output, sys.stdout.getvalue())
  326. class getoptions_ParseCommandLineTestCase(
  327. testscenarios.WithScenarios,
  328. getoptions_TestCase):
  329. """ Test cases for `getoptions` function, parsing command line. """
  330. dcut_usage_message = "Lorem ipsum, dolor sit amet."
  331. progname = "lorem"
  332. dcut_version = "ipsum"
  333. config_file_path = tempfile.mktemp()
  334. changes_file_path = tempfile.mktemp()
  335. output_file_path = tempfile.mktemp()
  336. upload_file_path = tempfile.mktemp()
  337. default_options = dict()
  338. default_options.update(
  339. (key, None) for key in [
  340. 'config', 'host', 'uploader', 'keyid',
  341. 'filetocreate', 'filetoupload', 'changes'])
  342. default_options.update(
  343. (key, False) for key in ['debug', 'simulate', 'passive'])
  344. option_scenarios = [
  345. ('no-options', {
  346. 'getopt_opts': [],
  347. }),
  348. ('option-bogus', {
  349. 'getopt_opts': [("--b0gUs", "BOGUS")],
  350. 'expected_stderr_output': (
  351. "{progname} internal error:"
  352. " Option --b0gUs, argument BOGUS unknown").format(
  353. progname=progname),
  354. 'expected_exit_status': EXIT_STATUS_FAILURE,
  355. }),
  356. ('option-help', {
  357. 'getopt_opts': [("--help", None)],
  358. 'expected_stdout_output': dcut_usage_message,
  359. 'expected_exit_status': EXIT_STATUS_SUCCESS,
  360. }),
  361. ('option-version', {
  362. 'getopt_opts': [("--version", None)],
  363. 'expected_stdout_output': " ".join(
  364. [progname, dcut_version]),
  365. 'expected_exit_status': EXIT_STATUS_SUCCESS,
  366. }),
  367. ('option-filetoupload-and-environ-uploader', {
  368. 'os_environ': {
  369. 'DEBEMAIL': "flup@example.org",
  370. 'DEBFULLNAME': "Lorem Ipsum",
  371. },
  372. 'getopt_opts': [
  373. ("--upload", upload_file_path),
  374. ],
  375. 'expected_options': {
  376. 'uploader': "Lorem Ipsum <flup@example.org>",
  377. 'filetoupload': upload_file_path,
  378. },
  379. 'expected_arguments': [],
  380. }),
  381. ('option-changes-and-environ-uploader', {
  382. 'os_environ': {
  383. 'DEBEMAIL': "flup@example.org",
  384. 'DEBFULLNAME': "Lorem Ipsum",
  385. },
  386. 'getopt_opts': [
  387. ("--input", changes_file_path),
  388. ],
  389. 'expected_options': {
  390. 'uploader': "Lorem Ipsum <flup@example.org>",
  391. 'changes': changes_file_path,
  392. },
  393. 'expected_arguments': [],
  394. }),
  395. ('option-filetoupload-and-option-maintaineraddress', {
  396. 'getopt_opts': [
  397. ("--upload", upload_file_path),
  398. ("--maintaineraddress", "Lorem Ipsum <flup@example.org>"),
  399. ],
  400. 'expected_options': {
  401. 'uploader': "Lorem Ipsum <flup@example.org>",
  402. 'filetoupload': upload_file_path,
  403. },
  404. 'expected_arguments': [],
  405. }),
  406. ('option-changes-and-option-maintaineraddress', {
  407. 'getopt_opts': [
  408. ("--input", changes_file_path),
  409. ("--maintaineraddress", "Lorem Ipsum <flup@example.org>"),
  410. ],
  411. 'expected_options': {
  412. 'uploader': "Lorem Ipsum <flup@example.org>",
  413. 'changes': changes_file_path,
  414. },
  415. 'expected_arguments': [],
  416. }),
  417. ('option-filetoupload-with-no-uploader', {
  418. 'getopt_opts': [("--upload", upload_file_path)],
  419. 'expected_stderr_output': "command file cannot be created",
  420. 'expected_exit_status': EXIT_STATUS_FAILURE,
  421. }),
  422. ('option-changes-with-no-uploader', {
  423. 'getopt_opts': [("--input", changes_file_path)],
  424. 'expected_stderr_output': "command file cannot be created",
  425. 'expected_exit_status': EXIT_STATUS_FAILURE,
  426. }),
  427. ('option-several', {
  428. 'getopt_opts': [
  429. ("--debug", None),
  430. ("--simulate", None),
  431. ("--config", config_file_path),
  432. ("--maintaineraddress", "Lorem Ipsum <flup@example.org>"),
  433. ("--keyid", "DEADBEEF"),
  434. ("--passive", None),
  435. ("--output", output_file_path),
  436. ("--host", "quux.example.com"),
  437. ],
  438. 'expected_options': {
  439. 'debug': True,
  440. 'simulate': True,
  441. 'config': config_file_path,
  442. 'uploader': "Lorem Ipsum <flup@example.org>",
  443. 'keyid': "DEADBEEF",
  444. 'passive': True,
  445. 'filetocreate': output_file_path,
  446. 'host': "quux.example.com",
  447. },
  448. 'expected_arguments': [],
  449. }),
  450. ]
  451. scenarios = option_scenarios
  452. def test_emits_debug_message_for_program_version(self):
  453. """ Should emit debug message for program version. """
  454. sys.argv.insert(1, "--debug")
  455. expected_progname = self.progname
  456. expected_version = self.dcut_version
  457. try:
  458. dput.dcut.getoptions()
  459. except FakeSystemExit:
  460. pass
  461. expected_output = textwrap.dedent("""\
  462. D: {progname} {version}
  463. """).format(
  464. progname=expected_progname,
  465. version=expected_version)
  466. self.assertIn(expected_output, sys.stdout.getvalue())
  467. def test_calls_getopt_with_expected_args(self):
  468. """ Should call `getopt` with expected arguments. """
  469. try:
  470. dput.dcut.getoptions()
  471. except FakeSystemExit:
  472. pass
  473. dputhelper.getopt.assert_called_with(
  474. self.sys_argv[1:], mock.ANY, mock.ANY)
  475. def test_emits_debug_message_for_each_option(self):
  476. """ Should emit a debug message for each option processed. """
  477. sys.argv.insert(1, "--debug")
  478. try:
  479. dput.dcut.getoptions()
  480. except FakeSystemExit:
  481. pass
  482. expected_output_lines = [
  483. "D: processing arg \"{opt}\", option \"{arg}\"".format(
  484. opt=option, arg=option_argument)
  485. for (option, option_argument) in self.getopt_args]
  486. expected_output = "\n".join(expected_output_lines)
  487. self.assertIn(expected_output, sys.stdout.getvalue())
  488. def test_emits_expected_message(self):
  489. """ Should emit message with expected content. """
  490. try:
  491. dput.dcut.getoptions()
  492. except FakeSystemExit:
  493. pass
  494. if hasattr(self, 'expected_stdout_output'):
  495. self.assertIn(self.expected_stdout_output, sys.stdout.getvalue())
  496. if hasattr(self, 'expected_stderr_output'):
  497. self.assertIn(self.expected_stderr_output, sys.stderr.getvalue())
  498. def test_calls_sys_exit_with_expected_exit_status(self):
  499. """ Should call `sys.exit` with expected exit status. """
  500. if not hasattr(self, 'expected_exit_status'):
  501. dput.dcut.getoptions()
  502. else:
  503. with testtools.ExpectedException(FakeSystemExit):
  504. dput.dcut.getoptions()
  505. sys.exit.assert_called_with(self.expected_exit_status)
  506. def test_returns_expected_values(self):
  507. """ Should return expected values. """
  508. if not hasattr(self, 'expected_result'):
  509. self.skipTest("No return result expected")
  510. result = dput.dcut.getoptions()
  511. self.assertEqual(self.expected_result, result)
  512. class getoptions_DetermineHostTestCase(
  513. testscenarios.WithScenarios,
  514. getoptions_TestCase):
  515. """ Test cases for `getoptions` function, determine host name. """
  516. system_scenarios = [
  517. ('domain-from-mailname-file', {
  518. 'mailname_fake_file': StringIO("consecteur.example.org"),
  519. }),
  520. ]
  521. default_options = getattr(
  522. getoptions_ParseCommandLineTestCase, 'default_options')
  523. command_scenarios = [
  524. ('no-opts no-args', {
  525. 'getopt_opts': [],
  526. 'getopt_args': [],
  527. 'expected_options': {
  528. 'host': None,
  529. },
  530. }),
  531. ('no-opts command-first-arg', {
  532. 'getopt_opts': [],
  533. 'getopt_args': ["cancel"],
  534. 'expected_options': {
  535. 'host': None,
  536. },
  537. 'expected_arguments': ["cancel"],
  538. }),
  539. ('no-opts host-first-arg', {
  540. 'getopt_opts': [],
  541. 'getopt_args': ["quux.example.com", "cancel"],
  542. 'expected_options': {
  543. 'host': "quux.example.com",
  544. },
  545. 'expected_arguments': ["cancel"],
  546. 'expected_debug_output': textwrap.dedent("""\
  547. D: first argument "quux.example.com" treated as host
  548. """),
  549. }),
  550. ('option-host host-first-arg', {
  551. 'getopt_opts': [("--host", "quux.example.com")],
  552. 'getopt_args': ["decoy.example.net", "cancel"],
  553. 'expected_options': {
  554. 'host': "quux.example.com",
  555. },
  556. 'expected_arguments': ["decoy.example.net", "cancel"],
  557. }),
  558. ]
  559. scenarios = testscenarios.multiply_scenarios(
  560. system_scenarios, command_scenarios)
  561. def test_emits_expected_debug_message(self):
  562. """ Should emit the expected debug message. """
  563. if not hasattr(self, 'expected_debug_output'):
  564. self.expected_debug_output = ""
  565. self.getopt_opts = list(
  566. self.getopt_opts + [("--debug", None)])
  567. dput.dcut.getoptions()
  568. self.assertIn(self.expected_debug_output, sys.stdout.getvalue())
  569. def test_returns_expected_values(self):
  570. """ Should return expected values. """
  571. if not hasattr(self, 'expected_result'):
  572. self.skipTest("No return result expected")
  573. (options, arguments) = dput.dcut.getoptions()
  574. self.assertEqual(self.expected_options['host'], options['host'])
  575. self.assertEqual(self.expected_arguments, arguments)
  576. class parse_queuecommands_TestCase(testtools.TestCase):
  577. """ Base for test cases for `parse_queuecommands` function. """
  578. scenarios = NotImplemented
  579. def setUp(self):
  580. """ Set up test fixtures. """
  581. super(parse_queuecommands_TestCase, self).setUp()
  582. patch_system_interfaces(self)
  583. self.set_test_args()
  584. def set_test_args(self):
  585. """ Set the arguments for the test call to the function. """
  586. default_options = {
  587. 'debug': False,
  588. }
  589. self.test_args = dict(
  590. arguments=getattr(self, 'arguments', []),
  591. options=getattr(self, 'options', default_options),
  592. config=object(),
  593. )
  594. class parse_queuecommands_SuccessTestCase(
  595. testscenarios.WithScenarios,
  596. parse_queuecommands_TestCase):
  597. """ Success test cases for `parse_queuecommands` function. """
  598. scenarios = [
  599. ('one-command-rm', {
  600. 'arguments': ["rm", "lorem.deb"],
  601. 'expected_commands': [
  602. "rm --searchdirs lorem.deb",
  603. ],
  604. }),
  605. ('one-command-rm nosearchdirs', {
  606. 'arguments': ["rm", "--nosearchdirs", "lorem.deb"],
  607. 'expected_commands': [
  608. "rm lorem.deb",
  609. ],
  610. }),
  611. ('one-command-cancel', {
  612. 'arguments': ["cancel", "lorem.deb"],
  613. 'expected_commands': [
  614. "cancel lorem.deb",
  615. ],
  616. }),
  617. ('one-command-cancel nosearchdirs', {
  618. 'arguments': ["cancel", "--nosearchdirs", "lorem.deb"],
  619. 'expected_commands': [
  620. "cancel --nosearchdirs lorem.deb",
  621. ],
  622. }),
  623. ('one-command-reschedule', {
  624. 'arguments': ["reschedule", "lorem.deb"],
  625. 'expected_commands': [
  626. "reschedule lorem.deb",
  627. ],
  628. }),
  629. ('one-command-reschedule nosearchdirs', {
  630. 'arguments': ["reschedule", "--nosearchdirs", "lorem.deb"],
  631. 'expected_commands': [
  632. "reschedule --nosearchdirs lorem.deb",
  633. ],
  634. }),
  635. ('three-commands comma-separated', {
  636. 'arguments': [
  637. "rm", "foo", ",",
  638. "cancel", "bar", ",",
  639. "reschedule", "baz"],
  640. 'expected_commands': [
  641. "rm --searchdirs foo ",
  642. "cancel bar ",
  643. "reschedule baz",
  644. ],
  645. }),
  646. ('three-commands semicolon-separated', {
  647. 'arguments': [
  648. "rm", "foo", ";",
  649. "cancel", "bar", ";",
  650. "reschedule", "baz"],
  651. 'expected_commands': [
  652. "rm --searchdirs foo ",
  653. "cancel bar ",
  654. "reschedule baz",
  655. ],
  656. }),
  657. ]
  658. def test_emits_debug_message_for_each_command(self):
  659. """ Should emit a debug message for each command. """
  660. self.test_args['options'] = dict(self.test_args['options'])
  661. self.test_args['options']['debug'] = True
  662. dput.dcut.parse_queuecommands(**self.test_args)
  663. expected_output = "\n".join(
  664. "D: Successfully parsed command \"{command}\"".format(
  665. command=command)
  666. for command in self.expected_commands)
  667. self.assertIn(expected_output, sys.stdout.getvalue())
  668. def test_returns_expected_commands(self):
  669. """ Should return expected commands value. """
  670. result = dput.dcut.parse_queuecommands(**self.test_args)
  671. self.assertEqual(self.expected_commands, result)
  672. class parse_queuecommands_ErrorTestCase(
  673. testscenarios.WithScenarios,
  674. parse_queuecommands_TestCase):
  675. """ Error test cases for `parse_queuecommands` function. """
  676. scenarios = [
  677. ('no-arguments', {
  678. 'arguments': [],
  679. 'expected_debug_output': textwrap.dedent("""\
  680. Error: no arguments given, see dcut -h
  681. """),
  682. 'expected_exit_status': EXIT_STATUS_FAILURE,
  683. }),
  684. ('first-command-bogus', {
  685. 'arguments': ["b0gUs", "spam", "eggs"],
  686. 'expected_debug_output': textwrap.dedent("""\
  687. Error: Could not parse commands at "b0gUs"
  688. """),
  689. 'expected_exit_status': EXIT_STATUS_FAILURE,
  690. }),
  691. ('third-command-bogus', {
  692. 'arguments': ["rm", "foo", ",", "cancel", "bar", ",", "b0gUs"],
  693. 'expected_debug_output': textwrap.dedent("""\
  694. Error: Could not parse commands at "b0gUs"
  695. """),
  696. 'expected_exit_status': EXIT_STATUS_FAILURE,
  697. }),
  698. ]
  699. def test_emits_expected_error_message(self):
  700. """ Should emit expected error message. """
  701. try:
  702. dput.dcut.parse_queuecommands(**self.test_args)
  703. except FakeSystemExit:
  704. pass
  705. self.assertIn(self.expected_debug_output, sys.stderr.getvalue())
  706. def test_calls_sys_exit_with_exit_status(self):
  707. """ Should call `sys.exit` with expected exit status. """
  708. with testtools.ExpectedException(FakeSystemExit):
  709. dput.dcut.parse_queuecommands(**self.test_args)
  710. sys.exit.assert_called_with(self.expected_exit_status)
  711. class create_commands_TestCase(
  712. testtools.TestCase):
  713. """ Test cases for `create_commands` function. """
  714. def setUp(self):
  715. """ Set up test fixtures. """
  716. super(create_commands_TestCase, self).setUp()
  717. patch_system_interfaces(self)
  718. self.changes_file_scenarios = make_changes_file_scenarios()
  719. set_changes_file_scenario(self, 'no-format')
  720. setup_file_double_behaviour(self)
  721. self.set_expected_commands()
  722. self.set_options()
  723. test_dput_main.patch_parse_changes(self)
  724. dput.dput.parse_changes.return_value = self.changes_file_scenario[
  725. 'expected_result']
  726. self.set_test_args()
  727. def set_options(self):
  728. """ Set the options mapping to pass to the function. """
  729. self.options = {
  730. 'debug': False,
  731. 'changes': self.changes_file_double.path,
  732. }
  733. def set_test_args(self):
  734. """ Set the arguments for the test call to the function. """
  735. self.test_args = dict(
  736. options=dict(self.options),
  737. config=object(),
  738. parse_changes=dput.dput.parse_changes,
  739. )
  740. def set_expected_commands(self):
  741. """ Set the expected commands for this test case. """
  742. files_to_remove = [os.path.basename(self.changes_file_double.path)]
  743. files_from_changes = self.changes_file_scenario[
  744. 'expected_result']['files']
  745. for line in files_from_changes.split("\n"):
  746. files_to_remove.append(line.split(" ")[4])
  747. self.expected_commands = [
  748. "rm --searchdirs {path}".format(path=path)
  749. for path in files_to_remove]
  750. def test_emits_debug_message_for_changes_file(self):
  751. """ Should emit debug message for changes file. """
  752. self.options['debug'] = True
  753. self.set_test_args()
  754. dput.dcut.create_commands(**self.test_args)
  755. expected_output = textwrap.dedent("""\
  756. D: Parsing changes file ({path}) for files to remove
  757. """).format(path=self.changes_file_double.path)
  758. self.assertIn(expected_output, sys.stdout.getvalue())
  759. def test_emits_error_message_when_changes_file_open_error(self):
  760. """ Should emit error message when changes file raises error. """
  761. self.changes_file_double.set_open_scenario('read_denied')
  762. try:
  763. dput.dcut.create_commands(**self.test_args)
  764. except FakeSystemExit:
  765. pass
  766. expected_output = textwrap.dedent("""\
  767. Can't open changes file: {path}
  768. """).format(path=self.changes_file_double.path)
  769. self.assertIn(expected_output, sys.stdout.getvalue())
  770. def test_calls_sys_exit_when_changes_file_open_error(self):
  771. """ Should call `sys.exit` when changes file raises error. """
  772. self.changes_file_double.set_open_scenario('read_denied')
  773. with testtools.ExpectedException(FakeSystemExit):
  774. dput.dcut.create_commands(**self.test_args)
  775. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  776. def test_returns_expected_result(self):
  777. """ Should return expected result. """
  778. result = dput.dcut.create_commands(**self.test_args)
  779. self.assertEqual(self.expected_commands, result)
  780. class write_commands_TestCase(
  781. testscenarios.WithScenarios,
  782. testtools.TestCase):
  783. """ Test cases for `write_commands` function. """
  784. default_options = {
  785. 'filetocreate': None,
  786. }
  787. path_scenarios = [
  788. ('default-path', {}),
  789. ('filetocreate', {
  790. 'option_filetocreate': str("ipsum.commands"),
  791. 'expected_result': "ipsum.commands",
  792. }),
  793. ('no-tempdir', {
  794. 'tempdir': None,
  795. }),
  796. ]
  797. commands_scenarios = [
  798. ('commands-none', {
  799. 'commands': [],
  800. }),
  801. ('commands-one', {
  802. 'commands': ["foo"],
  803. }),
  804. ('commands-three', {
  805. 'commands': ["foo", "bar", "baz"],
  806. }),
  807. ]
  808. keyid_scenarios = [
  809. ('keyid-none', {}),
  810. ('keyid-set', {
  811. 'option_keyid': "DEADBEEF",
  812. }),
  813. ]
  814. scenarios = testscenarios.multiply_scenarios(
  815. path_scenarios, commands_scenarios, keyid_scenarios)
  816. for (scenario_name, scenario) in scenarios:
  817. default_options = getattr(
  818. getoptions_ParseCommandLineTestCase, 'default_options')
  819. options = dict(default_options)
  820. options.update({
  821. 'uploader': "Lorem Ipsum <flup@example.org>",
  822. })
  823. scenario['uploader_filename_part'] = "Lorem_Ipsum__flup_example_org_"
  824. if 'option_filetocreate' in scenario:
  825. options['filetocreate'] = scenario['option_filetocreate']
  826. if 'option_keyid' in scenario:
  827. options['keyid'] = scenario['option_keyid']
  828. scenario['options'] = options
  829. if 'tempdir' not in scenario:
  830. scenario['tempdir'] = tempfile.mktemp()
  831. del scenario_name, scenario
  832. del default_options, options
  833. def setUp(self):
  834. """ Set up test fixtures. """
  835. super(write_commands_TestCase, self).setUp()
  836. patch_system_interfaces(self)
  837. patch_os_getpid(self)
  838. os.getpid.return_value = self.getUniqueInteger()
  839. self.time_return_value = self.getUniqueInteger()
  840. patch_time_time(self, itertools.repeat(self.time_return_value))
  841. patch_sys_argv(self)
  842. self.set_commands_file_double()
  843. setup_file_double_behaviour(self)
  844. self.set_expected_result()
  845. self.set_commands()
  846. self.set_test_args()
  847. patch_subprocess_popen(self)
  848. patch_os_waitpid(self)
  849. self.set_debsign_subprocess_double()
  850. setup_subprocess_double_behaviour(self)
  851. def set_options(self):
  852. """ Set the options mapping to pass to the function. """
  853. def set_test_args(self):
  854. """ Set the arguments for the test call to the function. """
  855. self.test_args = dict(
  856. commands=list(self.commands),
  857. options=dict(self.options),
  858. config=object(),
  859. tempdir=self.tempdir,
  860. )
  861. def make_commands_filename(self):
  862. """ Make the filename for the commands output file. """
  863. expected_progname = self.progname
  864. filename = "{progname}.{uploadpart}.{time:d}.{pid:d}.commands".format(
  865. progname=expected_progname,
  866. uploadpart=self.uploader_filename_part,
  867. time=self.time_return_value,
  868. pid=os.getpid.return_value)
  869. return filename
  870. def set_commands_file_double(self):
  871. """ Set the commands file double for this test case. """
  872. if self.options['filetocreate']:
  873. path = self.options['filetocreate']
  874. else:
  875. output_filename = self.make_commands_filename()
  876. if self.tempdir:
  877. path = os.path.join(self.tempdir, output_filename)
  878. else:
  879. path = output_filename
  880. double = FileDouble(path)
  881. double.register_for_testcase(self)
  882. self.commands_file_double = double
  883. def set_expected_result(self):
  884. """ Set the `expected_result` for this test case. """
  885. self.expected_result = self.commands_file_double.path
  886. def set_commands(self):
  887. """ Set the commands to use for this test case. """
  888. if not hasattr(self, 'commands'):
  889. self.commands = []
  890. def make_expected_content(self):
  891. """ Make the expected content for the output file. """
  892. uploader_value = self.options['uploader']
  893. if self.commands:
  894. commands_value = "\n".join(
  895. " {command}".format(command=command)
  896. for command in self.commands)
  897. else:
  898. commands_value = " "
  899. commands_value += "\n"
  900. text = textwrap.dedent("""\
  901. Uploader: {uploader}
  902. Commands:
  903. {commands}
  904. """).format(uploader=uploader_value, commands=commands_value)
  905. return text
  906. def set_debsign_subprocess_double(self):
  907. """ Set the ‘debsign’ subprocess double for this test case. """
  908. path = "/usr/bin/debsign"
  909. argv = [os.path.basename(path), ARG_MORE]
  910. double = SubprocessDouble(path, argv)
  911. double.register_for_testcase(self)
  912. self.debsign_subprocess_double = double
  913. def make_expected_debsign_argv(self):
  914. """ Make the expected command-line arguments for ‘debsign’. """
  915. argv = [
  916. str("debsign"),
  917. "-m{uploader}".format(uploader=self.options['uploader']),
  918. ]
  919. if self.options['keyid']:
  920. argv.append(
  921. "-k{keyid}".format(keyid=self.options['keyid']))
  922. argv.append(self.commands_file_double.path)
  923. return argv
  924. def test_returns_expected_file_path(self):
  925. """ Should return expected file path. """
  926. result = dput.dcut.write_commands(**self.test_args)
  927. self.assertEqual(self.expected_result, result)
  928. def test_output_file_has_expected_content(self):
  929. """ Should have expected content in output file. """
  930. with mock.patch.object(self.commands_file_double.fake_file, 'close'):
  931. dput.dcut.write_commands(**self.test_args)
  932. expected_value = self.make_expected_content()
  933. self.assertEqual(
  934. expected_value, self.commands_file_double.fake_file.getvalue())
  935. def test_emits_debug_message_for_debsign(self):
  936. """ Should emit debug message for ‘debsign’ command. """
  937. self.options['debug'] = True
  938. self.test_args['options'] = self.options
  939. dput.dcut.write_commands(**self.test_args)
  940. debsign_argv = self.make_expected_debsign_argv()
  941. expected_output = textwrap.dedent("""\
  942. D: calling debsign: {argv}
  943. """).format(argv=debsign_argv)
  944. self.assertIn(expected_output, sys.stdout.getvalue())
  945. def test_invokes_debsign_with_expected_args(self):
  946. """ Should invoke ‘debsign’ command with expected args. """
  947. debsign_argv = self.make_expected_debsign_argv()
  948. expected_args = (debsign_argv,)
  949. dput.dcut.write_commands(**self.test_args)
  950. subprocess.Popen.assert_called_with(*expected_args)
  951. def test_calls_os_waitpid_with_expected_args(self):
  952. """ Should call `os.waitpid` with expected args. """
  953. expected_args = (self.debsign_subprocess_double.pid, 0)
  954. dput.dcut.write_commands(**self.test_args)
  955. os.waitpid.assert_called_with(*expected_args)
  956. def test_emits_error_message_when_debsign_failure(self):
  957. """ Should emit error message when ‘debsign’ command failure. """
  958. self.debsign_subprocess_double.set_os_waitpid_scenario('failure')
  959. try:
  960. dput.dcut.write_commands(**self.test_args)
  961. except FakeSystemExit:
  962. pass
  963. expected_output = textwrap.dedent("""\
  964. Error: debsign failed.
  965. """)
  966. self.assertIn(expected_output, sys.stderr.getvalue())
  967. def test_calls_sys_exit_when_debsign_failure(self):
  968. """ Should call `sys.exit` when ‘debsign’ command failure. """
  969. self.debsign_subprocess_double.set_os_waitpid_scenario('failure')
  970. with testtools.ExpectedException(FakeSystemExit):
  971. dput.dcut.write_commands(**self.test_args)
  972. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  973. class upload_TestCase(test_dput_main.main_TestCase):
  974. """ Base for test cases for `upload_stolen_from_dput_main` function. """
  975. function_to_test = staticmethod(dput.dcut.upload_stolen_from_dput_main)
  976. def setUp(self):
  977. """ Set up test fixtures. """
  978. super(upload_TestCase, self).setUp()
  979. self.set_cat_subprocess_double()
  980. patch_os_system(self)
  981. patch_tempfile_mkdtemp(self)
  982. patch_os_rmdir(self)
  983. patch_getoptions(self)
  984. def patch_globals(self):
  985. # The `upload_stolen_from_dput_main` function doesn't need these. """
  986. pass
  987. def set_cat_subprocess_double(self):
  988. """ Set the ‘cat’ subprocess double for this test case. """
  989. path = "/bin/cat"
  990. argv = [os.path.basename(path), ARG_ANY]
  991. double = SubprocessDouble(path, argv)
  992. double.register_for_testcase(self)
  993. double.set_os_system_scenario('success')
  994. self.cat_subprocess_double = double
  995. def set_test_args(self):
  996. """ Set the arguments for the test call to the function. """
  997. self.test_args = dict(
  998. host=self.test_host,
  999. upload_methods=self.upload_methods,
  1000. config=self.runtime_config_parser,
  1001. debug=False,
  1002. simulate=False,
  1003. files_to_upload=self.files_to_upload,
  1004. ftp_passive_mode=False,
  1005. )
  1006. if hasattr(self, 'test_args_extra'):
  1007. self.test_args.update(self.test_args_extra)
  1008. def get_upload_method_func(self):
  1009. """ Get the specified upload method. """
  1010. method_name = self.runtime_config_parser.get(self.test_host, 'method')
  1011. method_func = dput.dput.upload_methods[method_name]
  1012. return method_func
  1013. class upload_DebugMessageTestCase(upload_TestCase):
  1014. """ Test cases for `upload_stolen_from_dput_main` debug messages. """
  1015. def test_emits_debug_message_for_discovered_methods(self):
  1016. """ Should emit debug message for discovered upload methods. """
  1017. self.test_args['debug'] = True
  1018. self.function_to_test(**self.test_args)
  1019. expected_output = textwrap.dedent("""\
  1020. D: Default Method: {default_method}
  1021. D: Host Method: {host_method}
  1022. """).format(
  1023. default_method=self.runtime_config_parser.get(
  1024. 'DEFAULT', 'method'),
  1025. host_method=self.runtime_config_parser.get(
  1026. self.test_host, 'method'))
  1027. self.assertIn(expected_output, sys.stdout.getvalue())
  1028. class upload_UnknownUploadMethodTestCase(
  1029. testscenarios.WithScenarios,
  1030. upload_TestCase):
  1031. """ Test cases for `upload_stolen_from_dput_main`, unknown method. """
  1032. scenarios = [
  1033. ('bogus-default-method', {
  1034. 'config_extras': {
  1035. 'default': {
  1036. 'method': "b0gUs",
  1037. },
  1038. },
  1039. 'expected_output': "Unknown upload method: b0gUs",
  1040. 'expected_exit_status': EXIT_STATUS_FAILURE,
  1041. }),
  1042. ('bogus-host-method', {
  1043. 'config_extras': {
  1044. 'host': {
  1045. 'method': "b0gUs",
  1046. },
  1047. },
  1048. 'expected_output': "Unknown upload method: b0gUs",
  1049. 'expected_exit_status': EXIT_STATUS_FAILURE,
  1050. }),
  1051. ]
  1052. def test_emits_error_message_when_unknown_method(self):
  1053. """ Should emit error message when unknown upload method. """
  1054. try:
  1055. self.function_to_test(**self.test_args)
  1056. except FakeSystemExit:
  1057. pass
  1058. self.assertIn(self.expected_output, sys.stderr.getvalue())
  1059. def test_calls_sys_exit_when_unknown_method(self):
  1060. """ Should call `sys.exit` when unknown upload method. """
  1061. with testtools.ExpectedException(FakeSystemExit):
  1062. self.function_to_test(**self.test_args)
  1063. sys.exit.assert_called_with(self.expected_exit_status)
  1064. class upload_DiscoverLoginTestCase(
  1065. testscenarios.WithScenarios,
  1066. upload_TestCase):
  1067. """ Test cases for `upload_stolen_from_dput_main` discovery of login. """
  1068. fallback_login_scenarios = [
  1069. ('login-from-environ', {
  1070. 'os_environ': {
  1071. 'USER': "login-from-environ",
  1072. },
  1073. 'expected_fallback_login': "login-from-environ",
  1074. 'expected_system_uid_debug_message': "",
  1075. }),
  1076. ('login-from-pwd', {
  1077. 'os_getuid_return_value': 42,
  1078. 'pwd_getpwuid_return_value': PasswdEntry(
  1079. *(["login-from-pwd"] + [object()] * 6)),
  1080. 'expected_fallback_login': "login-from-pwd",
  1081. 'expected_system_uid_debug_message': "D: User-ID: 42",
  1082. }),
  1083. ]
  1084. config_login_scenarios = [
  1085. ('config-default-login', {
  1086. 'config_extras': {
  1087. 'default': {
  1088. 'login': "login-from-config-default",
  1089. },
  1090. },
  1091. 'expected_login': "login-from-config-default",
  1092. 'expected_output_template':
  1093. "D: Login to use: {login}",
  1094. }),
  1095. ('config-host-login', {
  1096. 'config_extras': {
  1097. 'host': {
  1098. 'login': "login-from-config-host",
  1099. },
  1100. },
  1101. 'expected_login': "login-from-config-host",
  1102. 'expected_output_template':
  1103. "D: Login to use: {login}",
  1104. }),
  1105. ('config-default-login sentinel', {
  1106. 'config_extras': {
  1107. 'default': {
  1108. 'login': "username",
  1109. },
  1110. },
  1111. 'expected_output_template': (
  1112. "D: Neither host {host} nor default login used."
  1113. " Using {login}"),
  1114. }),
  1115. ('config-host-login sentinel', {
  1116. 'config_extras': {
  1117. 'host': {
  1118. 'login': "username",
  1119. },
  1120. },
  1121. 'expected_output_template': (
  1122. "D: Neither host {host} nor default login used."
  1123. " Using {login}"),
  1124. }),
  1125. ]
  1126. scenarios = testscenarios.multiply_scenarios(
  1127. fallback_login_scenarios, config_login_scenarios)
  1128. for (scenario_name, scenario) in scenarios:
  1129. if 'expected_login' not in scenario:
  1130. scenario['expected_login'] = scenario['expected_fallback_login']
  1131. del scenario_name, scenario
  1132. def test_emits_debug_message_for_system_uid(self):
  1133. """ Should emit a debug message for the system UID. """
  1134. if self.expected_login != self.expected_fallback_login:
  1135. self.skipTest("No fallback in this scenario")
  1136. self.test_args['debug'] = True
  1137. self.function_to_test(**self.test_args)
  1138. expected_output = self.expected_system_uid_debug_message.format(
  1139. uid=self.os_getuid_return_value)
  1140. self.assertIn(expected_output, sys.stdout.getvalue())
  1141. def test_emits_debug_message_for_discovered_login(self):
  1142. """ Should emit a debug message for the discovered login. """
  1143. self.test_args['debug'] = True
  1144. self.function_to_test(**self.test_args)
  1145. expected_output = self.expected_output_template.format(
  1146. login=self.expected_login, host=self.test_host)
  1147. self.assertIn(expected_output, sys.stdout.getvalue())
  1148. def test_calls_upload_method_with_expected_login(self):
  1149. """ Should call upload method function with expected login arg. """
  1150. upload_method_func = get_upload_method_func(self)
  1151. self.function_to_test(**self.test_args)
  1152. upload_method_func.assert_called_with(
  1153. mock.ANY, self.expected_login,
  1154. mock.ANY, mock.ANY, mock.ANY, mock.ANY)
  1155. class upload_SimulateTestCase(
  1156. testscenarios.WithScenarios,
  1157. upload_TestCase):
  1158. """ Test cases for `upload_stolen_from_dput_main`, ‘simulate’ option. """
  1159. scenarios = [
  1160. ('simulate', {
  1161. 'config_default_login': "login-from-config-default",
  1162. 'test_args_extra': {
  1163. 'simulate': True,
  1164. },
  1165. }),
  1166. ('simulate three-files', {
  1167. 'config_default_login': "login-from-config-default",
  1168. 'test_args_extra': {
  1169. 'simulate': True,
  1170. },
  1171. 'files_to_upload': [tempfile.mktemp() for __ in range(3)],
  1172. }),
  1173. ]
  1174. def test_omits_upload_method(self):
  1175. """ Should omit call to upload method function. """
  1176. upload_method_func = get_upload_method_func(self)
  1177. self.function_to_test(**self.test_args)
  1178. self.assertFalse(upload_method_func.called)
  1179. def test_emits_message_for_each_file_to_upload(self):
  1180. """ Should emit a message for each file to upload. """
  1181. self.function_to_test(**self.test_args)
  1182. method = self.runtime_config_parser.get(self.test_host, 'method')
  1183. fqdn = self.runtime_config_parser.get(self.test_host, 'fqdn')
  1184. incoming = self.runtime_config_parser.get(self.test_host, 'incoming')
  1185. expected_output = "\n".join(
  1186. "Uploading with {method}: {path} to {fqdn}:{incoming}".format(
  1187. method=method, path=path,
  1188. fqdn=fqdn, incoming=incoming)
  1189. for path in self.files_to_upload)
  1190. self.assertIn(expected_output, sys.stderr.getvalue())
  1191. def test_calls_cat_for_each_file_to_upload(self):
  1192. """ Should call ‘cat’ for each file to upload. """
  1193. self.function_to_test(**self.test_args)
  1194. expected_calls = [
  1195. mock.call("cat {path}".format(path=path))
  1196. for path in self.files_to_upload]
  1197. os.system.assert_has_calls(expected_calls, any_order=True)
  1198. class upload_UploadMethodTestCase(
  1199. testscenarios.WithScenarios,
  1200. upload_TestCase):
  1201. """ Test cases for `upload_stolen_from_dput_main`, invoking method. """
  1202. method_scenarios = [
  1203. ('method-local', {
  1204. 'config_method': "local",
  1205. 'config_progress_indicator': 23,
  1206. 'expected_args': (
  1207. "localhost", mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1208. 0),
  1209. }),
  1210. ('method-ftp', {
  1211. 'config_method': "ftp",
  1212. 'config_fqdn': "foo.example.com",
  1213. 'config_passive_ftp': False,
  1214. 'config_progress_indicator': 23,
  1215. 'expected_args': (
  1216. "foo.example.com", mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1217. False),
  1218. 'expected_stdout_output': "",
  1219. }),
  1220. ('method-ftp port-custom', {
  1221. 'config_method': "ftp",
  1222. 'config_fqdn': "foo.example.com:42",
  1223. 'config_passive_ftp': False,
  1224. 'config_progress_indicator': 23,
  1225. 'expected_args': (
  1226. "foo.example.com:42",
  1227. mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1228. False),
  1229. 'expected_stdout_output': "",
  1230. }),
  1231. ('method-ftp config-passive-mode', {
  1232. 'config_method': "ftp",
  1233. 'config_fqdn': "foo.example.com",
  1234. 'config_passive_ftp': True,
  1235. 'config_progress_indicator': 23,
  1236. 'expected_args': (
  1237. "foo.example.com", mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1238. True),
  1239. 'expected_stdout_output': "",
  1240. }),
  1241. ('method-ftp config-passive-mode arg-ftp-active-mode', {
  1242. 'config_method': "ftp",
  1243. 'config_fqdn': "foo.example.com",
  1244. 'config_passive_ftp': True,
  1245. 'config_progress_indicator': 23,
  1246. 'test_args_extra': {
  1247. 'ftp_passive_mode': False,
  1248. },
  1249. 'expected_args': (
  1250. "foo.example.com", mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1251. True),
  1252. 'expected_stdout_output': "D: Using active ftp",
  1253. }),
  1254. ('method-ftp arg-ftp-passive-mode', {
  1255. 'config_method': "ftp",
  1256. 'config_fqdn': "foo.example.com",
  1257. 'config_progress_indicator': 23,
  1258. 'test_args_extra': {
  1259. 'ftp_passive_mode': True,
  1260. },
  1261. 'expected_args': (
  1262. "foo.example.com", mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1263. True),
  1264. 'expected_stdout_output': "D: Using passive ftp",
  1265. }),
  1266. ('method-ftp config-passive-mode arg-ftp-passive-mode', {
  1267. 'config_method': "ftp",
  1268. 'config_fqdn': "foo.example.com",
  1269. 'config_passive_ftp': True,
  1270. 'config_progress_indicator': 23,
  1271. 'test_args_extra': {
  1272. 'ftp_passive_mode': True,
  1273. },
  1274. 'expected_args': (
  1275. "foo.example.com", mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1276. True),
  1277. 'expected_stdout_output': "D: Using passive ftp",
  1278. }),
  1279. ('method-scp', {
  1280. 'config_method': "scp",
  1281. 'config_fqdn': "foo.example.com",
  1282. 'expected_args': (
  1283. "foo.example.com", mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1284. False, []),
  1285. 'expected_stdout_output': "",
  1286. }),
  1287. ('method-scp scp-compress', {
  1288. 'config_method': "scp",
  1289. 'config_fqdn': "foo.example.com",
  1290. 'config_extras': {
  1291. 'host': {
  1292. 'scp_compress': "True",
  1293. 'ssh_config_options': "spam eggs beans",
  1294. },
  1295. },
  1296. 'expected_args': (
  1297. "foo.example.com", mock.ANY, mock.ANY, mock.ANY, mock.ANY,
  1298. True, ["spam eggs beans"]),
  1299. 'expected_stdout_output': "D: Setting compression for scp",
  1300. }),
  1301. ]
  1302. login_scenarios = [
  1303. ('default-login', {
  1304. 'config_default_login': "login-from-config-default",
  1305. }),
  1306. ]
  1307. commands_scenarios = [
  1308. ('commands-from-changes', {
  1309. 'getoptions_args': ["foo", "bar", "baz"],
  1310. 'getoptions_opts': {
  1311. 'filetocreate': None,
  1312. 'filetoupload': tempfile.mktemp() + "commands",
  1313. },
  1314. }),
  1315. ('commands-from-changes', {
  1316. 'getoptions_args': ["foo", "bar", "baz"],
  1317. 'getoptions_opts': {
  1318. 'filetocreate': None,
  1319. 'filetoupload': None,
  1320. 'changes': tempfile.mktemp(),
  1321. },
  1322. }),
  1323. ('commands-from-arguments', {
  1324. 'getoptions_args': ["foo", "bar", "baz"],
  1325. 'getoptions_opts': {
  1326. 'filetocreate': None,
  1327. 'filetoupload': None,
  1328. 'changes': None,
  1329. },
  1330. }),
  1331. ]
  1332. files_scenarios = [
  1333. ('no-files', {
  1334. 'files_to_remove': [],
  1335. }),
  1336. ('three-files', {
  1337. 'files_to_remove': [
  1338. tempfile.mktemp() for __ in range(3)],
  1339. }),
  1340. ]
  1341. scenarios = testscenarios.multiply_scenarios(
  1342. method_scenarios, login_scenarios,
  1343. commands_scenarios, files_scenarios)
  1344. def test_emits_expected_debug_message(self):
  1345. """ Should emit expected debug message. """
  1346. self.test_args['debug'] = True
  1347. self.function_to_test(**self.test_args)
  1348. if hasattr(self, 'expected_stdout_output'):
  1349. self.assertIn(self.expected_stdout_output, sys.stdout.getvalue())
  1350. def test_calls_upload_method_with_expected_args(self):
  1351. """ Should call upload method function with expected args. """
  1352. upload_method_func = get_upload_method_func(self)
  1353. self.function_to_test(**self.test_args)
  1354. upload_method_func.assert_called_with(*self.expected_args)
  1355. class dcut_TestCase(testtools.TestCase):
  1356. """ Base for test cases for `dput` function. """
  1357. def setUp(self):
  1358. """ Set up test fixtures. """
  1359. super(dcut_TestCase, self).setUp()
  1360. patch_system_interfaces(self)
  1361. patch_tempfile_mkdtemp(self)
  1362. patch_os_unlink(self)
  1363. patch_os_rmdir(self)
  1364. patch_shutil_rmtree(self)
  1365. set_config(
  1366. self,
  1367. getattr(self, 'config_scenario_name', 'exist-simple'))
  1368. test_dput_main.patch_runtime_config_options(self)
  1369. self.set_test_args()
  1370. patch_getoptions(self)
  1371. test_dput_main.patch_parse_changes(self)
  1372. test_dput_main.patch_read_configs(self)
  1373. test_dput_main.patch_upload_methods(self)
  1374. test_dput_main.patch_import_upload_functions(self)
  1375. self.patch_parse_queuecommands()
  1376. self.patch_create_commands()
  1377. self.patch_write_commands()
  1378. self.patch_upload_stolen_from_dput_main()
  1379. def set_test_args(self):
  1380. """ Set the arguments for the test call to the function. """
  1381. self.test_args = dict()
  1382. def patch_parse_queuecommands(self):
  1383. """ Patch the `parse_queuecommands` function for this test case. """
  1384. func_patcher = mock.patch.object(dput.dcut, "parse_queuecommands")
  1385. func_patcher.start()
  1386. self.addCleanup(func_patcher.stop)
  1387. def patch_create_commands(self):
  1388. """ Patch the `create_commands` function for this test case. """
  1389. func_patcher = mock.patch.object(dput.dcut, "create_commands")
  1390. func_patcher.start()
  1391. self.addCleanup(func_patcher.stop)
  1392. def patch_write_commands(self):
  1393. """ Patch the `write_commands` function for this test case. """
  1394. func_patcher = mock.patch.object(dput.dcut, "write_commands")
  1395. func_patcher.start()
  1396. self.addCleanup(func_patcher.stop)
  1397. def patch_upload_stolen_from_dput_main(self):
  1398. """ Patch `upload_stolen_from_dput_main` for this test case. """
  1399. func_patcher = mock.patch.object(
  1400. dput.dcut, "upload_stolen_from_dput_main")
  1401. func_patcher.start()
  1402. self.addCleanup(func_patcher.stop)
  1403. class dcut_DebugMessageTestCase(dcut_TestCase):
  1404. """ Test cases for `dcut` debug messages. """
  1405. def test_emits_debug_message_for_read_configs(self):
  1406. """ Should emit debug message for `read_configs` call. """
  1407. self.getoptions_opts['debug'] = True
  1408. dput.dcut.dcut(**self.test_args)
  1409. expected_output = textwrap.dedent("""\
  1410. D: calling dput.read_configs
  1411. """)
  1412. self.assertIn(expected_output, sys.stdout.getvalue())
  1413. class dcut_ConfigFileTestCase(
  1414. testscenarios.WithScenarios,
  1415. dcut_TestCase):
  1416. """ Test cases for `main` specification of configuration file. """
  1417. scenarios = [
  1418. ('default', {
  1419. 'expected_args': (None, mock.ANY),
  1420. }),
  1421. ('config-from-command-line', {
  1422. 'getoptions_opts': {
  1423. 'config': "lorem.conf",
  1424. },
  1425. 'expected_args': ("lorem.conf", mock.ANY),
  1426. }),
  1427. ]
  1428. def test_calls_read_configs_with_expected_args(self):
  1429. """ Should call `read_configs` with expected arguments. """
  1430. dput.dcut.dcut(**self.test_args)
  1431. dput.dput.read_configs.assert_called_with(*self.expected_args)
  1432. class dcut_OptionsErrorTestCase(
  1433. testscenarios.WithScenarios,
  1434. dcut_TestCase):
  1435. """ Test cases for `dcut` function, startup options cause error. """
  1436. scenarios = [
  1437. ('no-host-discovered', {
  1438. 'config_default_default_host_main': None,
  1439. 'getoptions_opts': {
  1440. 'host': None,
  1441. },
  1442. 'expected_output': (
  1443. "Error: No host specified"
  1444. " and no default found in config"),
  1445. 'expected_exit_status': EXIT_STATUS_FAILURE,
  1446. }),
  1447. ('host-not-in-config', {
  1448. 'config_scenario_name': "exist-minimal",
  1449. 'expected_output': "No host foo found in config",
  1450. 'expected_exit_status': EXIT_STATUS_FAILURE,
  1451. }),
  1452. ('no-allow-dcut', {
  1453. 'config_allow_dcut': False,
  1454. 'expected_output': (
  1455. "Error: dcut is not supported for this upload queue."),
  1456. 'expected_exit_status': EXIT_STATUS_FAILURE,
  1457. }),
  1458. ('filetoupload arguments', {
  1459. 'getoptions_opts': {
  1460. 'filetoupload': tempfile.mktemp() + ".commands",
  1461. },
  1462. 'getoptions_args': ["lorem", "ipsum", "dolor", "sit", "amet"],
  1463. 'expected_output': (
  1464. "Error: cannot take commands"
  1465. " when uploading existing file"),
  1466. 'expected_exit_status': EXIT_STATUS_FAILURE,
  1467. }),
  1468. ]
  1469. def test_emits_expected_error_message(self):
  1470. """ Should emit expected error message. """
  1471. try:
  1472. dput.dcut.dcut(**self.test_args)
  1473. except FakeSystemExit:
  1474. pass
  1475. self.assertIn(self.expected_output, sys.stdout.getvalue())
  1476. def test_calls_sys_exit_with_failure_exit_status(self):
  1477. """ Should call `sys.exit` with failure exit status. """
  1478. with testtools.ExpectedException(FakeSystemExit):
  1479. dput.dcut.dcut(**self.test_args)
  1480. sys.exit.assert_called_with(self.expected_exit_status)
  1481. class dcut_NamedHostTestCase(
  1482. testscenarios.WithScenarios,
  1483. dcut_TestCase):
  1484. """ Test cases for `dcut` function, named host processing. """
  1485. scenarios = [
  1486. ('host-from-command-line', {
  1487. 'config_scenario_name': "exist-simple-host-three",
  1488. 'config_default_default_host_main': "quux",
  1489. 'getoptions_opts': {
  1490. 'host': "bar",
  1491. },
  1492. 'expected_host': "bar",
  1493. 'expected_debug_output': "",
  1494. }),
  1495. ('host-from-config-default', {
  1496. 'config_scenario_name': "exist-simple-host-three",
  1497. 'config_default_default_host_main': "bar",
  1498. 'getoptions_opts': {
  1499. 'host': None,
  1500. },
  1501. 'expected_host': "bar",
  1502. 'expected_debug_output': textwrap.dedent("""\
  1503. D: Using host "bar" (default_host_main)
  1504. """),
  1505. }),
  1506. ('host-from-hardcoded-default', {
  1507. 'config_scenario_name': "exist-default-distribution-only",
  1508. 'config_default_default_host_main': "",
  1509. 'getoptions_opts': {
  1510. 'host': None,
  1511. },
  1512. 'expected_host': "ftp-master",
  1513. 'expected_debug_output': textwrap.dedent("""\
  1514. D: Using host "" (default_host_main)
  1515. D: Using host "ftp-master" (hardcoded)
  1516. """),
  1517. }),
  1518. ]
  1519. def test_emits_debug_message_for_discovered_host(self):
  1520. """ Should emit debug message for discovered host values. """
  1521. self.getoptions_opts['debug'] = True
  1522. dput.dcut.dcut(**self.test_args)
  1523. self.assertIn(self.expected_debug_output, sys.stdout.getvalue())
  1524. def test_calls_write_commands_with_expected_host_option(self):
  1525. """ Should call `write_commands` with expected `host` option. """
  1526. dput.dcut.dcut(**self.test_args)
  1527. self.assertEqual(1, len(dput.dcut.write_commands.mock_calls))
  1528. (__, call_args, call_kwargs) = dput.dcut.write_commands.mock_calls[0]
  1529. (__, options, __, __) = call_args
  1530. self.assertEqual(self.expected_host, options['host'])
  1531. class dcut_FileToUploadBadNameTestCase(
  1532. testscenarios.WithScenarios,
  1533. dcut_TestCase):
  1534. """ Test cases for `dcut` function, file to upload with bad name. """
  1535. scenarios = [
  1536. ('filetoupload', {
  1537. 'getoptions_args': [],
  1538. 'getoptions_opts': {
  1539. 'filetoupload': tempfile.mktemp(),
  1540. },
  1541. 'expected_output': (
  1542. "Error: I'm insisting on the .commands extension"),
  1543. }),
  1544. ]
  1545. def test_emits_error_message_for_bad_filename(self):
  1546. """ Should emit error message for bad filename. """
  1547. dput.dcut.dcut(**self.test_args)
  1548. self.assertIn(self.expected_output, sys.stdout.getvalue())
  1549. class dcut_ParseChangesTestCase(
  1550. testscenarios.WithScenarios,
  1551. dcut_TestCase):
  1552. """ Test cases for `dcut` function, parse upload control file. """
  1553. scenarios = [
  1554. ('changes-file no-filetoupload', {
  1555. 'getoptions_opts': {
  1556. 'filetoupload': None,
  1557. 'changes': tempfile.mktemp(),
  1558. },
  1559. }),
  1560. ('changes-file no-filetoupload no-filetocreate', {
  1561. 'getoptions_opts': {
  1562. 'filetoupload': None,
  1563. 'filetocreate': None,
  1564. 'changes': tempfile.mktemp(),
  1565. },
  1566. }),
  1567. ]
  1568. def test_calls_create_commands_with_expected_args(self):
  1569. """ Should call `create_commands` with expected args. """
  1570. dput.dcut.dcut(**self.test_args)
  1571. (expected_options, __) = dput.dcut.getoptions()
  1572. expected_config = self.runtime_config_parser
  1573. expected_parse_changes = dput.dput.parse_changes
  1574. dput.dcut.create_commands.assert_called_with(
  1575. expected_options, expected_config, expected_parse_changes)
  1576. def test_calls_write_commands_with_expected_args(self):
  1577. """ Should call `write_commands` with expected args. """
  1578. expected_commands = object()
  1579. dput.dcut.create_commands.return_value = expected_commands
  1580. dput.dcut.dcut(**self.test_args)
  1581. (expected_options, __) = dput.dcut.getoptions()
  1582. expected_config = self.runtime_config_parser
  1583. expected_tempdir = self.tempfile_mkdtemp_file_double.path
  1584. dput.dcut.write_commands.assert_called_with(
  1585. expected_commands, expected_options, expected_config,
  1586. expected_tempdir)
  1587. class dcut_ParseQueueCommandsTestCase(
  1588. testscenarios.WithScenarios,
  1589. dcut_TestCase):
  1590. """ Test cases for `dcut` function, parse commands from arguments. """
  1591. scenarios = [
  1592. ('no-changes-file no-filetoupload', {
  1593. 'getoptions_opts': {
  1594. 'filetoupload': None,
  1595. 'changes': None,
  1596. },
  1597. }),
  1598. ]
  1599. def test_calls_parse_queuecommands_with_expected_args(self):
  1600. """ Should call `parse_queuecommands` with expected args. """
  1601. dput.dcut.dcut(**self.test_args)
  1602. (expected_options, expected_arguments) = dput.dcut.getoptions()
  1603. expected_config = self.runtime_config_parser
  1604. dput.dcut.parse_queuecommands.assert_called_with(
  1605. expected_arguments, expected_options, expected_config)
  1606. def test_calls_write_commands_with_expected_args(self):
  1607. """ Should call `write_commands` with expected args. """
  1608. expected_commands = object()
  1609. dput.dcut.parse_queuecommands.return_value = expected_commands
  1610. dput.dcut.dcut(**self.test_args)
  1611. (expected_options, __) = dput.dcut.getoptions()
  1612. expected_config = self.runtime_config_parser
  1613. expected_tempdir = self.tempfile_mkdtemp_file_double.path
  1614. dput.dcut.write_commands.assert_called_with(
  1615. expected_commands, expected_options, expected_config,
  1616. expected_tempdir)
  1617. class dcut_CleanupTestCase(
  1618. testscenarios.WithScenarios,
  1619. dcut_TestCase):
  1620. """ Test cases for `dcut` function, cleanup from exception. """
  1621. commands_scenarios = [
  1622. ('commands-from-arguments', {
  1623. 'getoptions_args': ["foo", "bar", "baz"],
  1624. 'getoptions_opts': {
  1625. 'filetocreate': None,
  1626. 'filetoupload': None,
  1627. 'changes': None,
  1628. },
  1629. }),
  1630. ]
  1631. files_scenarios = upload_UploadMethodTestCase.files_scenarios
  1632. scenarios = testscenarios.multiply_scenarios(
  1633. commands_scenarios, files_scenarios)
  1634. def setUp(self):
  1635. """ Set up test fixtures. """
  1636. super(dcut_CleanupTestCase, self).setUp()
  1637. upload_method_func = get_upload_method_func(self)
  1638. self.upload_error = RuntimeError("Bad stuff happened")
  1639. upload_method_func.side_effect = self.upload_error
  1640. def test_removes_temporary_directory_when_upload_raises_exception(self):
  1641. """ Should remove directory `tempdir` when exception raised. """
  1642. try:
  1643. dput.dcut.dcut(**self.test_args)
  1644. except self.upload_error.__class__:
  1645. pass
  1646. shutil.rmtree.assert_called_with(
  1647. self.tempfile_mkdtemp_file_double.path)
  1648. # Local variables:
  1649. # coding: utf-8
  1650. # mode: python
  1651. # End:
  1652. # vim: fileencoding=utf-8 filetype=python :