test_methods.py 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601
  1. # -*- coding: utf-8; -*-
  2. #
  3. # test/test_methods.py
  4. # Part of ‘dput’, a Debian package upload toolkit.
  5. #
  6. # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
  7. #
  8. # This is free software: you may copy, modify, and/or distribute this work
  9. # under the terms of the GNU General Public License as published by the
  10. # Free Software Foundation; version 3 of that license or any later version.
  11. # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
  12. """ Unit tests for upload method behaviour. """
  13. from __future__ import (absolute_import, unicode_literals)
  14. import sys
  15. import os
  16. import os.path
  17. import io
  18. import stat
  19. import pkgutil
  20. import importlib
  21. import collections
  22. import tempfile
  23. import textwrap
  24. import doctest
  25. import getpass
  26. import ftplib
  27. if sys.version_info >= (3, 3):
  28. import urllib.parse as urlparse
  29. elif sys.version_info >= (3, 0):
  30. raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
  31. else:
  32. import urlparse
  33. import testtools
  34. import testscenarios
  35. import httpretty
  36. __package__ = str("test")
  37. __import__(__package__)
  38. sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
  39. import dput.dput
  40. import dput.helper.dputhelper
  41. import dput.methods
  42. import dput.methods.local
  43. import dput.methods.ftp
  44. import dput.methods.http
  45. import dput.methods.https
  46. import dput.methods.scp
  47. import dput.methods.rsync
  48. from .helper import (
  49. mock,
  50. FakeSystemExit,
  51. patch_system_interfaces,
  52. EXIT_STATUS_SUCCESS, EXIT_STATUS_FAILURE,
  53. FileDouble,
  54. patch_os_stat,
  55. patch_os_lstat,
  56. setup_file_double_behaviour,
  57. patch_os_spawnv,
  58. ARG_ANY,
  59. SubprocessDouble,
  60. )
  61. from .test_dputhelper import (
  62. patch_filewithprogress,
  63. )
  64. class import_upload_functions_TestCase(
  65. testscenarios.WithScenarios,
  66. testtools.TestCase):
  67. """ Test cases for `import_upload_functions` function. """
  68. scenarios = [
  69. ('empty', {
  70. 'module_names': [],
  71. }),
  72. ('one', {
  73. 'module_names': ["foo"],
  74. }),
  75. ('three', {
  76. 'module_names': ["foo", "bar", "baz"],
  77. }),
  78. ]
  79. for (scenario_name, scenario) in scenarios:
  80. modules_by_name = collections.OrderedDict()
  81. iter_modules_result = []
  82. for module_name in scenario['module_names']:
  83. module = mock.Mock()
  84. module.__name__ = module_name
  85. module.upload = mock.Mock()
  86. modules_by_name[module_name] = module
  87. module_entry = (module, module_name, False)
  88. iter_modules_result.append(module_entry)
  89. scenario['modules_by_name'] = modules_by_name
  90. scenario['iter_modules_result'] = iter_modules_result
  91. del scenario_name, scenario
  92. def setUp(self):
  93. """ Set up test fixtures. """
  94. super(import_upload_functions_TestCase, self).setUp()
  95. patch_system_interfaces(self)
  96. self.patch_import_functions()
  97. self.patch_register_upload_function()
  98. def patch_import_functions(self):
  99. """ Patch import functions used by the function. """
  100. self.patch_pkgutil_iter_modules()
  101. self.patch_importlib_import_module()
  102. def patch_pkgutil_iter_modules(self):
  103. """ Patch `pkgutil.iter_modules` function for this test case. """
  104. func_patcher = mock.patch.object(pkgutil, 'iter_modules')
  105. func_patcher.start()
  106. self.addCleanup(func_patcher.stop)
  107. pkgutil.iter_modules.return_value = self.iter_modules_result
  108. def patch_importlib_import_module(self):
  109. """ Patch `importlib.import_module` function for this test case. """
  110. func_patcher = mock.patch.object(importlib, 'import_module')
  111. func_patcher.start()
  112. self.addCleanup(func_patcher.stop)
  113. def fake_import_module(full_name):
  114. module_name = full_name.split(".")[-1]
  115. module = self.modules_by_name[module_name]
  116. return module
  117. importlib.import_module.side_effect = fake_import_module
  118. def patch_register_upload_function(self):
  119. """ Patch the `register_upload_function` function. """
  120. func_patcher = mock.patch.object(
  121. dput.dput, 'register_upload_function')
  122. func_patcher.start()
  123. self.addCleanup(func_patcher.stop)
  124. def test_registers_each_method_upload_function(self):
  125. """ Should register each upload method's upload function by name. """
  126. dput.dput.import_upload_functions()
  127. expected_calls = [
  128. mock.call(name, self.modules_by_name[name].upload)
  129. for name in self.module_names]
  130. dput.dput.register_upload_function.assert_has_calls(
  131. expected_calls)
  132. @mock.patch.object(dput.dput, 'debug', new=True)
  133. def test_emits_debug_message_for_modules_found(self):
  134. """ Should emit a debug message for the modules found. """
  135. expected_message = "D: modules_found: {names!r}".format(
  136. names=self.module_names)
  137. dput.dput.import_upload_functions()
  138. self.assertIn(expected_message, sys.stdout.getvalue())
  139. @mock.patch.object(dput.dput, 'debug', new=True)
  140. def test_emits_debug_message_for_each_import(self):
  141. """ Should emit a debug message for each module imported. """
  142. dput.dput.import_upload_functions()
  143. for (module_name, module) in self.modules_by_name.items():
  144. expected_message = "D: Module: {name} ({module!r})".format(
  145. name=module_name, module=module)
  146. self.assertIn(expected_message, sys.stdout.getvalue())
  147. @mock.patch.object(dput.dput, 'debug', new=True)
  148. def test_emits_debug_message_for_each_upload_method(self):
  149. """ Should emit a debug message for each upload method. """
  150. dput.dput.import_upload_functions()
  151. for module_name in self.module_names:
  152. expected_message = "D: Method name: {name}".format(
  153. name=module_name)
  154. self.assertIn(expected_message, sys.stdout.getvalue())
  155. @mock.patch.object(dput.dput, 'upload_methods', new_callable=dict)
  156. class register_upload_function_TestCase(testtools.TestCase):
  157. """ Test cases for `register_upload_function` function. """
  158. def test_adds_function_to_global_registry(self, global_registry):
  159. """ Should add the specified function to the global registry. """
  160. test_name = self.getUniqueString()
  161. test_func = mock.Mock()
  162. dput.dput.register_upload_function(test_name, test_func)
  163. self.assertIn(test_name, dput.dput.upload_methods)
  164. self.assertEqual(test_func, dput.dput.upload_methods[test_name])
  165. def patch_getpass_getpass(testcase):
  166. """ Patch the `getpass.getpass` function for the test case. """
  167. func_patcher = mock.patch.object(getpass, "getpass")
  168. func_patcher.start()
  169. testcase.addCleanup(func_patcher.stop)
  170. class upload_TestCase(
  171. testscenarios.WithScenarios,
  172. testtools.TestCase):
  173. """ Base for test cases for method modules `upload` functions. """
  174. files_scenarios = [
  175. ('file-list-empty', {
  176. 'paths_to_upload': [],
  177. }),
  178. ('file-list-one', {
  179. 'paths_to_upload': [tempfile.mktemp()],
  180. }),
  181. ('file-list-three', {
  182. 'paths_to_upload': [tempfile.mktemp() for __ in range(3)],
  183. }),
  184. ]
  185. spawnv_scenarios = [
  186. ('spawnv-success', {
  187. 'os_spawnv_scenario_name': 'success',
  188. 'os_spawnv_return_value': EXIT_STATUS_SUCCESS,
  189. }),
  190. ('spawnv-failure', {
  191. 'os_spawnv_scenario_name': 'failure',
  192. 'os_spawnv_return_value': EXIT_STATUS_FAILURE,
  193. }),
  194. ]
  195. spawnv_success_scenarios = [
  196. (name, params) for (name, params) in spawnv_scenarios
  197. if params['os_spawnv_return_value'] == EXIT_STATUS_SUCCESS]
  198. incoming_scenarios = [
  199. ('incoming-simple', {
  200. 'incoming_path': tempfile.mktemp(),
  201. }),
  202. ('incoming-no-leading-slash', {
  203. 'incoming_path': tempfile.mktemp().lstrip("/"),
  204. }),
  205. ('incoming-has-trailing-slash', {
  206. 'incoming_path': tempfile.mktemp() + "/",
  207. }),
  208. ]
  209. def setUp(self):
  210. """ Set up test fixtures. """
  211. super(upload_TestCase, self).setUp()
  212. patch_system_interfaces(self)
  213. patch_os_spawnv(self)
  214. patch_os_stat(self)
  215. self.set_file_doubles()
  216. setup_file_double_behaviour(self)
  217. patch_filewithprogress(self)
  218. self.set_test_args()
  219. def set_file_doubles(self):
  220. """ Set the file doubles for this test case. """
  221. for path in self.paths_to_upload:
  222. fake_file = getattr(self, 'fake_file', None)
  223. double = FileDouble(path, fake_file)
  224. double.set_os_stat_scenario(
  225. getattr(self, 'os_stat_scenario_name', "okay"))
  226. double.register_for_testcase(self)
  227. func_patcher = mock.patch.object(double.fake_file, "close")
  228. func_patcher.start()
  229. self.addCleanup(func_patcher.stop)
  230. def set_test_args(self):
  231. """ Set the arguments for the test call to the function. """
  232. raise NotImplementedError
  233. def get_spawnv_args_from_mock_call(self, call):
  234. """ Get the call arguments from a mock `os.spawnv` call. """
  235. (args, kwargs) = call
  236. return (args, kwargs)
  237. def get_command_args_from_spawnv_mock_call(self, call):
  238. """ Get command line arguments from a mock `os.spawnv` call. """
  239. (args, kwargs) = self.get_spawnv_args_from_mock_call(call)
  240. command_args = args[2]
  241. return command_args
  242. def get_command_args_from_latest_spawnv(self):
  243. """ Get command line arguments from latest mock `os.spawnv` call. """
  244. latest_call = os.spawnv.call_args
  245. command_args = self.get_command_args_from_spawnv_mock_call(latest_call)
  246. return command_args
  247. class local_upload_TestCase(upload_TestCase):
  248. """ Test cases for `methods.local.upload` function. """
  249. scenarios = testscenarios.multiply_scenarios(
  250. upload_TestCase.incoming_scenarios,
  251. upload_TestCase.files_scenarios,
  252. upload_TestCase.spawnv_success_scenarios)
  253. command_file_path = os.path.join("/usr/bin", "install")
  254. def setUp(self):
  255. """ Set up test fixtures. """
  256. super(local_upload_TestCase, self).setUp()
  257. self.set_subprocess_double()
  258. def set_test_args(self):
  259. """ Set the arguments for the test call to the function. """
  260. self.test_args = dict(
  261. fqdn=object(),
  262. login=object(),
  263. incoming=self.incoming_path,
  264. files_to_upload=self.paths_to_upload,
  265. debug=None,
  266. compress=object(),
  267. progress=object(),
  268. )
  269. def set_subprocess_double(self):
  270. """ Set the test double for the subprocess. """
  271. argv = [self.command_file_path, "-m", ARG_ANY, ARG_ANY]
  272. double = SubprocessDouble(self.command_file_path, argv=argv)
  273. double.register_for_testcase(self)
  274. double.set_os_spawnv_scenario(self.os_spawnv_scenario_name)
  275. self.subprocess_double = double
  276. def test_calls_os_spawnv_with_expected_args(self):
  277. """ Should call `os.spawnv` with expected arguments. """
  278. dput.methods.local.upload(**self.test_args)
  279. os.spawnv.assert_called_with(
  280. os.P_WAIT, self.command_file_path, mock.ANY)
  281. def test_calls_os_spawnv_with_install_command(self):
  282. """ Should call `os.spawnv` to invoke ‘install’ command. """
  283. dput.methods.local.upload(**self.test_args)
  284. spawnv_command_args = self.get_command_args_from_latest_spawnv()
  285. expected_command = self.command_file_path
  286. self.assertEqual(expected_command, spawnv_command_args[0])
  287. def test_calls_os_spawnv_with_mode_option_in_command(self):
  288. """ Should call `os.spawnv` to invoke command with mode option. """
  289. dput.methods.local.upload(**self.test_args)
  290. spawnv_command_args = self.get_command_args_from_latest_spawnv()
  291. expected_mode = (
  292. stat.S_IRUSR | stat.S_IWUSR
  293. | stat.S_IRGRP
  294. | stat.S_IROTH)
  295. expected_mode_text = "{mode:04o}".format(mode=expected_mode)[-3:]
  296. expected_option_args = ["-m", expected_mode_text]
  297. self.assertEqual(expected_option_args, spawnv_command_args[1:3])
  298. def test_calls_os_spawnv_with_file_paths_in_command(self):
  299. """ Should call `os.spawnv` with file paths in command line. """
  300. dput.methods.local.upload(**self.test_args)
  301. spawnv_command_args = self.get_command_args_from_latest_spawnv()
  302. self.assertEqual(self.paths_to_upload, spawnv_command_args[3:-1])
  303. def test_calls_os_spawnv_with_incoming_path_as_final_arg(self):
  304. """ Should call `os.spawnv` with incoming path as final argument. """
  305. dput.methods.local.upload(**self.test_args)
  306. spawnv_command_args = self.get_command_args_from_latest_spawnv()
  307. self.assertEqual(self.incoming_path, spawnv_command_args[-1])
  308. def test_emits_debug_message_for_upload_command(self):
  309. """ Should emit a debug message for the upload command. """
  310. self.test_args['debug'] = True
  311. dput.methods.local.upload(**self.test_args)
  312. expected_message = textwrap.dedent("""\
  313. D: Uploading with cp to {dir_path}
  314. D: ...
  315. """).format(dir_path=self.incoming_path)
  316. self.assertThat(
  317. sys.stdout.getvalue(),
  318. testtools.matchers.DocTestMatches(
  319. expected_message, flags=doctest.ELLIPSIS))
  320. def test_calls_sys_exit_if_os_spawnv_returns_nonzero(self):
  321. """ Should call `sys.exit` if `os.spawnv` returns non-zero. """
  322. self.subprocess_double.set_os_spawnv_scenario('failure')
  323. with testtools.ExpectedException(FakeSystemExit):
  324. dput.methods.local.upload(**self.test_args)
  325. expected_output = textwrap.dedent("""\
  326. Error while uploading.
  327. """)
  328. self.assertIn(expected_output, sys.stdout.getvalue())
  329. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  330. class ftp_upload_TestCase(upload_TestCase):
  331. """ Test cases for `methods.ftp.upload` function. """
  332. scenarios = NotImplemented
  333. def setUp(self):
  334. """ Set up test fixtures. """
  335. super(ftp_upload_TestCase, self).setUp()
  336. self.set_ftp_client()
  337. self.patch_ftplib_ftp()
  338. patch_getpass_getpass(self)
  339. self.fake_password = self.getUniqueString()
  340. getpass.getpass.return_value = self.fake_password
  341. if not hasattr(self, 'expected_password'):
  342. self.expected_password = self.fake_password
  343. def set_test_args(self):
  344. """ Set the arguments for the test call to the function. """
  345. if not hasattr(self, 'progress_type'):
  346. self.progress_type = 0
  347. self.test_args = dict(
  348. fqdn=self.getUniqueString(),
  349. login=self.login,
  350. incoming=self.incoming_path,
  351. files_to_upload=self.paths_to_upload,
  352. debug=False,
  353. ftp_mode=self.ftp_mode,
  354. progress=self.progress_type,
  355. port=object(),
  356. )
  357. def set_ftp_client(self):
  358. """ Set the FTP client double. """
  359. self.ftp_client = mock.MagicMock(name="FTP")
  360. def patch_ftplib_ftp(self):
  361. """ Patch `ftplib.FTP` class for this test case. """
  362. patcher = mock.patch.object(ftplib, "FTP")
  363. patcher.start()
  364. self.addCleanup(patcher.stop)
  365. ftplib.FTP.return_value = self.ftp_client
  366. class ftp_upload_NormalFilesTestCase(ftp_upload_TestCase):
  367. """ Test cases for `methods.ftp.upload` function, upload normal files. """
  368. login_scenarios = [
  369. ('anonymous', {
  370. 'login': "anonymous",
  371. 'expected_password': "dput@packages.debian.org",
  372. }),
  373. ('username', {
  374. 'login': "lorem",
  375. }),
  376. ]
  377. ftp_client_scenarios = [
  378. ('default', {
  379. 'ftp_mode': False,
  380. }),
  381. ('passive-mode', {
  382. 'ftp_mode': True,
  383. }),
  384. ]
  385. scenarios = testscenarios.multiply_scenarios(
  386. upload_TestCase.incoming_scenarios,
  387. upload_TestCase.files_scenarios,
  388. login_scenarios, ftp_client_scenarios)
  389. def test_emits_debug_message_for_connect(self):
  390. """ Should emit debug message for successful connect. """
  391. self.test_args['debug'] = True
  392. dput.methods.ftp.upload(**self.test_args)
  393. expected_fqdn = self.test_args['fqdn']
  394. expected_output = textwrap.dedent("""\
  395. D: FTP-Connection to host: {fqdn}
  396. """).format(fqdn=expected_fqdn)
  397. self.assertIn(expected_output, sys.stdout.getvalue())
  398. def test_calls_ftp_connect_with_expected_args(self):
  399. """ Should call `FTP.connect` with expected args. """
  400. dput.methods.ftp.upload(**self.test_args)
  401. expected_args = (
  402. self.test_args['fqdn'],
  403. self.test_args['port'],
  404. )
  405. self.ftp_client.connect.assert_called_with(*expected_args)
  406. def test_emits_error_message_when_ftp_connect_error(self):
  407. """ Should emit error message when `FTP.connect` raises error. """
  408. self.ftp_client.connect.side_effect = ftplib.error_temp
  409. try:
  410. dput.methods.ftp.upload(**self.test_args)
  411. except FakeSystemExit:
  412. pass
  413. expected_output = "Connection failed, aborting"
  414. self.assertIn(expected_output, sys.stdout.getvalue())
  415. def test_calls_sys_exit_when_ftp_connect_permission_error(self):
  416. """ Should call `sys.exit` when `FTP.connect` raises error. """
  417. self.ftp_client.connect.side_effect = ftplib.error_temp
  418. with testtools.ExpectedException(FakeSystemExit):
  419. dput.methods.ftp.upload(**self.test_args)
  420. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  421. def test_calls_ftp_login_with_expected_args(self):
  422. """ Should call `FTP.login` with expected args. """
  423. dput.methods.ftp.upload(**self.test_args)
  424. expected_args = (
  425. self.test_args['login'],
  426. self.expected_password,
  427. )
  428. self.ftp_client.login.assert_called_with(*expected_args)
  429. def test_emits_error_message_when_ftp_login_permission_error(self):
  430. """ Should emit error message when `FTP.login` permission error. """
  431. self.ftp_client.login.side_effect = ftplib.error_perm
  432. try:
  433. dput.methods.ftp.upload(**self.test_args)
  434. except FakeSystemExit:
  435. pass
  436. expected_output = "Wrong Password"
  437. self.assertIn(expected_output, sys.stdout.getvalue())
  438. def test_calls_sys_exit_when_ftp_login_permission_error(self):
  439. """ Should call `sys.exit` when `FTP.login` permission error. """
  440. self.ftp_client.login.side_effect = ftplib.error_perm
  441. with testtools.ExpectedException(FakeSystemExit):
  442. dput.methods.ftp.upload(**self.test_args)
  443. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  444. def test_emits_error_message_when_ftp_login_eof_error(self):
  445. """ Should emit error message when `FTP.login` EOF error. """
  446. self.ftp_client.login.side_effect = EOFError
  447. try:
  448. dput.methods.ftp.upload(**self.test_args)
  449. except FakeSystemExit:
  450. pass
  451. expected_output = "Server closed the connection"
  452. self.assertIn(expected_output, sys.stdout.getvalue())
  453. def test_calls_sys_exit_when_ftp_login_eof_error(self):
  454. """ Should call `sys.exit` when `FTP.login` EOF error. """
  455. self.ftp_client.login.side_effect = EOFError
  456. with testtools.ExpectedException(FakeSystemExit):
  457. dput.methods.ftp.upload(**self.test_args)
  458. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  459. def test_calls_ftp_set_pasv_with_expected_args(self):
  460. """ Should call `FTP.set_pasv` with expected args. """
  461. dput.methods.ftp.upload(**self.test_args)
  462. expected_mode = bool(self.test_args['ftp_mode'])
  463. expected_args = (expected_mode,)
  464. self.ftp_client.set_pasv.assert_called_with(*expected_args)
  465. def test_calls_ftp_cwd_with_expected_args(self):
  466. """ Should call `FTP.cwd` with expected args. """
  467. dput.methods.ftp.upload(**self.test_args)
  468. expected_path = self.incoming_path
  469. expected_args = (expected_path,)
  470. self.ftp_client.cwd.assert_called_with(*expected_args)
  471. def test_emits_debug_message_for_cwd(self):
  472. """ Should emit debug message for successful `FTP.cwd`. """
  473. self.test_args['debug'] = True
  474. dput.methods.ftp.upload(**self.test_args)
  475. expected_output = textwrap.dedent("""\
  476. D: Directory to upload to: {path}
  477. """).format(path=self.incoming_path)
  478. self.assertIn(expected_output, sys.stdout.getvalue())
  479. def test_emits_error_message_when_destination_directory_not_found(self):
  480. """ Should emit error message when destination directory not found. """
  481. error = ftplib.error_perm("550 Not Found")
  482. self.ftp_client.cwd.side_effect = error
  483. try:
  484. dput.methods.ftp.upload(**self.test_args)
  485. except FakeSystemExit:
  486. pass
  487. expected_output = "Directory to upload to does not exist."
  488. self.assertIn(expected_output, sys.stdout.getvalue())
  489. def test_calls_sys_exit_when_ftp_cwd_permission_error(self):
  490. """ Should call `sys.exit` when `FTP.cwd` permission error. """
  491. error = ftplib.error_perm("550 Not Found")
  492. self.ftp_client.cwd.side_effect = error
  493. with testtools.ExpectedException(FakeSystemExit):
  494. dput.methods.ftp.upload(**self.test_args)
  495. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  496. def test_propagates_exception_when_ftp_cwd_permission_error(self):
  497. """ Should call `sys.exit` when `FTP.cwd` permission error. """
  498. error = ftplib.error_perm("500 Bad Stuff Happened")
  499. self.ftp_client.cwd.side_effect = error
  500. with testtools.ExpectedException(error.__class__):
  501. dput.methods.ftp.upload(**self.test_args)
  502. def test_propagates_exception_when_ftp_cwd_eof_error(self):
  503. """ Should call `sys.exit` when `FTP.cwd` EOF error. """
  504. error = EOFError()
  505. self.ftp_client.cwd.side_effect = error
  506. with testtools.ExpectedException(error.__class__):
  507. dput.methods.ftp.upload(**self.test_args)
  508. def test_emits_debug_message_for_each_file(self):
  509. """ Should emit debug message for each file to upload. """
  510. self.test_args['debug'] = True
  511. dput.methods.ftp.upload(**self.test_args)
  512. expected_output = "".join(textwrap.dedent("""\
  513. D: Uploading File: {path}
  514. Uploading {filename}: done.
  515. """).format(path=path, filename=os.path.basename(path))
  516. for path in self.paths_to_upload)
  517. self.assertIn(expected_output, sys.stdout.getvalue())
  518. def test_calls_ftp_storbinary_for_each_file(self):
  519. """ Should call `FTP.storbinary` for each file to upload. """
  520. dput.methods.ftp.upload(**self.test_args)
  521. registry = FileDouble.get_registry_for_testcase(self)
  522. expected_blocksize = 1024
  523. expected_calls = [
  524. mock.call(
  525. "STOR {filename}".format(filename=os.path.basename(path)),
  526. registry[path].fake_file, expected_blocksize)
  527. for path in self.paths_to_upload]
  528. self.ftp_client.storbinary.assert_has_calls(
  529. expected_calls, any_order=True)
  530. def test_calls_close_for_each_file(self):
  531. """ Should call `file.close` for each file to upload. """
  532. dput.methods.ftp.upload(**self.test_args)
  533. registry = FileDouble.get_registry_for_testcase(self)
  534. for path in self.paths_to_upload:
  535. fake_file = registry[path].fake_file
  536. fake_file.close.assert_called_with()
  537. class ftp_upload_ErrorTestCase(ftp_upload_TestCase):
  538. """ Test cases for `methods.ftp.upload` function, error conditions. """
  539. login_scenarios = [
  540. ('anonymous', {
  541. 'login': "anonymous",
  542. 'expected_password': "dput@packages.debian.org",
  543. }),
  544. ]
  545. ftp_client_scenarios = [
  546. ('default', {
  547. 'ftp_mode': False,
  548. }),
  549. ]
  550. progress_scenarios = [
  551. ('progress-type-0', {
  552. 'progress_type': 0,
  553. }),
  554. ('progress-type-1', {
  555. 'progress_type': 1,
  556. }),
  557. ('progress-type-2', {
  558. 'progress_type': 2,
  559. }),
  560. ]
  561. files_scenarios = list(
  562. (scenario_name, scenario) for (scenario_name, scenario)
  563. in upload_TestCase.files_scenarios
  564. if scenario['paths_to_upload'])
  565. scenarios = testscenarios.multiply_scenarios(
  566. upload_TestCase.incoming_scenarios,
  567. files_scenarios,
  568. login_scenarios, ftp_client_scenarios, progress_scenarios)
  569. def test_emits_warning_when_remote_file_exists(self):
  570. """ Should emit a warning message when remote file exists. """
  571. error = ftplib.error_perm("553 Exists")
  572. self.ftp_client.storbinary.side_effect = error
  573. dput.methods.ftp.upload(**self.test_args)
  574. for path in self.paths_to_upload:
  575. expected_output = textwrap.dedent("""\
  576. Leaving existing {path} on the server and continuing
  577. """).format(path=os.path.basename(path))
  578. self.expectThat(
  579. sys.stdout.getvalue(),
  580. testtools.matchers.Contains(expected_output))
  581. def test_omits_sys_exit_when_remote_file_exists(self):
  582. """ Should omit call to `sys.exit` when remote file exists. """
  583. error = ftplib.error_perm("553 Exists")
  584. self.ftp_client.storbinary.side_effect = error
  585. dput.methods.ftp.upload(**self.test_args)
  586. self.assertFalse(sys.exit.called)
  587. def test_emits_error_message_when_storbinary_failure(self):
  588. """ Should emit an error message when `FTP.storbinary` failure. """
  589. error = ftplib.error_perm("504 Weird Stuff Happened")
  590. self.ftp_client.storbinary.side_effect = error
  591. try:
  592. dput.methods.ftp.upload(**self.test_args)
  593. except FakeSystemExit:
  594. pass
  595. for path in self.paths_to_upload[:1]:
  596. expected_output = (
  597. "Note: This error might indicate a problem with"
  598. " your passive_ftp setting.\n")
  599. self.expectThat(
  600. sys.stdout.getvalue(),
  601. testtools.matchers.Contains(expected_output))
  602. def test_calls_sys_exit_when_storbinary_failure(self):
  603. """ Should call `sys.exit` when `FTP.storbinary` failure. """
  604. error = ftplib.error_perm("504 Weird Stuff Happened")
  605. self.ftp_client.storbinary.side_effect = error
  606. with testtools.ExpectedException(FakeSystemExit):
  607. dput.methods.ftp.upload(**self.test_args)
  608. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  609. def test_emits_debug_message_when_open_failure(self):
  610. """ Should emit a debug message when `builtins.open` failure. """
  611. self.test_args['debug'] = True
  612. registry = FileDouble.get_registry_for_testcase(self)
  613. for path in self.paths_to_upload:
  614. double = registry[path]
  615. double.set_open_scenario('nonexist')
  616. try:
  617. dput.methods.ftp.upload(**self.test_args)
  618. except EnvironmentError:
  619. pass
  620. expected_output = (
  621. "D: Should exit silently now, but"
  622. " will throw exception for debug.")
  623. self.assertIn(expected_output, sys.stdout.getvalue())
  624. def test_propagates_error_from_storbinary_for_debug(self):
  625. """ Should propagate error from `FTP.storbinary` when debug. """
  626. self.test_args['debug'] = True
  627. error = ftplib.error_perm("504 Weird Stuff Happened")
  628. self.ftp_client.storbinary.side_effect = error
  629. with testtools.ExpectedException(error.__class__):
  630. dput.methods.ftp.upload(**self.test_args)
  631. def test_propagates_error_from_quit_for_debug(self):
  632. """ Should propagate error from `FTP.quit` when debug. """
  633. self.test_args['debug'] = True
  634. error = ftplib.error_perm("504 Weird Stuff Happened")
  635. self.ftp_client.quit.side_effect = error
  636. with testtools.ExpectedException(error.__class__):
  637. dput.methods.ftp.upload(**self.test_args)
  638. def make_expected_filewithprogress_attributes_by_path(testcase, attrs):
  639. """ Make a mapping from path to expected FileWithProgress attribs. """
  640. expected_attributes_by_path = {}
  641. registry = FileDouble.get_registry_for_testcase(testcase)
  642. for path in testcase.paths_to_upload:
  643. file_double = registry[path]
  644. expected_attributes = {
  645. 'f': file_double.fake_file,
  646. 'size': file_double.stat_result.st_size,
  647. }
  648. expected_attributes.update(attrs)
  649. expected_attributes_by_path[path] = expected_attributes
  650. return expected_attributes_by_path
  651. class ftp_upload_ProgressTestCase(ftp_upload_TestCase):
  652. """ Test cases for `methods.ftp.upload` function, with progress meter. """
  653. login_scenarios = [
  654. ('anonymous', {
  655. 'login': "anonymous",
  656. 'expected_password': "dput@packages.debian.org",
  657. }),
  658. ]
  659. ftp_client_scenarios = [
  660. ('default', {
  661. 'ftp_mode': False,
  662. }),
  663. ]
  664. progress_scenarios = [
  665. ('progress-type-1', {
  666. 'progress_type': 1,
  667. }),
  668. ('progress-type-2', {
  669. 'progress_type': 2,
  670. }),
  671. ]
  672. scenarios = testscenarios.multiply_scenarios(
  673. upload_TestCase.incoming_scenarios,
  674. upload_TestCase.files_scenarios,
  675. login_scenarios, ftp_client_scenarios, progress_scenarios)
  676. def test_calls_storbinary_with_filewithprogress(self):
  677. """ Should use a `FileWithProgress` to call `FTP.storbinary`. """
  678. dput.methods.ftp.upload(**self.test_args)
  679. expected_calls = [
  680. mock.call(
  681. mock.ANY, self.fake_filewithprogress,
  682. mock.ANY)
  683. for path in self.paths_to_upload]
  684. self.ftp_client.storbinary.assert_has_calls(
  685. expected_calls, any_order=True)
  686. def test_filewithprogress_has_expected_attributes(self):
  687. """ Should have expected attributes on the `FileWithProgress`. """
  688. expected_attributes_by_path = (
  689. make_expected_filewithprogress_attributes_by_path(
  690. self, {'ptype': self.progress_type}))
  691. dput.methods.ftp.upload(**self.test_args)
  692. for call in self.ftp_client.storbinary.mock_calls:
  693. (__, call_args, call_kwargs) = call
  694. (__, stor_file, __) = call_args
  695. path = stor_file.f.name
  696. expected_attributes = expected_attributes_by_path[path]
  697. stor_file_attributes = {
  698. name: getattr(stor_file, name)
  699. for name in expected_attributes}
  700. self.expectThat(
  701. expected_attributes,
  702. testtools.matchers.Equals(stor_file_attributes))
  703. def test_filewithprogress_has_sentinel_size_when_stat_failure(self):
  704. """ Should have sentinel `size` value when `os.stat` failure. """
  705. expected_attributes_by_path = (
  706. make_expected_filewithprogress_attributes_by_path(
  707. self, {'size': -1}))
  708. registry = FileDouble.get_registry_for_testcase(self)
  709. for path in self.paths_to_upload:
  710. double = registry[path]
  711. double.set_os_stat_scenario('notfound_error')
  712. dput.methods.ftp.upload(**self.test_args)
  713. for call in self.ftp_client.storbinary.mock_calls:
  714. (__, call_args, call_kwargs) = call
  715. (__, stor_file, __) = call_args
  716. path = stor_file.f.name
  717. expected_attributes = expected_attributes_by_path[path]
  718. stor_file_attributes = {
  719. name: getattr(stor_file, name)
  720. for name in expected_attributes}
  721. self.expectThat(
  722. expected_attributes,
  723. testtools.matchers.Equals(stor_file_attributes))
  724. def test_emits_debug_message_when_stat_failure(self):
  725. """ Should have sentinel `size` value when `os.stat` failure. """
  726. self.test_args['debug'] = True
  727. registry = FileDouble.get_registry_for_testcase(self)
  728. for path in self.paths_to_upload:
  729. double = registry[path]
  730. double.set_os_stat_scenario('notfound_error')
  731. dput.methods.ftp.upload(**self.test_args)
  732. for path in self.paths_to_upload:
  733. expected_output = textwrap.dedent("""\
  734. D: Determining size of file '{path}' failed
  735. """).format(path=path)
  736. self.expectThat(
  737. sys.stdout.getvalue(),
  738. testtools.matchers.Contains(expected_output))
  739. class http_upload_TestCase(upload_TestCase):
  740. """ Base for test cases for `methods.http.upload` function. """
  741. scenarios = NotImplemented
  742. protocol_scenarios = [
  743. ('http', {
  744. 'function_to_test': dput.methods.http.upload,
  745. 'protocol': "http",
  746. 'protocol_version': "HTTP/1.0",
  747. }),
  748. ('https', {
  749. 'function_to_test': dput.methods.https.upload,
  750. 'protocol': "https",
  751. 'protocol_version': "HTTP/1.0",
  752. }),
  753. ]
  754. login_scenarios = [
  755. ('username', {
  756. 'login': "lorem",
  757. }),
  758. ]
  759. def setUp(self):
  760. """ Set up test fixtures. """
  761. super(http_upload_TestCase, self).setUp()
  762. httpretty.enable()
  763. self.addCleanup(httpretty.disable)
  764. self.set_response_header_fields()
  765. self.patch_put_requests()
  766. patch_getpass_getpass(self)
  767. self.fake_password = self.getUniqueString()
  768. getpass.getpass.return_value = self.fake_password
  769. if not hasattr(self, 'expected_password'):
  770. self.expected_password = self.fake_password
  771. def set_test_args(self):
  772. """ Set the arguments for the test call to the function. """
  773. if not hasattr(self, 'progress_type'):
  774. self.progress_type = 0
  775. self.test_args = dict(
  776. fqdn=self.getUniqueString(),
  777. login=self.login,
  778. incoming=self.incoming_path,
  779. files_to_upload=self.paths_to_upload,
  780. debug=False,
  781. dummy=object(),
  782. progress=self.progress_type,
  783. )
  784. if self.function_to_test is dput.methods.http.upload:
  785. self.test_args['protocol'] = self.protocol
  786. def make_upload_uri(self, file_name):
  787. """ Make the URI for a file for upload. """
  788. uri = urlparse.urlunsplit([
  789. self.protocol, self.test_args['fqdn'],
  790. os.path.join(os.path.sep, self.incoming_path, file_name),
  791. None, None])
  792. return uri
  793. def set_response_header_fields(self):
  794. """ Set the header fields for the HTTP response. """
  795. if not hasattr(self, 'response_header_fields'):
  796. self.response_header_fields = {}
  797. def patch_put_requests(self):
  798. """ Patch the HTTP PUT requests. """
  799. self.path_by_request_uri = {}
  800. for path in self.paths_to_upload:
  801. upload_uri = self.make_upload_uri(os.path.basename(path))
  802. self.path_by_request_uri[upload_uri] = path
  803. response_body = ""
  804. httpretty.register_uri(
  805. httpretty.PUT, upload_uri,
  806. status=self.status_code, body=response_body)
  807. class http_upload_SuccessTestCase(http_upload_TestCase):
  808. """ Success test cases for `methods.http.upload` function. """
  809. response_scenarios = [
  810. ('okay', {
  811. 'status_code': 200,
  812. 'status_reason': "Okay",
  813. }),
  814. ('chatter', {
  815. 'status_code': 203,
  816. 'status_reason': "Non-Authoritative Information",
  817. }),
  818. ]
  819. auth_scenarios = [
  820. ('auth-accepted', {
  821. 'auth_response_status_code': 200,
  822. 'auth_response_status_reason': "Okay",
  823. }),
  824. ]
  825. size_scenarios = [
  826. ('size-empty', {
  827. 'fake_file': io.BytesIO(),
  828. }),
  829. ('size-1k', {
  830. 'fake_file': io.BytesIO(
  831. b"Lorem ipsum, dolor sit amet.___\n" * 32),
  832. }),
  833. ('size-100k', {
  834. 'fake_file': io.BytesIO(
  835. b"Lorem ipsum, dolor sit amet.___\n" * 3200),
  836. }),
  837. ]
  838. incoming_scenarios = list(upload_TestCase.incoming_scenarios)
  839. for (scenario_name, scenario) in incoming_scenarios:
  840. scenario['expected_url_path_prefix'] = os.path.join(
  841. os.path.sep, scenario['incoming_path'])
  842. del scenario_name, scenario
  843. scenarios = testscenarios.multiply_scenarios(
  844. upload_TestCase.files_scenarios,
  845. size_scenarios,
  846. upload_TestCase.incoming_scenarios,
  847. http_upload_TestCase.protocol_scenarios,
  848. http_upload_TestCase.login_scenarios,
  849. response_scenarios, auth_scenarios)
  850. def test_emits_debug_message_for_upload(self):
  851. """ Should emit debug message for upload. """
  852. self.test_args['debug'] = True
  853. self.function_to_test(**self.test_args)
  854. for path in self.paths_to_upload:
  855. expected_uri = self.make_upload_uri(os.path.basename(path))
  856. expected_output = textwrap.dedent("""\
  857. D: HTTP-PUT to URL: {uri}
  858. """).format(uri=expected_uri)
  859. self.expectThat(
  860. sys.stdout.getvalue(),
  861. testtools.matchers.Contains(expected_output))
  862. def test_request_has_expected_fields(self):
  863. """ Should send request with expected fields in header. """
  864. if not self.paths_to_upload:
  865. self.skipTest("No files to upload")
  866. self.function_to_test(**self.test_args)
  867. registry = FileDouble.get_registry_for_testcase(self)
  868. path = self.paths_to_upload[-1]
  869. double = registry[path]
  870. request = httpretty.last_request()
  871. expected_fields = {
  872. 'User-Agent': "dput",
  873. 'Connection': "close",
  874. 'Content-Length': "{size:d}".format(
  875. size=len(double.fake_file.getvalue())),
  876. }
  877. for (name, value) in expected_fields.items():
  878. self.expectThat(
  879. request.headers.get(name),
  880. testtools.matchers.Equals(value))
  881. class http_upload_ProgressTestCase(http_upload_TestCase):
  882. """ Test cases for `methods.http.upload` function, with progress meter. """
  883. files_scenarios = list(
  884. (scenario_name, scenario) for (scenario_name, scenario)
  885. in upload_TestCase.files_scenarios
  886. if scenario['paths_to_upload'])
  887. response_scenarios = [
  888. ('okay', {
  889. 'status_code': 200,
  890. 'status_reason': "Okay",
  891. }),
  892. ]
  893. progress_scenarios = [
  894. ('progress-type-1', {
  895. 'progress_type': 1,
  896. }),
  897. ('progress-type-2', {
  898. 'progress_type': 2,
  899. }),
  900. ]
  901. scenarios = testscenarios.multiply_scenarios(
  902. files_scenarios,
  903. progress_scenarios,
  904. upload_TestCase.incoming_scenarios,
  905. http_upload_TestCase.protocol_scenarios,
  906. http_upload_TestCase.login_scenarios,
  907. http_upload_SuccessTestCase.auth_scenarios,
  908. response_scenarios)
  909. def test_filewithprogress_has_expected_attributes(self):
  910. """ Should have expected attributes on the `FileWithProgress`. """
  911. expected_attributes_by_path = (
  912. make_expected_filewithprogress_attributes_by_path(
  913. self, {'ptype': self.progress_type}))
  914. self.function_to_test(**self.test_args)
  915. path = self.paths_to_upload[-1]
  916. expected_attributes = expected_attributes_by_path[path]
  917. fake_file_attributes = {
  918. name: getattr(self.fake_filewithprogress, name)
  919. for name in expected_attributes}
  920. self.expectThat(
  921. expected_attributes,
  922. testtools.matchers.Equals(fake_file_attributes))
  923. class http_upload_UnknownProtocolTestCase(http_upload_TestCase):
  924. """ Test cases for `methods.http.upload` function, unknown protocol. """
  925. files_scenarios = list(
  926. (scenario_name, scenario) for (scenario_name, scenario)
  927. in upload_TestCase.files_scenarios
  928. if scenario['paths_to_upload'])
  929. protocol_scenarios = [
  930. ('protocol-bogus', {
  931. 'function_to_test': dput.methods.http.upload,
  932. 'protocol': "b0gUs",
  933. 'protocol_version': "b0gUs",
  934. 'expected_exit_status': EXIT_STATUS_FAILURE,
  935. }),
  936. ]
  937. response_scenarios = [
  938. (scenario_name, scenario) for (scenario_name, scenario)
  939. in http_upload_SuccessTestCase.response_scenarios
  940. if scenario['status_code'] == 200]
  941. scenarios = testscenarios.multiply_scenarios(
  942. files_scenarios,
  943. upload_TestCase.incoming_scenarios,
  944. protocol_scenarios,
  945. http_upload_TestCase.login_scenarios,
  946. response_scenarios,
  947. http_upload_SuccessTestCase.auth_scenarios)
  948. def test_emits_error_message_when_unknown_protocol(self):
  949. """ Should emit error message when unknown protocol. """
  950. try:
  951. self.function_to_test(**self.test_args)
  952. except FakeSystemExit:
  953. pass
  954. expected_output = "Wrong protocol for upload "
  955. self.assertIn(expected_output, sys.stderr.getvalue())
  956. def test_calls_sys_exit_when_unknown_protocol(self):
  957. """ Should call `sys.exit` when unknown protocol. """
  958. with testtools.ExpectedException(FakeSystemExit):
  959. self.function_to_test(**self.test_args)
  960. sys.exit.assert_called_with(self.expected_exit_status)
  961. class http_upload_FileStatFailureTestCase(http_upload_TestCase):
  962. """ Test cases for `methods.http.upload` function, `os.stat` failure. """
  963. files_scenarios = list(
  964. (scenario_name, scenario) for (scenario_name, scenario)
  965. in upload_TestCase.files_scenarios
  966. if scenario['paths_to_upload'])
  967. os_stat_scenarios = [
  968. ('os-stat-notfound', {
  969. 'os_stat_scenario_name': "notfound_error",
  970. 'expected_exit_status': EXIT_STATUS_FAILURE,
  971. }),
  972. ('os-stat-denied', {
  973. 'os_stat_scenario_name': "denied_error",
  974. 'expected_exit_status': EXIT_STATUS_FAILURE,
  975. }),
  976. ]
  977. response_scenarios = list(
  978. (scenario_name, scenario) for (scenario_name, scenario)
  979. in http_upload_SuccessTestCase.response_scenarios
  980. if scenario['status_code'] == 200)
  981. scenarios = testscenarios.multiply_scenarios(
  982. files_scenarios,
  983. os_stat_scenarios,
  984. upload_TestCase.incoming_scenarios,
  985. http_upload_TestCase.protocol_scenarios,
  986. http_upload_TestCase.login_scenarios,
  987. response_scenarios,
  988. http_upload_SuccessTestCase.auth_scenarios)
  989. def test_emits_error_message(self):
  990. """ Should emit error message when `os.stat` failure. """
  991. try:
  992. self.function_to_test(**self.test_args)
  993. except FakeSystemExit:
  994. pass
  995. expected_output = textwrap.dedent("""\
  996. Determining size of file '{path}' failed
  997. """).format(path=self.paths_to_upload[0])
  998. self.assertIn(expected_output, sys.stderr.getvalue())
  999. def test_calls_sys_exit_with_expected_exit_status(self):
  1000. """ Should call `sys.exit` with expected exit status. """
  1001. with testtools.ExpectedException(FakeSystemExit):
  1002. self.function_to_test(**self.test_args)
  1003. sys.exit.assert_called_with(self.expected_exit_status)
  1004. class http_upload_ResponseErrorTestCase(http_upload_TestCase):
  1005. """ Error test cases for `methods.http.upload` function. """
  1006. files_scenarios = list(
  1007. (scenario_name, scenario) for (scenario_name, scenario)
  1008. in upload_TestCase.files_scenarios
  1009. if scenario['paths_to_upload'])
  1010. response_scenarios = [
  1011. ('server-error', {
  1012. 'status_code': 500,
  1013. 'status_reason': "Internal Server Error",
  1014. 'auth_response_status_code': 200,
  1015. 'auth_response_status_reason': "Okay",
  1016. 'expected_exit_status': EXIT_STATUS_FAILURE,
  1017. }),
  1018. ]
  1019. scenarios = testscenarios.multiply_scenarios(
  1020. files_scenarios,
  1021. upload_TestCase.incoming_scenarios,
  1022. http_upload_TestCase.protocol_scenarios,
  1023. http_upload_TestCase.login_scenarios,
  1024. response_scenarios)
  1025. def test_emits_error_message_when_response_status_error(self):
  1026. """ Should emit debug message when response status is error. """
  1027. try:
  1028. self.function_to_test(**self.test_args)
  1029. except FakeSystemExit:
  1030. pass
  1031. expected_output = textwrap.dedent("""\
  1032. Upload failed: {status} {reason}
  1033. """).format(status=self.status_code, reason=self.status_reason)
  1034. self.assertIn(expected_output, sys.stdout.getvalue())
  1035. def test_calls_sys_exit_when_response_status_error(self):
  1036. """ Should call `sys.exit` when response status is error. """
  1037. with testtools.ExpectedException(FakeSystemExit):
  1038. self.function_to_test(**self.test_args)
  1039. sys.exit.assert_called_with(self.expected_exit_status)
  1040. def make_host_spec(username, host):
  1041. """ Make an SSH host specification. """
  1042. host_spec = host
  1043. if username != "*":
  1044. host_spec = "{user}@{fqdn}".format(user=username, fqdn=host)
  1045. return host_spec
  1046. def make_remote_spec(username, host, dir_path):
  1047. """ Make an SCP remote specification. """
  1048. host_spec = make_host_spec(username, host)
  1049. remote_spec = "{host}:{dir}".format(host=host_spec, dir=dir_path)
  1050. return remote_spec
  1051. class ssh_channel_upload_TestCase(upload_TestCase):
  1052. """ Base for test cases for upload over SSH channel. """
  1053. function_to_test = NotImplemented
  1054. scenarios = NotImplemented
  1055. login_scenarios = [
  1056. ('login-username', {
  1057. 'login': "lorem",
  1058. }),
  1059. ('login-wildcard', {
  1060. 'login': "*",
  1061. }),
  1062. ]
  1063. stat_mode_scenarios = [
  1064. ('stat-mode-default', {}),
  1065. ('stat-mode-0620', {
  1066. 'stat_mode': 0o0620,
  1067. 'expected_ssh_chmod': True,
  1068. }),
  1069. ]
  1070. def set_upload_file_modes(self):
  1071. """ Set filesystem modes for upload files. """
  1072. registry = FileDouble.get_registry_for_testcase(self)
  1073. if hasattr(self, 'stat_mode'):
  1074. for path in self.paths_to_upload:
  1075. file_double = registry[path]
  1076. file_double.stat_result = file_double.stat_result._replace(
  1077. st_mode=self.stat_mode)
  1078. def set_ssh_chmod_subprocess_double(self):
  1079. """ Set the ‘ssh … chmod’ test double for the subprocess. """
  1080. command_file_path = "/usr/bin/ssh"
  1081. argv = [os.path.basename(command_file_path)]
  1082. argv.extend(self.expected_ssh_options)
  1083. argv.append(make_host_spec(
  1084. username=self.login, host=self.test_args['fqdn']))
  1085. argv.extend(["chmod", "0644"])
  1086. argv.extend(
  1087. os.path.join(self.incoming_path, os.path.basename(path))
  1088. for path in self.paths_to_upload)
  1089. double = SubprocessDouble(command_file_path, argv=argv)
  1090. double.register_for_testcase(self)
  1091. os_spawnv_scenario_name = getattr(
  1092. self, 'ssh_chmod_os_spawnv_scenario_name', "success")
  1093. double.set_os_spawnv_scenario(os_spawnv_scenario_name)
  1094. self.ssh_chmod_subprocess_double = double
  1095. class scp_upload_TestCase(ssh_channel_upload_TestCase):
  1096. """ Test cases for `methods.scp.upload` function. """
  1097. function_to_test = staticmethod(dput.methods.scp.upload)
  1098. scenarios = NotImplemented
  1099. ssh_config_scenarios = [
  1100. ('ssh-opts-none', {
  1101. 'ssh_config_options': [],
  1102. 'expected_ssh_options': [],
  1103. }),
  1104. ('ssh-opts-one', {
  1105. 'ssh_config_options': ["foo"],
  1106. 'expected_ssh_options': ["-o", "foo"],
  1107. }),
  1108. ('ssh-opts-three', {
  1109. 'ssh_config_options': ["foo", "bar", "baz"],
  1110. 'expected_ssh_options': [
  1111. "-o", "foo", "-o", "bar", "-o", "baz"],
  1112. }),
  1113. ]
  1114. def setUp(self):
  1115. """ Set up test fixtures. """
  1116. super(scp_upload_TestCase, self).setUp()
  1117. patch_os_lstat(self)
  1118. self.set_upload_file_modes()
  1119. self.set_scp_subprocess_double()
  1120. self.set_ssh_chmod_subprocess_double()
  1121. def set_test_args(self):
  1122. """ Set the arguments for the test call to the function. """
  1123. self.test_args = dict(
  1124. fqdn=self.getUniqueString(),
  1125. login=self.login,
  1126. incoming=self.incoming_path,
  1127. files_to_upload=self.paths_to_upload,
  1128. debug=None,
  1129. compress=self.compress,
  1130. ssh_config_options=self.ssh_config_options,
  1131. progress=object(),
  1132. )
  1133. def set_scp_subprocess_double(self):
  1134. """ Set the ‘scp’ test double for the subprocess. """
  1135. command_file_path = "/usr/bin/scp"
  1136. argv = [os.path.basename(command_file_path), "-p"]
  1137. argv.extend(self.scp_compress_options)
  1138. argv.extend(self.expected_ssh_options)
  1139. argv.extend(self.paths_to_upload)
  1140. argv.append(make_remote_spec(
  1141. username=self.login, host=self.test_args['fqdn'],
  1142. dir_path=self.incoming_path))
  1143. double = SubprocessDouble(command_file_path, argv=argv)
  1144. double.register_for_testcase(self)
  1145. os_spawnv_scenario_name = getattr(
  1146. self, 'scp_os_spawnv_scenario_name', "success")
  1147. double.set_os_spawnv_scenario(os_spawnv_scenario_name)
  1148. self.scp_subprocess_double = double
  1149. class scp_upload_ScpTestCase(scp_upload_TestCase):
  1150. """ Test cases for `methods.scp.upload` function, with ‘scp’ command. """
  1151. compress_scenarios = [
  1152. ('compress-false', {
  1153. 'compress': False,
  1154. 'scp_compress_options': [],
  1155. }),
  1156. ('compress-true', {
  1157. 'compress': True,
  1158. 'scp_compress_options': ["-C"],
  1159. }),
  1160. ]
  1161. scenarios = testscenarios.multiply_scenarios(
  1162. upload_TestCase.files_scenarios,
  1163. upload_TestCase.incoming_scenarios,
  1164. ssh_channel_upload_TestCase.login_scenarios,
  1165. ssh_channel_upload_TestCase.stat_mode_scenarios,
  1166. compress_scenarios,
  1167. scp_upload_TestCase.ssh_config_scenarios)
  1168. def test_emits_debug_message_for_upload(self):
  1169. """ Should emit debug message for files upload. """
  1170. self.test_args['debug'] = True
  1171. self.function_to_test(**self.test_args)
  1172. expected_output = textwrap.dedent("""\
  1173. D: Uploading with scp to {host}:{incoming}
  1174. """).format(
  1175. host=make_host_spec(
  1176. username=self.login, host=self.test_args['fqdn']),
  1177. incoming=self.incoming_path)
  1178. self.assertIn(expected_output, sys.stdout.getvalue())
  1179. def test_calls_os_spawnv_with_expected_scp_command(self):
  1180. """ Should call `os.spawnv` with expected ‘scp’ command. """
  1181. self.function_to_test(**self.test_args)
  1182. expected_calls = [mock.call(
  1183. os.P_WAIT,
  1184. self.scp_subprocess_double.path,
  1185. self.scp_subprocess_double.argv)]
  1186. os.spawnv.assert_has_calls(expected_calls)
  1187. def test_emits_error_message_when_scp_failure(self):
  1188. """ Should emit error message when ‘scp’ command fails. """
  1189. self.scp_subprocess_double.set_os_spawnv_scenario("failure")
  1190. try:
  1191. self.function_to_test(**self.test_args)
  1192. except FakeSystemExit:
  1193. pass
  1194. expected_output = "Error while uploading."
  1195. self.assertIn(expected_output, sys.stdout.getvalue())
  1196. def test_calls_sys_exit_when_scp_failure(self):
  1197. """ Should call `sys.exit` when ‘scp’ command fails. """
  1198. self.scp_subprocess_double.set_os_spawnv_scenario("failure")
  1199. with testtools.ExpectedException(FakeSystemExit):
  1200. self.function_to_test(**self.test_args)
  1201. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  1202. class scp_upload_ChmodTestCase(scp_upload_TestCase):
  1203. """ Test cases for `methods.scp.upload` function, with ‘ssh … chmod’. """
  1204. files_scenarios = list(
  1205. (scenario_name, scenario) for (scenario_name, scenario)
  1206. in upload_TestCase.files_scenarios
  1207. if scenario['paths_to_upload'])
  1208. stat_mode_scenarios = list(
  1209. (scenario_name, scenario) for (scenario_name, scenario)
  1210. in ssh_channel_upload_TestCase.stat_mode_scenarios
  1211. if 'expected_ssh_chmod' in scenario)
  1212. compress_scenarios = [
  1213. ('compress-false', {
  1214. 'compress': False,
  1215. 'scp_compress_options': [],
  1216. }),
  1217. ]
  1218. scenarios = testscenarios.multiply_scenarios(
  1219. files_scenarios,
  1220. upload_TestCase.incoming_scenarios,
  1221. ssh_channel_upload_TestCase.login_scenarios,
  1222. stat_mode_scenarios,
  1223. compress_scenarios,
  1224. scp_upload_TestCase.ssh_config_scenarios)
  1225. def test_emits_debug_message_for_fixing_permissions(self):
  1226. """ Should emit debug message for fixing file permissions . """
  1227. self.test_args['debug'] = True
  1228. self.function_to_test(**self.test_args)
  1229. expected_output = "D: Fixing some permissions"
  1230. self.assertIn(expected_output, sys.stdout.getvalue())
  1231. def test_calls_os_spawnv_with_expected_ssh_chmod_command(self):
  1232. """ Should call `os.spawnv` with expected ‘ssh … chmod’ command. """
  1233. self.function_to_test(**self.test_args)
  1234. expected_calls = [mock.call(
  1235. os.P_WAIT,
  1236. self.ssh_chmod_subprocess_double.path,
  1237. self.ssh_chmod_subprocess_double.argv)]
  1238. os.spawnv.assert_has_calls(expected_calls)
  1239. def test_emits_error_message_when_ssh_chmod_failure(self):
  1240. """ Should emit error message when ‘ssh … chmod’ command fails. """
  1241. self.ssh_chmod_subprocess_double.set_os_spawnv_scenario("failure")
  1242. try:
  1243. self.function_to_test(**self.test_args)
  1244. except FakeSystemExit:
  1245. pass
  1246. expected_output = "Error while fixing permissions."
  1247. self.assertIn(expected_output, sys.stdout.getvalue())
  1248. def test_calls_sys_exit_when_ssh_chmod_failure(self):
  1249. """ Should call `sys.exit` when ‘ssh … chmod’ command fails. """
  1250. self.ssh_chmod_subprocess_double.set_os_spawnv_scenario("failure")
  1251. with testtools.ExpectedException(FakeSystemExit):
  1252. self.function_to_test(**self.test_args)
  1253. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  1254. class rsync_upload_TestCase(ssh_channel_upload_TestCase):
  1255. """ Test cases for `methods.rsync.upload` function. """
  1256. function_to_test = staticmethod(dput.methods.rsync.upload)
  1257. scenarios = testscenarios.multiply_scenarios(
  1258. upload_TestCase.files_scenarios,
  1259. upload_TestCase.incoming_scenarios,
  1260. ssh_channel_upload_TestCase.login_scenarios,
  1261. ssh_channel_upload_TestCase.stat_mode_scenarios)
  1262. def setUp(self):
  1263. """ Set up test fixtures. """
  1264. super(rsync_upload_TestCase, self).setUp()
  1265. self.set_rsync_subprocess_double()
  1266. self.expected_ssh_options = []
  1267. self.set_ssh_chmod_subprocess_double()
  1268. def set_test_args(self):
  1269. """ Set the arguments for the test call to the function. """
  1270. self.test_args = dict(
  1271. fqdn=self.getUniqueString(),
  1272. login=self.login,
  1273. incoming=self.incoming_path,
  1274. files_to_upload=self.paths_to_upload,
  1275. debug=False,
  1276. dummy=object(),
  1277. progress=object(),
  1278. )
  1279. def set_rsync_subprocess_double(self):
  1280. """ Set the ‘rsync’ test double for the subprocess. """
  1281. command_file_path = "/usr/bin/rsync"
  1282. argv = [os.path.basename(command_file_path)]
  1283. argv.extend(self.paths_to_upload)
  1284. argv.extend([
  1285. "--copy-links", "--progress", "--partial",
  1286. "-zave", "ssh -x"])
  1287. argv.append(make_remote_spec(
  1288. username=self.login, host=self.test_args['fqdn'],
  1289. dir_path=self.incoming_path))
  1290. double = SubprocessDouble(command_file_path, argv=argv)
  1291. double.register_for_testcase(self)
  1292. os_spawnv_scenario_name = getattr(
  1293. self, 'rsync_os_spawnv_scenario_name', "success")
  1294. double.set_os_spawnv_scenario(os_spawnv_scenario_name)
  1295. self.rsync_subprocess_double = double
  1296. def test_emits_debug_message_for_upload(self):
  1297. """ Should emit debug message for files upload. """
  1298. self.test_args['debug'] = True
  1299. self.function_to_test(**self.test_args)
  1300. expected_output = textwrap.dedent("""\
  1301. D: Uploading with rsync to {host}:{incoming}
  1302. """).format(
  1303. host=make_host_spec(
  1304. username=self.login, host=self.test_args['fqdn']),
  1305. incoming=self.incoming_path)
  1306. self.assertIn(expected_output, sys.stdout.getvalue())
  1307. def test_calls_os_spawnv_with_expected_rsync_command(self):
  1308. """ Should call `os.spawnv` with expected ‘rsync’ command. """
  1309. self.function_to_test(**self.test_args)
  1310. expected_calls = [mock.call(
  1311. os.P_WAIT,
  1312. self.rsync_subprocess_double.path,
  1313. self.rsync_subprocess_double.argv)]
  1314. os.spawnv.assert_has_calls(expected_calls)
  1315. def test_emits_error_message_when_rsync_failure(self):
  1316. """ Should emit error message when ‘rsync’ command fails. """
  1317. self.rsync_subprocess_double.set_os_spawnv_scenario("failure")
  1318. try:
  1319. self.function_to_test(**self.test_args)
  1320. except FakeSystemExit:
  1321. pass
  1322. expected_output = "Error while uploading."
  1323. self.assertIn(expected_output, sys.stdout.getvalue())
  1324. def test_calls_sys_exit_when_rsync_failure(self):
  1325. """ Should call `sys.exit` when ‘rsync’ command fails. """
  1326. self.rsync_subprocess_double.set_os_spawnv_scenario("failure")
  1327. with testtools.ExpectedException(FakeSystemExit):
  1328. self.function_to_test(**self.test_args)
  1329. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  1330. def test_emits_debug_message_for_fixing_permissions(self):
  1331. """ Should emit debug message for fixing file permissions . """
  1332. self.test_args['debug'] = True
  1333. self.function_to_test(**self.test_args)
  1334. expected_output = textwrap.dedent("""\
  1335. D: Fixing file permissions with {host}
  1336. """).format(
  1337. host=make_host_spec(
  1338. username=self.login, host=self.test_args['fqdn']))
  1339. self.assertIn(expected_output, sys.stdout.getvalue())
  1340. def test_calls_os_spawnv_with_expected_ssh_chmod_command(self):
  1341. """ Should call `os.spawnv` with expected ‘ssh … chmod’ command. """
  1342. self.function_to_test(**self.test_args)
  1343. expected_calls = [mock.call(
  1344. os.P_WAIT,
  1345. self.ssh_chmod_subprocess_double.path,
  1346. list(self.ssh_chmod_subprocess_double.argv))]
  1347. os.spawnv.assert_has_calls(expected_calls)
  1348. def test_emits_error_message_when_ssh_chmod_failure(self):
  1349. """ Should emit error message when ‘ssh … chmod’ command fails. """
  1350. self.ssh_chmod_subprocess_double.set_os_spawnv_scenario("failure")
  1351. try:
  1352. self.function_to_test(**self.test_args)
  1353. except FakeSystemExit:
  1354. pass
  1355. expected_output = "Error while fixing permission."
  1356. self.assertIn(expected_output, sys.stdout.getvalue())
  1357. def test_calls_sys_exit_when_ssh_chmod_failure(self):
  1358. """ Should call `sys.exit` when ‘ssh … chmod’ command fails. """
  1359. self.ssh_chmod_subprocess_double.set_os_spawnv_scenario("failure")
  1360. with testtools.ExpectedException(FakeSystemExit):
  1361. self.function_to_test(**self.test_args)
  1362. sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
  1363. # Local variables:
  1364. # coding: utf-8
  1365. # mode: python
  1366. # End:
  1367. # vim: fileencoding=utf-8 filetype=python :