test_methods.py 60 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558
  1. # -*- coding: utf-8; -*-
  2. #
  3. # test/test_methods.py
  4. # Part of ‘dput’, a Debian package upload toolkit.
  5. #
  6. # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
  7. #
  8. # This is free software: you may copy, modify, and/or distribute this work
  9. # under the terms of the GNU General Public License as published by the
  10. # Free Software Foundation; version 3 of that license or any later version.
  11. # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
  12. """ Unit tests for upload method behaviour. """
  13. import sys
  14. import os
  15. import os.path
  16. import subprocess
  17. import io
  18. import stat
  19. import pkgutil
  20. import importlib
  21. import collections
  22. import tempfile
  23. import textwrap
  24. import doctest
  25. import getpass
  26. import ftplib
  27. if sys.version_info >= (3, 3):
  28. import urllib.parse as urlparse
  29. elif sys.version_info >= (3, 0):
  30. raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
  31. else:
  32. import urlparse
  33. import testtools
  34. import testscenarios
  35. import httpretty
  36. __package__ = str("test")
  37. __import__(__package__)
  38. sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
  39. import dput.dput
  40. import dput.helper.dputhelper
  41. import dput.methods
  42. import dput.methods.local
  43. import dput.methods.ftp
  44. import dput.methods.http
  45. import dput.methods.https
  46. import dput.methods.scp
  47. import dput.methods.rsync
  48. from .helper import (
  49. mock,
  50. FakeSystemExit,
  51. patch_system_interfaces,
  52. EXIT_STATUS_FAILURE,
  53. FileDouble,
  54. patch_os_stat,
  55. patch_os_lstat,
  56. setup_file_double_behaviour,
  57. patch_subprocess_check_call,
  58. ARG_ANY,
  59. SubprocessDouble,
  60. )
  61. from .test_dputhelper import (
  62. patch_filewithprogress,
  63. )
  64. class import_upload_functions_TestCase(
  65. testscenarios.WithScenarios,
  66. testtools.TestCase):
  67. """ Test cases for `import_upload_functions` function. """
  68. scenarios = [
  69. ('empty', {
  70. 'module_names': [],
  71. }),
  72. ('one', {
  73. 'module_names': ["foo"],
  74. }),
  75. ('three', {
  76. 'module_names': ["foo", "bar", "baz"],
  77. }),
  78. ]
  79. for (scenario_name, scenario) in scenarios:
  80. modules_by_name = collections.OrderedDict()
  81. iter_modules_result = []
  82. for module_name in scenario['module_names']:
  83. module = mock.Mock()
  84. module.__name__ = module_name
  85. module.upload = mock.Mock()
  86. modules_by_name[module_name] = module
  87. module_entry = (module, module_name, False)
  88. iter_modules_result.append(module_entry)
  89. scenario['modules_by_name'] = modules_by_name
  90. scenario['iter_modules_result'] = iter_modules_result
  91. del scenario_name, scenario
  92. def setUp(self):
  93. """ Set up test fixtures. """
  94. super(import_upload_functions_TestCase, self).setUp()
  95. patch_system_interfaces(self)
  96. self.patch_import_functions()
  97. def patch_import_functions(self):
  98. """ Patch import functions used by the function. """
  99. self.patch_pkgutil_iter_modules()
  100. self.patch_importlib_import_module()
  101. def patch_pkgutil_iter_modules(self):
  102. """ Patch `pkgutil.iter_modules` function for this test case. """
  103. func_patcher = mock.patch.object(pkgutil, 'iter_modules')
  104. func_patcher.start()
  105. self.addCleanup(func_patcher.stop)
  106. pkgutil.iter_modules.return_value = self.iter_modules_result
  107. def patch_importlib_import_module(self):
  108. """ Patch `importlib.import_module` function for this test case. """
  109. func_patcher = mock.patch.object(importlib, 'import_module')
  110. func_patcher.start()
  111. self.addCleanup(func_patcher.stop)
  112. def fake_import_module(full_name):
  113. module_name = full_name.split(".")[-1]
  114. module = self.modules_by_name[module_name]
  115. return module
  116. importlib.import_module.side_effect = fake_import_module
  117. @mock.patch.object(dput.dput, 'debug', new=True)
  118. def test_emits_debug_message_for_modules_found(self):
  119. """ Should emit a debug message for the modules found. """
  120. expected_message = "D: modules_found: {names!r}".format(
  121. names=self.module_names)
  122. dput.dput.import_upload_functions()
  123. self.assertIn(expected_message, sys.stdout.getvalue())
  124. @mock.patch.object(dput.dput, 'debug', new=True)
  125. def test_emits_debug_message_for_each_import(self):
  126. """ Should emit a debug message for each module imported. """
  127. dput.dput.import_upload_functions()
  128. for (module_name, module) in self.modules_by_name.items():
  129. expected_message = "D: Module: {name} ({module!r})".format(
  130. name=module_name, module=module)
  131. self.assertIn(expected_message, sys.stdout.getvalue())
  132. @mock.patch.object(dput.dput, 'debug', new=True)
  133. def test_emits_debug_message_for_each_upload_method(self):
  134. """ Should emit a debug message for each upload method. """
  135. dput.dput.import_upload_functions()
  136. for module_name in self.module_names:
  137. expected_message = "D: Method name: {name}".format(
  138. name=module_name)
  139. self.assertIn(expected_message, sys.stdout.getvalue())
  140. def test_returns_expected_function_mapping(self):
  141. """ Should return expected mapping of upload functions. """
  142. result = dput.dput.import_upload_functions()
  143. expected_result = {
  144. name: self.modules_by_name[name].upload
  145. for name in self.module_names}
  146. self.assertEqual(expected_result, result)
  147. def patch_getpass_getpass(testcase):
  148. """ Patch the `getpass.getpass` function for the test case. """
  149. func_patcher = mock.patch.object(getpass, "getpass")
  150. func_patcher.start()
  151. testcase.addCleanup(func_patcher.stop)
  152. class upload_TestCase(
  153. testscenarios.WithScenarios,
  154. testtools.TestCase):
  155. """ Base for test cases for method modules `upload` functions. """
  156. files_scenarios = [
  157. ('file-list-empty', {
  158. 'paths_to_upload': [],
  159. }),
  160. ('file-list-one', {
  161. 'paths_to_upload': [tempfile.mktemp()],
  162. }),
  163. ('file-list-three', {
  164. 'paths_to_upload': [tempfile.mktemp() for __ in range(3)],
  165. }),
  166. ]
  167. check_call_scenarios = [
  168. ('check-call-success', {
  169. 'check_call_scenario_name': 'success',
  170. }),
  171. ('check-call-failure', {
  172. 'check_call_scenario_name': 'failure',
  173. 'check_call_error': subprocess.CalledProcessError,
  174. }),
  175. ]
  176. check_call_success_scenarios = [
  177. (name, params) for (name, params) in check_call_scenarios
  178. if 'check_call_error' not in params]
  179. incoming_scenarios = [
  180. ('incoming-simple', {
  181. 'incoming_path': tempfile.mktemp(),
  182. }),
  183. ('incoming-no-leading-slash', {
  184. 'incoming_path': tempfile.mktemp().lstrip("/"),
  185. }),
  186. ('incoming-has-trailing-slash', {
  187. 'incoming_path': tempfile.mktemp() + "/",
  188. }),
  189. ]
  190. def setUp(self):
  191. """ Set up test fixtures. """
  192. super(upload_TestCase, self).setUp()
  193. patch_system_interfaces(self)
  194. patch_subprocess_check_call(self)
  195. patch_os_stat(self)
  196. self.set_file_doubles()
  197. setup_file_double_behaviour(self)
  198. patch_filewithprogress(self)
  199. self.set_test_args()
  200. def set_file_doubles(self):
  201. """ Set the file doubles for this test case. """
  202. for path in self.paths_to_upload:
  203. fake_file = getattr(self, 'fake_file', None)
  204. double = FileDouble(path, fake_file)
  205. double.set_os_stat_scenario(
  206. getattr(self, 'os_stat_scenario_name', "okay"))
  207. double.register_for_testcase(self)
  208. func_patcher = mock.patch.object(double.fake_file, "close")
  209. func_patcher.start()
  210. self.addCleanup(func_patcher.stop)
  211. def set_test_args(self):
  212. """ Set the arguments for the test call to the function. """
  213. raise NotImplementedError
  214. def get_command_args_from_latest_check_call(self):
  215. """ Get command line arguments from latest `subprocess.check_call`. """
  216. latest_call = subprocess.check_call.call_args
  217. (args, kwargs) = latest_call
  218. command_args = args[0]
  219. return command_args
  220. class local_upload_TestCase(upload_TestCase):
  221. """ Test cases for `methods.local.upload` function. """
  222. scenarios = testscenarios.multiply_scenarios(
  223. upload_TestCase.incoming_scenarios,
  224. upload_TestCase.files_scenarios,
  225. upload_TestCase.check_call_success_scenarios)
  226. command_file_path = os.path.join("/usr/bin", "install")
  227. def setUp(self):
  228. """ Set up test fixtures. """
  229. super(local_upload_TestCase, self).setUp()
  230. self.set_subprocess_double()
  231. def set_test_args(self):
  232. """ Set the arguments for the test call to the function. """
  233. self.test_args = dict(
  234. fqdn=object(),
  235. login=object(),
  236. incoming=self.incoming_path,
  237. files_to_upload=self.paths_to_upload,
  238. debug=None,
  239. compress=object(),
  240. progress=object(),
  241. )
  242. def set_subprocess_double(self):
  243. """ Set the test double for the subprocess. """
  244. argv = [self.command_file_path, "-m", ARG_ANY, ARG_ANY]
  245. double = SubprocessDouble(self.command_file_path, argv=argv)
  246. double.register_for_testcase(self)
  247. double.set_subprocess_check_call_scenario(
  248. self.check_call_scenario_name)
  249. self.subprocess_double = double
  250. def test_calls_check_call_with_install_command(self):
  251. """ Should call `subprocess.check_call` to invoke ‘install’. """
  252. dput.methods.local.upload(**self.test_args)
  253. command_args = self.get_command_args_from_latest_check_call()
  254. expected_command = self.command_file_path
  255. self.assertEqual(expected_command, command_args[0])
  256. def test_calls_check_call_with_mode_option_in_command(self):
  257. """ Should call `subprocess.check_call`, invoke command with mode. """
  258. dput.methods.local.upload(**self.test_args)
  259. command_args = self.get_command_args_from_latest_check_call()
  260. expected_mode = (
  261. stat.S_IRUSR | stat.S_IWUSR
  262. | stat.S_IRGRP
  263. | stat.S_IROTH)
  264. expected_mode_text = "{mode:04o}".format(mode=expected_mode)[-3:]
  265. expected_option_args = ["-m", expected_mode_text]
  266. self.assertEqual(expected_option_args, command_args[1:3])
  267. def test_calls_check_call_with_file_paths_in_command(self):
  268. """ Should call `subprocess.check_call` with file paths in command. """
  269. dput.methods.local.upload(**self.test_args)
  270. command_args = self.get_command_args_from_latest_check_call()
  271. self.assertEqual(self.paths_to_upload, command_args[3:-1])
  272. def test_calls_check_call_with_incoming_path_as_final_arg(self):
  273. """ Should call `subprocess.check_call` with incoming path. """
  274. dput.methods.local.upload(**self.test_args)
  275. command_args = self.get_command_args_from_latest_check_call()
  276. self.assertEqual(self.incoming_path, command_args[-1])
  277. def test_emits_debug_message_for_upload_command(self):
  278. """ Should emit a debug message for the upload command. """
  279. self.test_args['debug'] = True
  280. dput.methods.local.upload(**self.test_args)
  281. expected_message = textwrap.dedent("""\
  282. D: Uploading with cp to {dir_path}
  283. D: ...
  284. """).format(dir_path=self.incoming_path)
  285. self.assertThat(
  286. sys.stdout.getvalue(),
  287. testtools.matchers.DocTestMatches(
  288. expected_message, flags=doctest.ELLIPSIS))
  289. def test_calls_sys_exit_if_check_call_returns_nonzero(self):
  290. """ Should call `sys.exit` if `subprocess.check_call` fails. """
  291. self.subprocess_double.set_subprocess_check_call_scenario('failure')
  292. with testtools.ExpectedException(FakeSystemExit):
  293. dput.methods.local.upload(**self.test_args)
  294. expected_output = textwrap.dedent("""\
  295. Error while uploading.
  296. """)
  297. self.assertIn(expected_output, sys.stdout.getvalue())
  298. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  299. class ftp_upload_TestCase(upload_TestCase):
  300. """ Test cases for `methods.ftp.upload` function. """
  301. scenarios = NotImplemented
  302. def setUp(self):
  303. """ Set up test fixtures. """
  304. super(ftp_upload_TestCase, self).setUp()
  305. self.set_ftp_client()
  306. self.patch_ftplib_ftp()
  307. patch_getpass_getpass(self)
  308. self.fake_password = self.getUniqueString()
  309. getpass.getpass.return_value = self.fake_password
  310. if not hasattr(self, 'expected_password'):
  311. self.expected_password = self.fake_password
  312. def set_test_args(self):
  313. """ Set the arguments for the test call to the function. """
  314. if not hasattr(self, 'progress_type'):
  315. self.progress_type = 0
  316. self.test_args = dict(
  317. fqdn=self.getUniqueString(),
  318. login=self.login,
  319. incoming=self.incoming_path,
  320. files_to_upload=self.paths_to_upload,
  321. debug=False,
  322. ftp_mode=self.ftp_mode,
  323. progress=self.progress_type,
  324. port=object(),
  325. )
  326. def set_ftp_client(self):
  327. """ Set the FTP client double. """
  328. self.ftp_client = mock.MagicMock(name="FTP")
  329. def patch_ftplib_ftp(self):
  330. """ Patch `ftplib.FTP` class for this test case. """
  331. patcher = mock.patch.object(ftplib, "FTP")
  332. patcher.start()
  333. self.addCleanup(patcher.stop)
  334. ftplib.FTP.return_value = self.ftp_client
  335. class ftp_upload_NormalFilesTestCase(ftp_upload_TestCase):
  336. """ Test cases for `methods.ftp.upload` function, upload normal files. """
  337. login_scenarios = [
  338. ('anonymous', {
  339. 'login': "anonymous",
  340. 'expected_password': "dput@packages.debian.org",
  341. }),
  342. ('username', {
  343. 'login': "lorem",
  344. }),
  345. ]
  346. ftp_client_scenarios = [
  347. ('default', {
  348. 'ftp_mode': False,
  349. }),
  350. ('passive-mode', {
  351. 'ftp_mode': True,
  352. }),
  353. ]
  354. scenarios = testscenarios.multiply_scenarios(
  355. upload_TestCase.incoming_scenarios,
  356. upload_TestCase.files_scenarios,
  357. login_scenarios, ftp_client_scenarios)
  358. def test_emits_debug_message_for_connect(self):
  359. """ Should emit debug message for successful connect. """
  360. self.test_args['debug'] = True
  361. dput.methods.ftp.upload(**self.test_args)
  362. expected_fqdn = self.test_args['fqdn']
  363. expected_output = textwrap.dedent("""\
  364. D: FTP-Connection to host: {fqdn}
  365. """).format(fqdn=expected_fqdn)
  366. self.assertIn(expected_output, sys.stdout.getvalue())
  367. def test_calls_ftp_connect_with_expected_args(self):
  368. """ Should call `FTP.connect` with expected args. """
  369. dput.methods.ftp.upload(**self.test_args)
  370. expected_args = (
  371. self.test_args['fqdn'],
  372. self.test_args['port'],
  373. )
  374. self.ftp_client.connect.assert_called_with(*expected_args)
  375. def test_emits_error_message_when_ftp_connect_error(self):
  376. """ Should emit error message when `FTP.connect` raises error. """
  377. self.ftp_client.connect.side_effect = ftplib.error_temp
  378. try:
  379. dput.methods.ftp.upload(**self.test_args)
  380. except FakeSystemExit:
  381. pass
  382. expected_output = "Connection failed, aborting"
  383. self.assertIn(expected_output, sys.stdout.getvalue())
  384. def test_calls_sys_exit_when_ftp_connect_permission_error(self):
  385. """ Should call `sys.exit` when `FTP.connect` raises error. """
  386. self.ftp_client.connect.side_effect = ftplib.error_temp
  387. with testtools.ExpectedException(FakeSystemExit):
  388. dput.methods.ftp.upload(**self.test_args)
  389. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  390. def test_calls_ftp_login_with_expected_args(self):
  391. """ Should call `FTP.login` with expected args. """
  392. dput.methods.ftp.upload(**self.test_args)
  393. expected_args = (
  394. self.test_args['login'],
  395. self.expected_password,
  396. )
  397. self.ftp_client.login.assert_called_with(*expected_args)
  398. def test_emits_error_message_when_ftp_login_permission_error(self):
  399. """ Should emit error message when `FTP.login` permission error. """
  400. self.ftp_client.login.side_effect = ftplib.error_perm
  401. try:
  402. dput.methods.ftp.upload(**self.test_args)
  403. except FakeSystemExit:
  404. pass
  405. expected_output = "Wrong Password"
  406. self.assertIn(expected_output, sys.stdout.getvalue())
  407. def test_calls_sys_exit_when_ftp_login_permission_error(self):
  408. """ Should call `sys.exit` when `FTP.login` permission error. """
  409. self.ftp_client.login.side_effect = ftplib.error_perm
  410. with testtools.ExpectedException(FakeSystemExit):
  411. dput.methods.ftp.upload(**self.test_args)
  412. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  413. def test_emits_error_message_when_ftp_login_eof_error(self):
  414. """ Should emit error message when `FTP.login` EOF error. """
  415. self.ftp_client.login.side_effect = EOFError
  416. try:
  417. dput.methods.ftp.upload(**self.test_args)
  418. except FakeSystemExit:
  419. pass
  420. expected_output = "Server closed the connection"
  421. self.assertIn(expected_output, sys.stdout.getvalue())
  422. def test_calls_sys_exit_when_ftp_login_eof_error(self):
  423. """ Should call `sys.exit` when `FTP.login` EOF error. """
  424. self.ftp_client.login.side_effect = EOFError
  425. with testtools.ExpectedException(FakeSystemExit):
  426. dput.methods.ftp.upload(**self.test_args)
  427. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  428. def test_calls_ftp_set_pasv_with_expected_args(self):
  429. """ Should call `FTP.set_pasv` with expected args. """
  430. dput.methods.ftp.upload(**self.test_args)
  431. expected_mode = bool(self.test_args['ftp_mode'])
  432. expected_args = (expected_mode,)
  433. self.ftp_client.set_pasv.assert_called_with(*expected_args)
  434. def test_calls_ftp_cwd_with_expected_args(self):
  435. """ Should call `FTP.cwd` with expected args. """
  436. dput.methods.ftp.upload(**self.test_args)
  437. expected_path = self.incoming_path
  438. expected_args = (expected_path,)
  439. self.ftp_client.cwd.assert_called_with(*expected_args)
  440. def test_emits_debug_message_for_cwd(self):
  441. """ Should emit debug message for successful `FTP.cwd`. """
  442. self.test_args['debug'] = True
  443. dput.methods.ftp.upload(**self.test_args)
  444. expected_output = textwrap.dedent("""\
  445. D: Directory to upload to: {path}
  446. """).format(path=self.incoming_path)
  447. self.assertIn(expected_output, sys.stdout.getvalue())
  448. def test_emits_error_message_when_destination_directory_not_found(self):
  449. """ Should emit error message when destination directory not found. """
  450. error = ftplib.error_perm("550 Not Found")
  451. self.ftp_client.cwd.side_effect = error
  452. try:
  453. dput.methods.ftp.upload(**self.test_args)
  454. except FakeSystemExit:
  455. pass
  456. expected_output = "Directory to upload to does not exist."
  457. self.assertIn(expected_output, sys.stdout.getvalue())
  458. def test_calls_sys_exit_when_ftp_cwd_permission_error(self):
  459. """ Should call `sys.exit` when `FTP.cwd` permission error. """
  460. error = ftplib.error_perm("550 Not Found")
  461. self.ftp_client.cwd.side_effect = error
  462. with testtools.ExpectedException(FakeSystemExit):
  463. dput.methods.ftp.upload(**self.test_args)
  464. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  465. def test_propagates_exception_when_ftp_cwd_permission_error(self):
  466. """ Should call `sys.exit` when `FTP.cwd` permission error. """
  467. error = ftplib.error_perm("500 Bad Stuff Happened")
  468. self.ftp_client.cwd.side_effect = error
  469. with testtools.ExpectedException(error.__class__):
  470. dput.methods.ftp.upload(**self.test_args)
  471. def test_propagates_exception_when_ftp_cwd_eof_error(self):
  472. """ Should call `sys.exit` when `FTP.cwd` EOF error. """
  473. error = EOFError()
  474. self.ftp_client.cwd.side_effect = error
  475. with testtools.ExpectedException(error.__class__):
  476. dput.methods.ftp.upload(**self.test_args)
  477. def test_emits_debug_message_for_each_file(self):
  478. """ Should emit debug message for each file to upload. """
  479. self.test_args['debug'] = True
  480. dput.methods.ftp.upload(**self.test_args)
  481. expected_output = "".join(textwrap.dedent("""\
  482. D: Uploading File: {path}
  483. Uploading {filename}: done.
  484. """).format(path=path, filename=os.path.basename(path))
  485. for path in self.paths_to_upload)
  486. self.assertIn(expected_output, sys.stdout.getvalue())
  487. def test_calls_ftp_storbinary_for_each_file(self):
  488. """ Should call `FTP.storbinary` for each file to upload. """
  489. dput.methods.ftp.upload(**self.test_args)
  490. registry = FileDouble.get_registry_for_testcase(self)
  491. expected_blocksize = 1024
  492. expected_calls = [
  493. mock.call(
  494. "STOR {filename}".format(filename=os.path.basename(path)),
  495. registry[path].fake_file, expected_blocksize)
  496. for path in self.paths_to_upload]
  497. self.ftp_client.storbinary.assert_has_calls(
  498. expected_calls, any_order=True)
  499. def test_calls_close_for_each_file(self):
  500. """ Should call `file.close` for each file to upload. """
  501. dput.methods.ftp.upload(**self.test_args)
  502. registry = FileDouble.get_registry_for_testcase(self)
  503. for path in self.paths_to_upload:
  504. fake_file = registry[path].fake_file
  505. fake_file.close.assert_called_with()
  506. class ftp_upload_ErrorTestCase(ftp_upload_TestCase):
  507. """ Test cases for `methods.ftp.upload` function, error conditions. """
  508. login_scenarios = [
  509. ('anonymous', {
  510. 'login': "anonymous",
  511. 'expected_password': "dput@packages.debian.org",
  512. }),
  513. ]
  514. ftp_client_scenarios = [
  515. ('default', {
  516. 'ftp_mode': False,
  517. }),
  518. ]
  519. progress_scenarios = [
  520. ('progress-type-0', {
  521. 'progress_type': 0,
  522. }),
  523. ('progress-type-1', {
  524. 'progress_type': 1,
  525. }),
  526. ('progress-type-2', {
  527. 'progress_type': 2,
  528. }),
  529. ]
  530. files_scenarios = list(
  531. (scenario_name, scenario) for (scenario_name, scenario)
  532. in upload_TestCase.files_scenarios
  533. if scenario['paths_to_upload'])
  534. scenarios = testscenarios.multiply_scenarios(
  535. upload_TestCase.incoming_scenarios,
  536. files_scenarios,
  537. login_scenarios, ftp_client_scenarios, progress_scenarios)
  538. def test_emits_warning_when_remote_file_exists(self):
  539. """ Should emit a warning message when remote file exists. """
  540. error = ftplib.error_perm("553 Exists")
  541. self.ftp_client.storbinary.side_effect = error
  542. dput.methods.ftp.upload(**self.test_args)
  543. for path in self.paths_to_upload:
  544. expected_output = textwrap.dedent("""\
  545. Leaving existing {path} on the server and continuing
  546. """).format(path=os.path.basename(path))
  547. self.expectThat(
  548. sys.stdout.getvalue(),
  549. testtools.matchers.Contains(expected_output))
  550. def test_omits_sys_exit_when_remote_file_exists(self):
  551. """ Should omit call to `sys.exit` when remote file exists. """
  552. error = ftplib.error_perm("553 Exists")
  553. self.ftp_client.storbinary.side_effect = error
  554. dput.methods.ftp.upload(**self.test_args)
  555. self.assertFalse(sys.exit.called)
  556. def test_emits_error_message_when_storbinary_failure(self):
  557. """ Should emit an error message when `FTP.storbinary` failure. """
  558. error = ftplib.error_perm("504 Weird Stuff Happened")
  559. self.ftp_client.storbinary.side_effect = error
  560. try:
  561. dput.methods.ftp.upload(**self.test_args)
  562. except FakeSystemExit:
  563. pass
  564. for path in self.paths_to_upload[:1]:
  565. expected_output = (
  566. "Note: This error might indicate a problem with"
  567. " your passive_ftp setting.\n")
  568. self.expectThat(
  569. sys.stdout.getvalue(),
  570. testtools.matchers.Contains(expected_output))
  571. def test_calls_sys_exit_when_storbinary_failure(self):
  572. """ Should call `sys.exit` when `FTP.storbinary` failure. """
  573. error = ftplib.error_perm("504 Weird Stuff Happened")
  574. self.ftp_client.storbinary.side_effect = error
  575. with testtools.ExpectedException(FakeSystemExit):
  576. dput.methods.ftp.upload(**self.test_args)
  577. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  578. def test_emits_debug_message_when_open_failure(self):
  579. """ Should emit a debug message when `builtins.open` failure. """
  580. self.test_args['debug'] = True
  581. registry = FileDouble.get_registry_for_testcase(self)
  582. for path in self.paths_to_upload:
  583. double = registry[path]
  584. double.set_open_scenario('nonexist')
  585. try:
  586. dput.methods.ftp.upload(**self.test_args)
  587. except EnvironmentError:
  588. pass
  589. expected_output = (
  590. "D: Should exit silently now, but"
  591. " will throw exception for debug.")
  592. self.assertIn(expected_output, sys.stdout.getvalue())
  593. def test_propagates_error_from_storbinary_for_debug(self):
  594. """ Should propagate error from `FTP.storbinary` when debug. """
  595. self.test_args['debug'] = True
  596. error = ftplib.error_perm("504 Weird Stuff Happened")
  597. self.ftp_client.storbinary.side_effect = error
  598. with testtools.ExpectedException(error.__class__):
  599. dput.methods.ftp.upload(**self.test_args)
  600. def test_propagates_error_from_quit_for_debug(self):
  601. """ Should propagate error from `FTP.quit` when debug. """
  602. self.test_args['debug'] = True
  603. error = ftplib.error_perm("504 Weird Stuff Happened")
  604. self.ftp_client.quit.side_effect = error
  605. with testtools.ExpectedException(error.__class__):
  606. dput.methods.ftp.upload(**self.test_args)
  607. def make_expected_filewithprogress_attributes_by_path(testcase, attrs):
  608. """ Make a mapping from path to expected FileWithProgress attribs. """
  609. expected_attributes_by_path = {}
  610. registry = FileDouble.get_registry_for_testcase(testcase)
  611. for path in testcase.paths_to_upload:
  612. file_double = registry[path]
  613. expected_attributes = {
  614. 'f': file_double.fake_file,
  615. 'size': file_double.stat_result.st_size,
  616. }
  617. expected_attributes.update(attrs)
  618. expected_attributes_by_path[path] = expected_attributes
  619. return expected_attributes_by_path
  620. class ftp_upload_ProgressTestCase(ftp_upload_TestCase):
  621. """ Test cases for `methods.ftp.upload` function, with progress meter. """
  622. login_scenarios = [
  623. ('anonymous', {
  624. 'login': "anonymous",
  625. 'expected_password': "dput@packages.debian.org",
  626. }),
  627. ]
  628. ftp_client_scenarios = [
  629. ('default', {
  630. 'ftp_mode': False,
  631. }),
  632. ]
  633. progress_scenarios = [
  634. ('progress-type-1', {
  635. 'progress_type': 1,
  636. }),
  637. ('progress-type-2', {
  638. 'progress_type': 2,
  639. }),
  640. ]
  641. scenarios = testscenarios.multiply_scenarios(
  642. upload_TestCase.incoming_scenarios,
  643. upload_TestCase.files_scenarios,
  644. login_scenarios, ftp_client_scenarios, progress_scenarios)
  645. def test_calls_storbinary_with_filewithprogress(self):
  646. """ Should use a `FileWithProgress` to call `FTP.storbinary`. """
  647. dput.methods.ftp.upload(**self.test_args)
  648. expected_calls = [
  649. mock.call(
  650. mock.ANY, self.fake_filewithprogress,
  651. mock.ANY)
  652. for path in self.paths_to_upload]
  653. self.ftp_client.storbinary.assert_has_calls(
  654. expected_calls, any_order=True)
  655. def test_filewithprogress_has_expected_attributes(self):
  656. """ Should have expected attributes on the `FileWithProgress`. """
  657. expected_attributes_by_path = (
  658. make_expected_filewithprogress_attributes_by_path(
  659. self, {'ptype': self.progress_type}))
  660. dput.methods.ftp.upload(**self.test_args)
  661. for call in self.ftp_client.storbinary.mock_calls:
  662. (__, call_args, call_kwargs) = call
  663. (__, stor_file, __) = call_args
  664. path = stor_file.f.name
  665. expected_attributes = expected_attributes_by_path[path]
  666. stor_file_attributes = {
  667. name: getattr(stor_file, name)
  668. for name in expected_attributes}
  669. self.expectThat(
  670. expected_attributes,
  671. testtools.matchers.Equals(stor_file_attributes))
  672. def test_filewithprogress_has_sentinel_size_when_stat_failure(self):
  673. """ Should have sentinel `size` value when `os.stat` failure. """
  674. expected_attributes_by_path = (
  675. make_expected_filewithprogress_attributes_by_path(
  676. self, {'size': -1}))
  677. registry = FileDouble.get_registry_for_testcase(self)
  678. for path in self.paths_to_upload:
  679. double = registry[path]
  680. double.set_os_stat_scenario('notfound_error')
  681. dput.methods.ftp.upload(**self.test_args)
  682. for call in self.ftp_client.storbinary.mock_calls:
  683. (__, call_args, call_kwargs) = call
  684. (__, stor_file, __) = call_args
  685. path = stor_file.f.name
  686. expected_attributes = expected_attributes_by_path[path]
  687. stor_file_attributes = {
  688. name: getattr(stor_file, name)
  689. for name in expected_attributes}
  690. self.expectThat(
  691. expected_attributes,
  692. testtools.matchers.Equals(stor_file_attributes))
  693. def test_emits_debug_message_when_stat_failure(self):
  694. """ Should have sentinel `size` value when `os.stat` failure. """
  695. self.test_args['debug'] = True
  696. registry = FileDouble.get_registry_for_testcase(self)
  697. for path in self.paths_to_upload:
  698. double = registry[path]
  699. double.set_os_stat_scenario('notfound_error')
  700. dput.methods.ftp.upload(**self.test_args)
  701. for path in self.paths_to_upload:
  702. expected_output = textwrap.dedent("""\
  703. D: Determining size of file '{path}' failed
  704. """).format(path=path)
  705. self.expectThat(
  706. sys.stdout.getvalue(),
  707. testtools.matchers.Contains(expected_output))
  708. class http_upload_TestCase(upload_TestCase):
  709. """ Base for test cases for `methods.http.upload` function. """
  710. scenarios = NotImplemented
  711. protocol_scenarios = [
  712. ('http', {
  713. 'function_to_test': dput.methods.http.upload,
  714. 'protocol': "http",
  715. 'protocol_version': "HTTP/1.0",
  716. }),
  717. ('https', {
  718. 'function_to_test': dput.methods.https.upload,
  719. 'protocol': "https",
  720. 'protocol_version': "HTTP/1.0",
  721. }),
  722. ]
  723. login_scenarios = [
  724. ('username', {
  725. 'login': "lorem",
  726. }),
  727. ]
  728. def setUp(self):
  729. """ Set up test fixtures. """
  730. super(http_upload_TestCase, self).setUp()
  731. httpretty.enable()
  732. self.addCleanup(httpretty.disable)
  733. self.set_response_header_fields()
  734. self.patch_put_requests()
  735. patch_getpass_getpass(self)
  736. self.fake_password = self.getUniqueString()
  737. getpass.getpass.return_value = self.fake_password
  738. if not hasattr(self, 'expected_password'):
  739. self.expected_password = self.fake_password
  740. def set_test_args(self):
  741. """ Set the arguments for the test call to the function. """
  742. if not hasattr(self, 'progress_type'):
  743. self.progress_type = 0
  744. self.test_args = dict(
  745. fqdn=self.getUniqueString(),
  746. login=self.login,
  747. incoming=self.incoming_path,
  748. files_to_upload=self.paths_to_upload,
  749. debug=False,
  750. dummy=object(),
  751. progress=self.progress_type,
  752. )
  753. if self.function_to_test is dput.methods.http.upload:
  754. self.test_args['protocol'] = self.protocol
  755. def make_upload_uri(self, file_name):
  756. """ Make the URI for a file for upload. """
  757. uri = urlparse.urlunsplit([
  758. self.protocol, self.test_args['fqdn'],
  759. os.path.join(os.path.sep, self.incoming_path, file_name),
  760. None, None])
  761. return uri
  762. def set_response_header_fields(self):
  763. """ Set the header fields for the HTTP response. """
  764. if not hasattr(self, 'response_header_fields'):
  765. self.response_header_fields = {}
  766. def patch_put_requests(self):
  767. """ Patch the HTTP PUT requests. """
  768. self.path_by_request_uri = {}
  769. for path in self.paths_to_upload:
  770. upload_uri = self.make_upload_uri(os.path.basename(path))
  771. self.path_by_request_uri[upload_uri] = path
  772. response_body = ""
  773. httpretty.register_uri(
  774. httpretty.PUT, upload_uri,
  775. status=self.status_code, body=response_body)
  776. class http_upload_SuccessTestCase(http_upload_TestCase):
  777. """ Success test cases for `methods.http.upload` function. """
  778. response_scenarios = [
  779. ('okay', {
  780. 'status_code': 200,
  781. 'status_reason': "Okay",
  782. }),
  783. ('chatter', {
  784. 'status_code': 203,
  785. 'status_reason': "Non-Authoritative Information",
  786. }),
  787. ]
  788. auth_scenarios = [
  789. ('auth-accepted', {
  790. 'auth_response_status_code': 200,
  791. 'auth_response_status_reason': "Okay",
  792. }),
  793. ]
  794. size_scenarios = [
  795. ('size-empty', {
  796. 'fake_file': io.BytesIO(),
  797. }),
  798. ('size-1k', {
  799. 'fake_file': io.BytesIO(
  800. b"Lorem ipsum, dolor sit amet.___\n" * 32),
  801. }),
  802. ('size-100k', {
  803. 'fake_file': io.BytesIO(
  804. b"Lorem ipsum, dolor sit amet.___\n" * 3200),
  805. }),
  806. ]
  807. incoming_scenarios = list(upload_TestCase.incoming_scenarios)
  808. for (scenario_name, scenario) in incoming_scenarios:
  809. scenario['expected_url_path_prefix'] = os.path.join(
  810. os.path.sep, scenario['incoming_path'])
  811. del scenario_name, scenario
  812. scenarios = testscenarios.multiply_scenarios(
  813. upload_TestCase.files_scenarios,
  814. size_scenarios,
  815. upload_TestCase.incoming_scenarios,
  816. http_upload_TestCase.protocol_scenarios,
  817. http_upload_TestCase.login_scenarios,
  818. response_scenarios, auth_scenarios)
  819. def test_emits_debug_message_for_upload(self):
  820. """ Should emit debug message for upload. """
  821. self.test_args['debug'] = True
  822. self.function_to_test(**self.test_args)
  823. for path in self.paths_to_upload:
  824. expected_uri = self.make_upload_uri(os.path.basename(path))
  825. expected_output = textwrap.dedent("""\
  826. D: HTTP-PUT to URL: {uri}
  827. """).format(uri=expected_uri)
  828. self.expectThat(
  829. sys.stdout.getvalue(),
  830. testtools.matchers.Contains(expected_output))
  831. def test_request_has_expected_fields(self):
  832. """ Should send request with expected fields in header. """
  833. if not self.paths_to_upload:
  834. self.skipTest("No files to upload")
  835. self.function_to_test(**self.test_args)
  836. registry = FileDouble.get_registry_for_testcase(self)
  837. path = self.paths_to_upload[-1]
  838. double = registry[path]
  839. request = httpretty.last_request()
  840. expected_fields = {
  841. 'User-Agent': "dput",
  842. 'Connection': "close",
  843. 'Content-Length': "{size:d}".format(
  844. size=len(double.fake_file.getvalue())),
  845. }
  846. for (name, value) in expected_fields.items():
  847. self.expectThat(
  848. request.headers.get(name),
  849. testtools.matchers.Equals(value))
  850. class http_upload_ProgressTestCase(http_upload_TestCase):
  851. """ Test cases for `methods.http.upload` function, with progress meter. """
  852. files_scenarios = list(
  853. (scenario_name, scenario) for (scenario_name, scenario)
  854. in upload_TestCase.files_scenarios
  855. if scenario['paths_to_upload'])
  856. response_scenarios = [
  857. ('okay', {
  858. 'status_code': 200,
  859. 'status_reason': "Okay",
  860. }),
  861. ]
  862. progress_scenarios = [
  863. ('progress-type-1', {
  864. 'progress_type': 1,
  865. }),
  866. ('progress-type-2', {
  867. 'progress_type': 2,
  868. }),
  869. ]
  870. scenarios = testscenarios.multiply_scenarios(
  871. files_scenarios,
  872. progress_scenarios,
  873. upload_TestCase.incoming_scenarios,
  874. http_upload_TestCase.protocol_scenarios,
  875. http_upload_TestCase.login_scenarios,
  876. http_upload_SuccessTestCase.auth_scenarios,
  877. response_scenarios)
  878. def test_filewithprogress_has_expected_attributes(self):
  879. """ Should have expected attributes on the `FileWithProgress`. """
  880. expected_attributes_by_path = (
  881. make_expected_filewithprogress_attributes_by_path(
  882. self, {'ptype': self.progress_type}))
  883. self.function_to_test(**self.test_args)
  884. path = self.paths_to_upload[-1]
  885. expected_attributes = expected_attributes_by_path[path]
  886. fake_file_attributes = {
  887. name: getattr(self.fake_filewithprogress, name)
  888. for name in expected_attributes}
  889. self.expectThat(
  890. expected_attributes,
  891. testtools.matchers.Equals(fake_file_attributes))
  892. class http_upload_UnknownProtocolTestCase(http_upload_TestCase):
  893. """ Test cases for `methods.http.upload` function, unknown protocol. """
  894. files_scenarios = list(
  895. (scenario_name, scenario) for (scenario_name, scenario)
  896. in upload_TestCase.files_scenarios
  897. if scenario['paths_to_upload'])
  898. protocol_scenarios = [
  899. ('protocol-bogus', {
  900. 'function_to_test': dput.methods.http.upload,
  901. 'protocol': "b0gUs",
  902. 'protocol_version': "b0gUs",
  903. 'expected_exit_status': EXIT_STATUS_FAILURE,
  904. }),
  905. ]
  906. response_scenarios = [
  907. (scenario_name, scenario) for (scenario_name, scenario)
  908. in http_upload_SuccessTestCase.response_scenarios
  909. if scenario['status_code'] == 200]
  910. scenarios = testscenarios.multiply_scenarios(
  911. files_scenarios,
  912. upload_TestCase.incoming_scenarios,
  913. protocol_scenarios,
  914. http_upload_TestCase.login_scenarios,
  915. response_scenarios,
  916. http_upload_SuccessTestCase.auth_scenarios)
  917. def test_emits_error_message_when_unknown_protocol(self):
  918. """ Should emit error message when unknown protocol. """
  919. try:
  920. self.function_to_test(**self.test_args)
  921. except FakeSystemExit:
  922. pass
  923. expected_output = "Wrong protocol for upload "
  924. self.assertIn(expected_output, sys.stderr.getvalue())
  925. def test_calls_sys_exit_when_unknown_protocol(self):
  926. """ Should call `sys.exit` when unknown protocol. """
  927. with testtools.ExpectedException(FakeSystemExit):
  928. self.function_to_test(**self.test_args)
  929. sys.exit.assert_called_with(self.expected_exit_status)
  930. class http_upload_FileStatFailureTestCase(http_upload_TestCase):
  931. """ Test cases for `methods.http.upload` function, `os.stat` failure. """
  932. files_scenarios = list(
  933. (scenario_name, scenario) for (scenario_name, scenario)
  934. in upload_TestCase.files_scenarios
  935. if scenario['paths_to_upload'])
  936. os_stat_scenarios = [
  937. ('os-stat-notfound', {
  938. 'os_stat_scenario_name': "notfound_error",
  939. 'expected_exit_status': EXIT_STATUS_FAILURE,
  940. }),
  941. ('os-stat-denied', {
  942. 'os_stat_scenario_name': "denied_error",
  943. 'expected_exit_status': EXIT_STATUS_FAILURE,
  944. }),
  945. ]
  946. response_scenarios = list(
  947. (scenario_name, scenario) for (scenario_name, scenario)
  948. in http_upload_SuccessTestCase.response_scenarios
  949. if scenario['status_code'] == 200)
  950. scenarios = testscenarios.multiply_scenarios(
  951. files_scenarios,
  952. os_stat_scenarios,
  953. upload_TestCase.incoming_scenarios,
  954. http_upload_TestCase.protocol_scenarios,
  955. http_upload_TestCase.login_scenarios,
  956. response_scenarios,
  957. http_upload_SuccessTestCase.auth_scenarios)
  958. def test_emits_error_message(self):
  959. """ Should emit error message when `os.stat` failure. """
  960. try:
  961. self.function_to_test(**self.test_args)
  962. except FakeSystemExit:
  963. pass
  964. expected_output = textwrap.dedent("""\
  965. Determining size of file '{path}' failed
  966. """).format(path=self.paths_to_upload[0])
  967. self.assertIn(expected_output, sys.stderr.getvalue())
  968. def test_calls_sys_exit_with_expected_exit_status(self):
  969. """ Should call `sys.exit` with expected exit status. """
  970. with testtools.ExpectedException(FakeSystemExit):
  971. self.function_to_test(**self.test_args)
  972. sys.exit.assert_called_with(self.expected_exit_status)
  973. class http_upload_ResponseErrorTestCase(http_upload_TestCase):
  974. """ Error test cases for `methods.http.upload` function. """
  975. files_scenarios = list(
  976. (scenario_name, scenario) for (scenario_name, scenario)
  977. in upload_TestCase.files_scenarios
  978. if scenario['paths_to_upload'])
  979. response_scenarios = [
  980. ('server-error', {
  981. 'status_code': 500,
  982. 'status_reason': "Internal Server Error",
  983. 'auth_response_status_code': 200,
  984. 'auth_response_status_reason': "Okay",
  985. 'expected_exit_status': EXIT_STATUS_FAILURE,
  986. }),
  987. ]
  988. scenarios = testscenarios.multiply_scenarios(
  989. files_scenarios,
  990. upload_TestCase.incoming_scenarios,
  991. http_upload_TestCase.protocol_scenarios,
  992. http_upload_TestCase.login_scenarios,
  993. response_scenarios)
  994. def test_emits_error_message_when_response_status_error(self):
  995. """ Should emit debug message when response status is error. """
  996. try:
  997. self.function_to_test(**self.test_args)
  998. except FakeSystemExit:
  999. pass
  1000. expected_output = textwrap.dedent("""\
  1001. Upload failed: {status} {reason}
  1002. """).format(status=self.status_code, reason=self.status_reason)
  1003. self.assertIn(expected_output, sys.stdout.getvalue())
  1004. def test_calls_sys_exit_when_response_status_error(self):
  1005. """ Should call `sys.exit` when response status is error. """
  1006. with testtools.ExpectedException(FakeSystemExit):
  1007. self.function_to_test(**self.test_args)
  1008. sys.exit.assert_called_with(self.expected_exit_status)
  1009. def make_host_spec(username, host):
  1010. """ Make an SSH host specification. """
  1011. host_spec = host
  1012. if username != "*":
  1013. host_spec = "{user}@{fqdn}".format(user=username, fqdn=host)
  1014. return host_spec
  1015. def make_remote_spec(username, host, dir_path):
  1016. """ Make an SCP remote specification. """
  1017. host_spec = make_host_spec(username, host)
  1018. remote_spec = "{host}:{dir}".format(host=host_spec, dir=dir_path)
  1019. return remote_spec
  1020. class ssh_channel_upload_TestCase(upload_TestCase):
  1021. """ Base for test cases for upload over SSH channel. """
  1022. function_to_test = NotImplemented
  1023. scenarios = NotImplemented
  1024. login_scenarios = [
  1025. ('login-username', {
  1026. 'login': "lorem",
  1027. }),
  1028. ('login-wildcard', {
  1029. 'login': "*",
  1030. }),
  1031. ]
  1032. stat_mode_scenarios = [
  1033. ('stat-mode-default', {}),
  1034. ('stat-mode-0620', {
  1035. 'stat_mode': 0o0620,
  1036. 'expected_ssh_chmod': True,
  1037. }),
  1038. ]
  1039. def set_upload_file_modes(self):
  1040. """ Set filesystem modes for upload files. """
  1041. registry = FileDouble.get_registry_for_testcase(self)
  1042. if hasattr(self, 'stat_mode'):
  1043. for path in self.paths_to_upload:
  1044. file_double = registry[path]
  1045. file_double.stat_result = file_double.stat_result._replace(
  1046. st_mode=self.stat_mode)
  1047. def set_ssh_chmod_subprocess_double(self):
  1048. """ Set the ‘ssh … chmod’ test double for the subprocess. """
  1049. command_file_path = "/usr/bin/ssh"
  1050. argv = [os.path.basename(command_file_path)]
  1051. argv.extend(self.expected_ssh_options)
  1052. argv.append(make_host_spec(
  1053. username=self.login, host=self.test_args['fqdn']))
  1054. argv.extend(["chmod", "0644"])
  1055. argv.extend(
  1056. os.path.join(self.incoming_path, os.path.basename(path))
  1057. for path in self.paths_to_upload)
  1058. double = SubprocessDouble(command_file_path, argv=argv)
  1059. double.register_for_testcase(self)
  1060. check_call_scenario_name = getattr(
  1061. self, 'ssh_chmod_check_call_scenario_name', "success")
  1062. double.set_subprocess_check_call_scenario(check_call_scenario_name)
  1063. self.ssh_chmod_subprocess_double = double
  1064. class scp_upload_TestCase(ssh_channel_upload_TestCase):
  1065. """ Test cases for `methods.scp.upload` function. """
  1066. function_to_test = staticmethod(dput.methods.scp.upload)
  1067. scenarios = NotImplemented
  1068. ssh_config_scenarios = [
  1069. ('ssh-opts-none', {
  1070. 'ssh_config_options': [],
  1071. 'expected_ssh_options': [],
  1072. }),
  1073. ('ssh-opts-one', {
  1074. 'ssh_config_options': ["foo"],
  1075. 'expected_ssh_options': ["-o", "foo"],
  1076. }),
  1077. ('ssh-opts-three', {
  1078. 'ssh_config_options': ["foo", "bar", "baz"],
  1079. 'expected_ssh_options': [
  1080. "-o", "foo", "-o", "bar", "-o", "baz"],
  1081. }),
  1082. ]
  1083. def setUp(self):
  1084. """ Set up test fixtures. """
  1085. super(scp_upload_TestCase, self).setUp()
  1086. patch_os_lstat(self)
  1087. self.set_upload_file_modes()
  1088. self.set_scp_subprocess_double()
  1089. self.set_ssh_chmod_subprocess_double()
  1090. def set_test_args(self):
  1091. """ Set the arguments for the test call to the function. """
  1092. self.test_args = dict(
  1093. fqdn=self.getUniqueString(),
  1094. login=self.login,
  1095. incoming=self.incoming_path,
  1096. files_to_upload=self.paths_to_upload,
  1097. debug=None,
  1098. compress=self.compress,
  1099. ssh_config_options=self.ssh_config_options,
  1100. progress=object(),
  1101. )
  1102. def set_scp_subprocess_double(self):
  1103. """ Set the ‘scp’ test double for the subprocess. """
  1104. command_file_path = "/usr/bin/scp"
  1105. argv = [os.path.basename(command_file_path), "-p"]
  1106. argv.extend(self.scp_compress_options)
  1107. argv.extend(self.expected_ssh_options)
  1108. argv.extend(self.paths_to_upload)
  1109. argv.append(make_remote_spec(
  1110. username=self.login, host=self.test_args['fqdn'],
  1111. dir_path=self.incoming_path))
  1112. double = SubprocessDouble(command_file_path, argv=argv)
  1113. double.register_for_testcase(self)
  1114. check_call_scenario_name = getattr(
  1115. self, 'scp_subprocess_check_call_scenario_name', "success")
  1116. double.set_subprocess_check_call_scenario(check_call_scenario_name)
  1117. self.scp_subprocess_double = double
  1118. class scp_upload_ScpTestCase(scp_upload_TestCase):
  1119. """ Test cases for `methods.scp.upload` function, with ‘scp’ command. """
  1120. compress_scenarios = [
  1121. ('compress-false', {
  1122. 'compress': False,
  1123. 'scp_compress_options': [],
  1124. }),
  1125. ('compress-true', {
  1126. 'compress': True,
  1127. 'scp_compress_options': ["-C"],
  1128. }),
  1129. ]
  1130. scenarios = testscenarios.multiply_scenarios(
  1131. upload_TestCase.files_scenarios,
  1132. upload_TestCase.incoming_scenarios,
  1133. ssh_channel_upload_TestCase.login_scenarios,
  1134. ssh_channel_upload_TestCase.stat_mode_scenarios,
  1135. compress_scenarios,
  1136. scp_upload_TestCase.ssh_config_scenarios)
  1137. def test_emits_debug_message_for_upload(self):
  1138. """ Should emit debug message for files upload. """
  1139. self.test_args['debug'] = True
  1140. self.function_to_test(**self.test_args)
  1141. expected_output = textwrap.dedent("""\
  1142. D: Uploading with scp to {host}:{incoming}
  1143. """).format(
  1144. host=make_host_spec(
  1145. username=self.login, host=self.test_args['fqdn']),
  1146. incoming=self.incoming_path)
  1147. self.assertIn(expected_output, sys.stdout.getvalue())
  1148. def test_calls_check_call_with_expected_scp_command(self):
  1149. """ Should call `subprocess.check_call` with ‘scp’ command. """
  1150. self.function_to_test(**self.test_args)
  1151. expected_call = mock.call(self.scp_subprocess_double.argv)
  1152. self.assertIn(expected_call, subprocess.check_call.mock_calls)
  1153. def test_emits_error_message_when_scp_failure(self):
  1154. """ Should emit error message when ‘scp’ command fails. """
  1155. double = self.scp_subprocess_double
  1156. double.set_subprocess_check_call_scenario("failure")
  1157. try:
  1158. self.function_to_test(**self.test_args)
  1159. except FakeSystemExit:
  1160. pass
  1161. expected_output = "Error while uploading."
  1162. self.assertIn(expected_output, sys.stdout.getvalue())
  1163. def test_calls_sys_exit_when_scp_failure(self):
  1164. """ Should call `sys.exit` when ‘scp’ command fails. """
  1165. double = self.scp_subprocess_double
  1166. double.set_subprocess_check_call_scenario("failure")
  1167. with testtools.ExpectedException(FakeSystemExit):
  1168. self.function_to_test(**self.test_args)
  1169. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  1170. class scp_upload_ChmodTestCase(scp_upload_TestCase):
  1171. """ Test cases for `methods.scp.upload` function, with ‘ssh … chmod’. """
  1172. files_scenarios = list(
  1173. (scenario_name, scenario) for (scenario_name, scenario)
  1174. in upload_TestCase.files_scenarios
  1175. if scenario['paths_to_upload'])
  1176. stat_mode_scenarios = list(
  1177. (scenario_name, scenario) for (scenario_name, scenario)
  1178. in ssh_channel_upload_TestCase.stat_mode_scenarios
  1179. if 'expected_ssh_chmod' in scenario)
  1180. compress_scenarios = [
  1181. ('compress-false', {
  1182. 'compress': False,
  1183. 'scp_compress_options': [],
  1184. }),
  1185. ]
  1186. scenarios = testscenarios.multiply_scenarios(
  1187. files_scenarios,
  1188. upload_TestCase.incoming_scenarios,
  1189. ssh_channel_upload_TestCase.login_scenarios,
  1190. stat_mode_scenarios,
  1191. compress_scenarios,
  1192. scp_upload_TestCase.ssh_config_scenarios)
  1193. def test_emits_debug_message_for_fixing_permissions(self):
  1194. """ Should emit debug message for fixing file permissions . """
  1195. self.test_args['debug'] = True
  1196. self.function_to_test(**self.test_args)
  1197. expected_output = "D: Fixing some permissions"
  1198. self.assertIn(expected_output, sys.stdout.getvalue())
  1199. def test_calls_check_call_with_expected_ssh_chmod_command(self):
  1200. """ Should call `subprocess.check_call` with ‘ssh … chmod’ command. """
  1201. self.function_to_test(**self.test_args)
  1202. expected_call = mock.call(self.ssh_chmod_subprocess_double.argv)
  1203. self.assertIn(expected_call, subprocess.check_call.mock_calls)
  1204. def test_emits_error_message_when_ssh_chmod_failure(self):
  1205. """ Should emit error message when ‘ssh … chmod’ command fails. """
  1206. double = self.ssh_chmod_subprocess_double
  1207. double.set_subprocess_check_call_scenario("failure")
  1208. try:
  1209. self.function_to_test(**self.test_args)
  1210. except FakeSystemExit:
  1211. pass
  1212. expected_output = "Error while fixing permissions."
  1213. self.assertIn(expected_output, sys.stdout.getvalue())
  1214. def test_calls_sys_exit_when_ssh_chmod_failure(self):
  1215. """ Should call `sys.exit` when ‘ssh … chmod’ command fails. """
  1216. double = self.ssh_chmod_subprocess_double
  1217. double.set_subprocess_check_call_scenario("failure")
  1218. with testtools.ExpectedException(FakeSystemExit):
  1219. self.function_to_test(**self.test_args)
  1220. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  1221. class rsync_upload_TestCase(ssh_channel_upload_TestCase):
  1222. """ Test cases for `methods.rsync.upload` function. """
  1223. function_to_test = staticmethod(dput.methods.rsync.upload)
  1224. scenarios = testscenarios.multiply_scenarios(
  1225. upload_TestCase.files_scenarios,
  1226. upload_TestCase.incoming_scenarios,
  1227. ssh_channel_upload_TestCase.login_scenarios,
  1228. ssh_channel_upload_TestCase.stat_mode_scenarios)
  1229. def setUp(self):
  1230. """ Set up test fixtures. """
  1231. super(rsync_upload_TestCase, self).setUp()
  1232. self.set_rsync_subprocess_double()
  1233. self.expected_ssh_options = []
  1234. self.set_ssh_chmod_subprocess_double()
  1235. def set_test_args(self):
  1236. """ Set the arguments for the test call to the function. """
  1237. self.test_args = dict(
  1238. fqdn=self.getUniqueString(),
  1239. login=self.login,
  1240. incoming=self.incoming_path,
  1241. files_to_upload=self.paths_to_upload,
  1242. debug=False,
  1243. dummy=object(),
  1244. progress=object(),
  1245. )
  1246. def set_rsync_subprocess_double(self):
  1247. """ Set the ‘rsync’ test double for the subprocess. """
  1248. command_file_path = "/usr/bin/rsync"
  1249. argv = [os.path.basename(command_file_path)]
  1250. argv.extend(self.paths_to_upload)
  1251. argv.extend([
  1252. "--copy-links", "--progress", "--partial",
  1253. "-zave", "ssh -x"])
  1254. argv.append(make_remote_spec(
  1255. username=self.login, host=self.test_args['fqdn'],
  1256. dir_path=self.incoming_path))
  1257. double = SubprocessDouble(command_file_path, argv=argv)
  1258. double.register_for_testcase(self)
  1259. check_call_scenario_name = getattr(
  1260. self, 'rsync_check_call_scenario_name', "success")
  1261. double.set_subprocess_check_call_scenario(check_call_scenario_name)
  1262. self.rsync_subprocess_double = double
  1263. def test_emits_debug_message_for_upload(self):
  1264. """ Should emit debug message for files upload. """
  1265. self.test_args['debug'] = True
  1266. self.function_to_test(**self.test_args)
  1267. expected_output = textwrap.dedent("""\
  1268. D: Uploading with rsync to {host}:{incoming}
  1269. """).format(
  1270. host=make_host_spec(
  1271. username=self.login, host=self.test_args['fqdn']),
  1272. incoming=self.incoming_path)
  1273. self.assertIn(expected_output, sys.stdout.getvalue())
  1274. def test_calls_check_call_with_expected_rsync_command(self):
  1275. """ Should call `subprocess.check_call` with ‘rsync’ command. """
  1276. self.function_to_test(**self.test_args)
  1277. expected_call = mock.call(self.rsync_subprocess_double.argv)
  1278. self.assertIn(expected_call, subprocess.check_call.mock_calls)
  1279. def test_emits_error_message_when_rsync_failure(self):
  1280. """ Should emit error message when ‘rsync’ command fails. """
  1281. double = self.rsync_subprocess_double
  1282. double.set_subprocess_check_call_scenario("failure")
  1283. try:
  1284. self.function_to_test(**self.test_args)
  1285. except FakeSystemExit:
  1286. pass
  1287. expected_output = "Error while uploading."
  1288. self.assertIn(expected_output, sys.stdout.getvalue())
  1289. def test_calls_sys_exit_when_rsync_failure(self):
  1290. """ Should call `sys.exit` when ‘rsync’ command fails. """
  1291. double = self.rsync_subprocess_double
  1292. double.set_subprocess_check_call_scenario("failure")
  1293. with testtools.ExpectedException(FakeSystemExit):
  1294. self.function_to_test(**self.test_args)
  1295. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  1296. def test_emits_debug_message_for_fixing_permissions(self):
  1297. """ Should emit debug message for fixing file permissions . """
  1298. self.test_args['debug'] = True
  1299. self.function_to_test(**self.test_args)
  1300. expected_output = textwrap.dedent("""\
  1301. D: Fixing file permissions with {host}
  1302. """).format(
  1303. host=make_host_spec(
  1304. username=self.login, host=self.test_args['fqdn']))
  1305. self.assertIn(expected_output, sys.stdout.getvalue())
  1306. def test_calls_check_call_with_expected_ssh_chmod_command(self):
  1307. """ Should call `subprocess.check_call` with ‘ssh … chmod’ command. """
  1308. self.function_to_test(**self.test_args)
  1309. expected_call = mock.call(list(self.ssh_chmod_subprocess_double.argv))
  1310. self.assertIn(expected_call, subprocess.check_call.mock_calls)
  1311. def test_emits_error_message_when_ssh_chmod_failure(self):
  1312. """ Should emit error message when ‘ssh … chmod’ command fails. """
  1313. double = self.ssh_chmod_subprocess_double
  1314. double.set_subprocess_check_call_scenario("failure")
  1315. try:
  1316. self.function_to_test(**self.test_args)
  1317. except FakeSystemExit:
  1318. pass
  1319. expected_output = "Error while fixing permission."
  1320. self.assertIn(expected_output, sys.stdout.getvalue())
  1321. def test_calls_sys_exit_when_ssh_chmod_failure(self):
  1322. """ Should call `sys.exit` when ‘ssh … chmod’ command fails. """
  1323. double = self.ssh_chmod_subprocess_double
  1324. double.set_subprocess_check_call_scenario("failure")
  1325. with testtools.ExpectedException(FakeSystemExit):
  1326. self.function_to_test(**self.test_args)
  1327. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  1328. # Local variables:
  1329. # coding: utf-8
  1330. # mode: python
  1331. # End:
  1332. # vim: fileencoding=utf-8 filetype=python :