helper.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545
  1. # -*- coding: utf-8; -*-
  2. #
  3. # test/helper.py
  4. # Part of ‘dput’, a Debian package upload toolkit.
  5. #
  6. # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
  7. #
  8. # This is free software: you may copy, modify, and/or distribute this work
  9. # under the terms of the GNU General Public License as published by the
  10. # Free Software Foundation; version 3 of that license or any later version.
  11. # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
  12. """ Helper functionality for Dput test suite. """
  13. from __future__ import (absolute_import, unicode_literals)
  14. import sys
  15. if sys.version_info >= (3, 3):
  16. import builtins
  17. import unittest
  18. import unittest.mock as mock
  19. from io import StringIO as StringIO
  20. import configparser
  21. import collections.abc as collections_abc
  22. elif sys.version_info >= (3, 0):
  23. raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
  24. elif sys.version_info >= (2, 7):
  25. # Python 2 standard library.
  26. import __builtin__ as builtins
  27. # Third-party backport of Python 3 unittest improvements.
  28. import unittest2 as unittest
  29. # Third-party mock library.
  30. import mock
  31. # Python 2 standard library.
  32. from StringIO import StringIO as BaseStringIO
  33. import ConfigParser as configparser
  34. import collections as collections_abc
  35. else:
  36. raise RuntimeError("Python earlier than 2.7 is not supported.")
  37. import os
  38. import os.path
  39. import shutil
  40. import tempfile
  41. import pwd
  42. import errno
  43. import time
  44. import signal
  45. import subprocess
  46. import functools
  47. import itertools
  48. import base64
  49. import collections
  50. import weakref
  51. import shlex
  52. __package__ = str("test")
  53. __import__(__package__)
  54. __metaclass__ = type
  55. def make_unique_slug(testcase):
  56. """ Make a unique slug for the test case. """
  57. text = base64.b64encode(
  58. testcase.getUniqueString().encode('utf-8')
  59. ).decode('utf-8')
  60. result = text[-30:]
  61. return result
  62. try:
  63. StringIO
  64. except NameError:
  65. # We don't yet have the StringIO we want. Create it.
  66. class StringIO(BaseStringIO, object):
  67. """ StringIO with a context manager. """
  68. def __enter__(self):
  69. return self
  70. def __exit__(self, *args):
  71. self.close()
  72. return False
  73. def patch_stdout(testcase):
  74. """ Patch `sys.stdout` for the specified test case. """
  75. patcher = mock.patch.object(
  76. sys, 'stdout', wraps=StringIO())
  77. patcher.start()
  78. testcase.addCleanup(patcher.stop)
  79. def patch_stderr(testcase):
  80. """ Patch `sys.stderr` for the specified test case. """
  81. patcher = mock.patch.object(
  82. sys, 'stderr', wraps=StringIO())
  83. patcher.start()
  84. testcase.addCleanup(patcher.stop)
  85. def patch_signal_signal(testcase):
  86. """ Patch `signal.signal` for the specified test case. """
  87. func_patcher = mock.patch.object(signal, 'signal')
  88. func_patcher.start()
  89. testcase.addCleanup(func_patcher.stop)
  90. class FakeSystemExit(Exception):
  91. """ Fake double for `SystemExit` exception. """
  92. EXIT_STATUS_SUCCESS = 0
  93. EXIT_STATUS_FAILURE = 1
  94. EXIT_STATUS_COMMAND_NOT_FOUND = 127
  95. def patch_sys_exit(testcase):
  96. """ Patch `sys.exit` for the specified test case. """
  97. func_patcher = mock.patch.object(
  98. sys, 'exit',
  99. side_effect=FakeSystemExit())
  100. func_patcher.start()
  101. testcase.addCleanup(func_patcher.stop)
  102. def patch_sys_argv(testcase):
  103. """ Patch the `sys.argv` sequence for the test case. """
  104. if not hasattr(testcase, 'progname'):
  105. testcase.progname = make_unique_slug(testcase)
  106. if not hasattr(testcase, 'sys_argv'):
  107. testcase.sys_argv = [testcase.progname]
  108. patcher = mock.patch.object(sys, "argv", new=list(testcase.sys_argv))
  109. patcher.start()
  110. testcase.addCleanup(patcher.stop)
  111. def patch_system_interfaces(testcase):
  112. """ Patch system interfaces that are disruptive to the test runner. """
  113. patch_stdout(testcase)
  114. patch_stderr(testcase)
  115. patch_sys_exit(testcase)
  116. patch_sys_argv(testcase)
  117. def patch_time_time(testcase, values=None):
  118. """ Patch the `time.time` function for the specified test case.
  119. :param testcase: The `TestCase` instance for binding to the patch.
  120. :param values: An iterable to provide return values.
  121. :return: None.
  122. """
  123. if values is None:
  124. values = itertools.count()
  125. def generator_fake_time():
  126. while True:
  127. yield next(values)
  128. func_patcher = mock.patch.object(time, "time")
  129. func_patcher.start()
  130. testcase.addCleanup(func_patcher.stop)
  131. time.time.side_effect = generator_fake_time()
  132. def patch_os_environ(testcase):
  133. """ Patch the `os.environ` mapping. """
  134. if not hasattr(testcase, 'os_environ'):
  135. testcase.os_environ = {}
  136. patcher = mock.patch.object(os, "environ", new=testcase.os_environ)
  137. patcher.start()
  138. testcase.addCleanup(patcher.stop)
  139. def patch_os_getpid(testcase):
  140. """ Patch `os.getpid` for the specified test case. """
  141. func_patcher = mock.patch.object(os, 'getpid')
  142. func_patcher.start()
  143. testcase.addCleanup(func_patcher.stop)
  144. def patch_os_getuid(testcase):
  145. """ Patch the `os.getuid` function. """
  146. if not hasattr(testcase, 'os_getuid_return_value'):
  147. testcase.os_getuid_return_value = testcase.getUniqueInteger()
  148. func_patcher = mock.patch.object(
  149. os, "getuid", return_value=testcase.os_getuid_return_value)
  150. func_patcher.start()
  151. testcase.addCleanup(func_patcher.stop)
  152. PasswdEntry = collections.namedtuple(
  153. "PasswdEntry",
  154. "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
  155. def patch_pwd_getpwuid(testcase):
  156. """ Patch the `pwd.getpwuid` function. """
  157. if not hasattr(testcase, 'pwd_getpwuid_return_value'):
  158. testcase.pwd_getpwuid_return_value = PasswdEntry(
  159. pw_name=make_unique_slug(testcase),
  160. pw_passwd=make_unique_slug(testcase),
  161. pw_uid=testcase.getUniqueInteger(),
  162. pw_gid=testcase.getUniqueInteger(),
  163. pw_gecos=testcase.getUniqueString(),
  164. pw_dir=tempfile.mktemp(),
  165. pw_shell=tempfile.mktemp())
  166. if not isinstance(testcase.pwd_getpwuid_return_value, pwd.struct_passwd):
  167. pwent = pwd.struct_passwd(testcase.pwd_getpwuid_return_value)
  168. else:
  169. pwent = testcase.pwd_getpwuid_return_value
  170. func_patcher = mock.patch.object(pwd, "getpwuid", return_value=pwent)
  171. func_patcher.start()
  172. testcase.addCleanup(func_patcher.stop)
  173. def patch_os_path_exists(testcase):
  174. """ Patch `os.path.exists` behaviour for this test case.
  175. When the patched function is called, the registry of
  176. `FileDouble` instances for this test case will be used to get
  177. the instance for the path specified.
  178. """
  179. orig_os_path_exists = os.path.exists
  180. def fake_os_path_exists(path):
  181. registry = FileDouble.get_registry_for_testcase(testcase)
  182. if path in registry:
  183. file_double = registry[path]
  184. result = file_double.os_path_exists_scenario.call_hook()
  185. else:
  186. result = orig_os_path_exists(path)
  187. return result
  188. func_patcher = mock.patch.object(
  189. os.path, 'exists', side_effect=fake_os_path_exists)
  190. func_patcher.start()
  191. testcase.addCleanup(func_patcher.stop)
  192. def patch_os_access(testcase):
  193. """ Patch `os.access` behaviour for this test case.
  194. When the patched function is called, the registry of
  195. `FileDouble` instances for this test case will be used to get
  196. the instance for the path specified.
  197. """
  198. orig_os_access = os.access
  199. def fake_os_access(path, mode):
  200. registry = FileDouble.get_registry_for_testcase(testcase)
  201. if path in registry:
  202. file_double = registry[path]
  203. result = file_double.os_access_scenario.call_hook(mode)
  204. else:
  205. result = orig_os_access(path, mode)
  206. return result
  207. func_patcher = mock.patch.object(
  208. os, 'access', side_effect=fake_os_access)
  209. func_patcher.start()
  210. testcase.addCleanup(func_patcher.stop)
  211. StatResult = collections.namedtuple(
  212. 'StatResult', [
  213. 'st_mode',
  214. 'st_ino', 'st_dev', 'st_nlink',
  215. 'st_uid', 'st_gid',
  216. 'st_size',
  217. 'st_atime', 'st_mtime', 'st_ctime',
  218. ])
  219. def patch_os_stat(testcase):
  220. """ Patch `os.stat` behaviour for this test case.
  221. When the patched function is called, the registry of
  222. `FileDouble` instances for this test case will be used to get
  223. the instance for the path specified.
  224. """
  225. orig_os_stat = os.stat
  226. def fake_os_stat(path):
  227. registry = FileDouble.get_registry_for_testcase(testcase)
  228. if path in registry:
  229. file_double = registry[path]
  230. result = file_double.os_stat_scenario.call_hook()
  231. else:
  232. result = orig_os_stat(path)
  233. return result
  234. func_patcher = mock.patch.object(
  235. os, 'stat', side_effect=fake_os_stat)
  236. func_patcher.start()
  237. testcase.addCleanup(func_patcher.stop)
  238. def patch_os_lstat(testcase):
  239. """ Patch `os.lstat` behaviour for this test case.
  240. When the patched function is called, the registry of
  241. `FileDouble` instances for this test case will be used to get
  242. the instance for the path specified.
  243. """
  244. orig_os_lstat = os.lstat
  245. def fake_os_lstat(path):
  246. registry = FileDouble.get_registry_for_testcase(testcase)
  247. if path in registry:
  248. file_double = registry[path]
  249. result = file_double.os_lstat_scenario.call_hook()
  250. else:
  251. result = orig_os_lstat(path)
  252. return result
  253. func_patcher = mock.patch.object(
  254. os, 'lstat', side_effect=fake_os_lstat)
  255. func_patcher.start()
  256. testcase.addCleanup(func_patcher.stop)
  257. def patch_os_unlink(testcase):
  258. """ Patch `os.unlink` behaviour for this test case.
  259. When the patched function is called, the registry of
  260. `FileDouble` instances for this test case will be used to get
  261. the instance for the path specified.
  262. """
  263. orig_os_unlink = os.unlink
  264. def fake_os_unlink(path):
  265. registry = FileDouble.get_registry_for_testcase(testcase)
  266. if path in registry:
  267. file_double = registry[path]
  268. result = file_double.os_unlink_scenario.call_hook()
  269. else:
  270. result = orig_os_unlink(path)
  271. return result
  272. func_patcher = mock.patch.object(
  273. os, 'unlink', side_effect=fake_os_unlink)
  274. func_patcher.start()
  275. testcase.addCleanup(func_patcher.stop)
  276. def patch_os_rmdir(testcase):
  277. """ Patch `os.rmdir` behaviour for this test case.
  278. When the patched function is called, the registry of
  279. `FileDouble` instances for this test case will be used to get
  280. the instance for the path specified.
  281. """
  282. orig_os_rmdir = os.rmdir
  283. def fake_os_rmdir(path):
  284. registry = FileDouble.get_registry_for_testcase(testcase)
  285. if path in registry:
  286. file_double = registry[path]
  287. result = file_double.os_rmdir_scenario.call_hook()
  288. else:
  289. result = orig_os_rmdir(path)
  290. return result
  291. func_patcher = mock.patch.object(
  292. os, 'rmdir', side_effect=fake_os_rmdir)
  293. func_patcher.start()
  294. testcase.addCleanup(func_patcher.stop)
  295. def patch_shutil_rmtree(testcase):
  296. """ Patch `shutil.rmtree` behaviour for this test case.
  297. When the patched function is called, the registry of
  298. `FileDouble` instances for this test case will be used to get
  299. the instance for the path specified.
  300. """
  301. orig_shutil_rmtree = os.rmdir
  302. def fake_shutil_rmtree(path, ignore_errors=False, onerror=None):
  303. registry = FileDouble.get_registry_for_testcase(testcase)
  304. if path in registry:
  305. file_double = registry[path]
  306. result = file_double.shutil_rmtree_scenario.call_hook()
  307. else:
  308. result = orig_shutil_rmtree(path)
  309. return result
  310. func_patcher = mock.patch.object(
  311. shutil, 'rmtree', side_effect=fake_shutil_rmtree)
  312. func_patcher.start()
  313. testcase.addCleanup(func_patcher.stop)
  314. def patch_tempfile_mkdtemp(testcase):
  315. """ Patch the `tempfile.mkdtemp` function for this test case. """
  316. if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
  317. testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())
  318. double = testcase.tempfile_mkdtemp_file_double
  319. double.set_os_unlink_scenario('okay')
  320. double.set_os_rmdir_scenario('okay')
  321. double.register_for_testcase(testcase)
  322. func_patcher = mock.patch.object(tempfile, "mkdtemp")
  323. func_patcher.start()
  324. testcase.addCleanup(func_patcher.stop)
  325. tempfile.mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path
  326. try:
  327. FileNotFoundError
  328. FileExistsError
  329. PermissionError
  330. except NameError:
  331. # Python 2 uses IOError.
  332. def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
  333. result_kwargs = init_kwargs
  334. result_errno = errno_value
  335. result_strerror = os.strerror(errno_value)
  336. result_filename = None
  337. if len(init_args) >= 3:
  338. result_errno = init_args[0]
  339. result_filename = init_args[2]
  340. if 'errno' in init_kwargs:
  341. result_errno = init_kwargs['errno']
  342. del result_kwargs['errno']
  343. if 'filename' in init_kwargs:
  344. result_filename = init_kwargs['filename']
  345. del result_kwargs['filename']
  346. if len(init_args) >= 2:
  347. result_strerror = init_args[1]
  348. if 'strerror' in init_kwargs:
  349. result_strerror = init_kwargs['strerror']
  350. del result_kwargs['strerror']
  351. result_args = (result_errno, result_strerror, result_filename)
  352. return (result_args, result_kwargs)
  353. class FileNotFoundError(IOError):
  354. def __init__(self, *args, **kwargs):
  355. (args, kwargs) = _ensure_ioerror_args(
  356. args, kwargs, errno_value=errno.ENOENT)
  357. super(FileNotFoundError, self).__init__(*args, **kwargs)
  358. class FileExistsError(IOError):
  359. def __init__(self, *args, **kwargs):
  360. (args, kwargs) = _ensure_ioerror_args(
  361. args, kwargs, errno_value=errno.EEXIST)
  362. super(FileExistsError, self).__init__(*args, **kwargs)
  363. class PermissionError(IOError):
  364. def __init__(self, *args, **kwargs):
  365. (args, kwargs) = _ensure_ioerror_args(
  366. args, kwargs, errno_value=errno.EPERM)
  367. super(PermissionError, self).__init__(*args, **kwargs)
  368. def make_fake_file_scenarios(path=None):
  369. """ Make a collection of scenarios for testing with fake files.
  370. :path: The filesystem path of the fake file. If not specified,
  371. a valid random path will be generated.
  372. :return: A collection of scenarios for tests involving input files.
  373. The collection is a mapping from scenario name to a dictionary of
  374. scenario attributes.
  375. """
  376. if path is None:
  377. file_path = tempfile.mktemp()
  378. else:
  379. file_path = path
  380. fake_file_empty = StringIO()
  381. fake_file_minimal = StringIO("Lorem ipsum.")
  382. fake_file_large = StringIO("\n".join(
  383. "ABCDEFGH" * 100
  384. for __ in range(1000)))
  385. default_scenario_params = {
  386. 'open_scenario_name': 'okay',
  387. 'file_double_params': dict(
  388. path=file_path, fake_file=fake_file_minimal),
  389. }
  390. scenarios = {
  391. 'default': {},
  392. 'error-not-exist': {
  393. 'open_scenario_name': 'nonexist',
  394. },
  395. 'error-exist': {
  396. 'open_scenario_name': 'exist_error',
  397. },
  398. 'error-read-denied': {
  399. 'open_scenario_name': 'read_denied',
  400. },
  401. 'not-found': {
  402. 'file_double_params': dict(
  403. path=file_path, fake_file=fake_file_empty),
  404. },
  405. 'exist-empty': {
  406. 'file_double_params': dict(
  407. path=file_path, fake_file=fake_file_empty),
  408. },
  409. 'exist-minimal': {
  410. 'file_double_params': dict(
  411. path=file_path, fake_file=fake_file_minimal),
  412. },
  413. 'exist-large': {
  414. 'file_double_params': dict(
  415. path=file_path, fake_file=fake_file_large),
  416. },
  417. }
  418. for (name, scenario) in scenarios.items():
  419. params = default_scenario_params.copy()
  420. params.update(scenario)
  421. scenario.update(params)
  422. scenario['file_double'] = FileDouble(**scenario['file_double_params'])
  423. scenario['file_double'].set_open_scenario(params['open_scenario_name'])
  424. scenario['fake_file_scenario_name'] = name
  425. return scenarios
  426. def get_file_doubles_from_fake_file_scenarios(scenarios):
  427. """ Get the `FileDouble` instances from fake file scenarios.
  428. :param scenarios: Collection of fake file scenarios.
  429. :return: Collection of `FileDouble` instances.
  430. """
  431. doubles = set(
  432. scenario['file_double']
  433. for scenario in scenarios
  434. if scenario['file_double'] is not None)
  435. return doubles
  436. def setup_file_double_behaviour(testcase, doubles=None):
  437. """ Set up file double instances and behaviour.
  438. :param testcase: The `TestCase` instance to modify.
  439. :param doubles: Collection of `FileDouble` instances.
  440. :return: None.
  441. If `doubles` is ``None``, a default collection will be made
  442. from the result of `make_fake_file_scenarios` result.
  443. """
  444. if doubles is None:
  445. scenarios = make_fake_file_scenarios()
  446. doubles = get_file_doubles_from_fake_file_scenarios(
  447. scenarios.values())
  448. for file_double in doubles:
  449. file_double.register_for_testcase(testcase)
  450. orig_open = builtins.open
  451. def fake_open(path, mode='rt', buffering=-1):
  452. registry = FileDouble.get_registry_for_testcase(testcase)
  453. if path in registry:
  454. file_double = registry[path]
  455. result = file_double.builtins_open_scenario.call_hook(
  456. mode, buffering)
  457. else:
  458. result = orig_open(path, mode, buffering)
  459. return result
  460. mock_open = mock.mock_open()
  461. mock_open.side_effect = fake_open
  462. func_patcher = mock.patch.object(
  463. builtins, "open",
  464. new=mock_open)
  465. func_patcher.start()
  466. testcase.addCleanup(func_patcher.stop)
  467. def setup_fake_file_fixtures(testcase):
  468. """ Set up fixtures for fake file doubles.
  469. :param testcase: The `TestCase` instance to modify.
  470. :return: None.
  471. """
  472. scenarios = make_fake_file_scenarios()
  473. testcase.fake_file_scenarios = scenarios
  474. file_doubles = get_file_doubles_from_fake_file_scenarios(
  475. scenarios.values())
  476. setup_file_double_behaviour(testcase, file_doubles)
  477. def set_fake_file_scenario(testcase, name):
  478. """ Set the named fake file scenario for the test case. """
  479. scenario = testcase.fake_file_scenarios[name]
  480. testcase.fake_file_scenario = scenario
  481. testcase.file_double = scenario['file_double']
  482. testcase.file_double.register_for_testcase(testcase)
  483. class TestDoubleFunctionScenario:
  484. """ Scenario for fake behaviour of a specific function. """
  485. def __init__(self, scenario_name, double):
  486. self.scenario_name = scenario_name
  487. self.double = double
  488. self.call_hook = getattr(
  489. self, "_hook_{name}".format(name=self.scenario_name))
  490. def __repr__(self):
  491. text = (
  492. "<{class_name} instance: {id}"
  493. " name: {name!r},"
  494. " call_hook name: {hook_name!r}"
  495. " double: {double!r}"
  496. ">").format(
  497. class_name=self.__class__.__name__, id=id(self),
  498. name=self.scenario_name, double=self.double,
  499. hook_name=self.call_hook.__name__)
  500. return text
  501. def __eq__(self, other):
  502. result = True
  503. if not self.scenario_name == other.scenario_name:
  504. result = False
  505. if not self.double == other.double:
  506. result = False
  507. if not self.call_hook.__name__ == other.call_hook.__name__:
  508. result = False
  509. return result
  510. def __ne__(self, other):
  511. result = not self.__eq__(other)
  512. return result
  513. class os_path_exists_scenario(TestDoubleFunctionScenario):
  514. """ Scenario for `os.path.exists` behaviour. """
  515. def _hook_exist(self):
  516. return True
  517. def _hook_not_exist(self):
  518. return False
  519. class os_access_scenario(TestDoubleFunctionScenario):
  520. """ Scenario for `os.access` behaviour. """
  521. def _hook_okay(self, mode):
  522. return True
  523. def _hook_not_exist(self, mode):
  524. return False
  525. def _hook_read_only(self, mode):
  526. if mode & (os.W_OK | os.X_OK):
  527. result = False
  528. else:
  529. result = True
  530. return result
  531. def _hook_denied(self, mode):
  532. if mode & (os.R_OK | os.W_OK | os.X_OK):
  533. result = False
  534. else:
  535. result = True
  536. return result
  537. class os_stat_scenario(TestDoubleFunctionScenario):
  538. """ Scenario for `os.stat` behaviour. """
  539. def _hook_okay(self):
  540. return self.double.stat_result
  541. def _hook_notfound_error(self):
  542. raise FileNotFoundError(
  543. self.double.path,
  544. "No such file or directory: {path!r}".format(
  545. path=self.double.path))
  546. def _hook_denied_error(self):
  547. raise PermissionError(
  548. self.double.path,
  549. "Permission denied")
  550. class os_lstat_scenario(os_stat_scenario):
  551. """ Scenario for `os.lstat` behaviour. """
  552. class os_unlink_scenario(TestDoubleFunctionScenario):
  553. """ Scenario for `os.unlink` behaviour. """
  554. def _hook_okay(self):
  555. return None
  556. def _hook_nonexist(self):
  557. error = FileNotFoundError(
  558. self.double.path,
  559. "No such file or directory: {path!r}".format(
  560. path=self.double.path))
  561. raise error
  562. def _hook_denied(self):
  563. error = PermissionError(
  564. self.double.path,
  565. "Permission denied")
  566. raise error
  567. class os_rmdir_scenario(TestDoubleFunctionScenario):
  568. """ Scenario for `os.rmdir` behaviour. """
  569. def _hook_okay(self):
  570. return None
  571. def _hook_nonexist(self):
  572. error = FileNotFoundError(
  573. self.double.path,
  574. "No such file or directory: {path!r}".format(
  575. path=self.double.path))
  576. raise error
  577. def _hook_denied(self):
  578. error = PermissionError(
  579. self.double.path,
  580. "Permission denied")
  581. raise error
  582. class shutil_rmtree_scenario(TestDoubleFunctionScenario):
  583. """ Scenario for `shutil.rmtree` behaviour. """
  584. def _hook_okay(self):
  585. return None
  586. def _hook_nonexist(self):
  587. error = FileNotFoundError(
  588. self.double.path,
  589. "No such file or directory: {path!r}".format(
  590. path=self.double.path))
  591. raise error
  592. def _hook_denied(self):
  593. error = PermissionError(
  594. self.double.path,
  595. "Permission denied")
  596. raise error
  597. class builtins_open_scenario(TestDoubleFunctionScenario):
  598. """ Scenario for `builtins.open` behaviour. """
  599. def _hook_okay(self, mode, buffering):
  600. result = self.double.fake_file
  601. return result
  602. def _hook_nonexist(self, mode, buffering):
  603. if mode.startswith('r'):
  604. error = FileNotFoundError(
  605. self.double.path,
  606. "No such file or directory: {path!r}".format(
  607. path=self.double.path))
  608. raise error
  609. result = self.double.fake_file
  610. return result
  611. def _hook_exist_error(self, mode, buffering):
  612. if mode.startswith('w') or mode.startswith('a'):
  613. error = FileExistsError(
  614. self.double.path,
  615. "File already exists: {path!r}".format(
  616. path=self.double.path))
  617. raise error
  618. result = self.double.fake_file
  619. return result
  620. def _hook_read_denied(self, mode, buffering):
  621. if mode.startswith('r'):
  622. error = PermissionError(
  623. self.double.path,
  624. "Read denied on {path!r}".format(
  625. path=self.double.path))
  626. raise error
  627. result = self.double.fake_file
  628. return result
  629. def _hook_write_denied(self, mode, buffering):
  630. if mode.startswith('w') or mode.startswith('a'):
  631. error = PermissionError(
  632. self.double.path,
  633. "Write denied on {path!r}".format(
  634. path=self.double.path))
  635. raise error
  636. result = self.double.fake_file
  637. return result
  638. class TestDoubleWithRegistry:
  639. """ Abstract base class for a test double with a test case registry. """
  640. registry_class = NotImplemented
  641. registries = NotImplemented
  642. function_scenario_params_by_class = NotImplemented
  643. def __new__(cls, *args, **kwargs):
  644. superclass = super(TestDoubleWithRegistry, cls)
  645. if superclass.__new__ is object.__new__:
  646. # The ‘object’ implementation complains about extra arguments.
  647. instance = superclass.__new__(cls)
  648. else:
  649. instance = superclass.__new__(cls, *args, **kwargs)
  650. instance.make_set_scenario_methods()
  651. return instance
  652. def __init__(self, *args, **kwargs):
  653. super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
  654. self._set_method_per_scenario()
  655. def _make_set_scenario_method(self, scenario_class, params):
  656. def method(self, name):
  657. scenario = scenario_class(name, double=self)
  658. setattr(self, scenario_class.__name__, scenario)
  659. method.__doc__ = (
  660. """ Set the scenario for `{name}` behaviour. """
  661. ).format(name=scenario_class.__name__)
  662. method.__name__ = str(params['set_scenario_method_name'])
  663. return method
  664. def make_set_scenario_methods(self):
  665. """ Make `set_<scenario_class_name>` methods on this class. """
  666. for (function_scenario_class, function_scenario_params) in (
  667. self.function_scenario_params_by_class.items()):
  668. method = self._make_set_scenario_method(
  669. function_scenario_class, function_scenario_params)
  670. setattr(self.__class__, method.__name__, method)
  671. function_scenario_params['set_scenario_method'] = method
  672. def _set_method_per_scenario(self):
  673. """ Set the method to be called for each scenario. """
  674. for function_scenario_params in (
  675. self.function_scenario_params_by_class.values()):
  676. function_scenario_params['set_scenario_method'](
  677. self, function_scenario_params['default_scenario_name'])
  678. @classmethod
  679. def get_registry_for_testcase(cls, testcase):
  680. """ Get the FileDouble registry for the specified test case. """
  681. # Key in a dict must be hashable.
  682. key = (testcase.__class__, id(testcase))
  683. registry = cls.registries.setdefault(key, cls.registry_class())
  684. return registry
  685. def get_registry_key(self):
  686. """ Get the registry key for this double. """
  687. raise NotImplementedError
  688. def register_for_testcase(self, testcase):
  689. """ Add this instance to registry for the specified testcase. """
  690. registry = self.get_registry_for_testcase(testcase)
  691. key = self.get_registry_key()
  692. registry[key] = self
  693. unregister_func = functools.partial(
  694. self.unregister_for_testcase, testcase)
  695. testcase.addCleanup(unregister_func)
  696. def unregister_for_testcase(self, testcase):
  697. """ Remove this instance from registry for the specified testcase. """
  698. registry = self.get_registry_for_testcase(testcase)
  699. key = self.get_registry_key()
  700. if key in registry:
  701. registry.pop(key)
  702. def copy_fake_file(fake_file):
  703. """ Make a copy of the StringIO instance. """
  704. fake_file_type = StringIO
  705. content = ""
  706. if fake_file is not None:
  707. fake_file_type = type(fake_file)
  708. content = fake_file.getvalue()
  709. assert issubclass(fake_file_type, object)
  710. result = fake_file_type(content)
  711. return result
  712. class FileDouble(TestDoubleWithRegistry):
  713. """ A testing double for a file. """
  714. registry_class = dict
  715. registries = {}
  716. function_scenario_params_by_class = {
  717. os_path_exists_scenario: {
  718. 'default_scenario_name': 'not_exist',
  719. 'set_scenario_method_name': 'set_os_path_exists_scenario',
  720. },
  721. os_access_scenario: {
  722. 'default_scenario_name': 'okay',
  723. 'set_scenario_method_name': 'set_os_access_scenario',
  724. },
  725. os_stat_scenario: {
  726. 'default_scenario_name': 'okay',
  727. 'set_scenario_method_name': 'set_os_stat_scenario',
  728. },
  729. os_lstat_scenario: {
  730. 'default_scenario_name': 'okay',
  731. 'set_scenario_method_name': 'set_os_lstat_scenario',
  732. },
  733. builtins_open_scenario: {
  734. 'default_scenario_name': 'okay',
  735. 'set_scenario_method_name': 'set_open_scenario',
  736. },
  737. os_unlink_scenario: {
  738. 'default_scenario_name': 'okay',
  739. 'set_scenario_method_name': 'set_os_unlink_scenario',
  740. },
  741. os_rmdir_scenario: {
  742. 'default_scenario_name': 'okay',
  743. 'set_scenario_method_name': 'set_os_rmdir_scenario',
  744. },
  745. shutil_rmtree_scenario: {
  746. 'default_scenario_name': 'okay',
  747. 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
  748. },
  749. }
  750. def __init__(self, path=None, fake_file=None, *args, **kwargs):
  751. self.path = path
  752. self.fake_file = copy_fake_file(fake_file)
  753. self.fake_file.name = path
  754. self._set_stat_result()
  755. super(FileDouble, self).__init__(*args, **kwargs)
  756. def _set_stat_result(self):
  757. """ Set the `os.stat` result for this file. """
  758. size = len(self.fake_file.getvalue())
  759. self.stat_result = StatResult(
  760. st_mode=0,
  761. st_ino=None, st_dev=None, st_nlink=None,
  762. st_uid=0, st_gid=0,
  763. st_size=size,
  764. st_atime=None, st_mtime=None, st_ctime=None,
  765. )
  766. def __repr__(self):
  767. text = "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
  768. path=self.path, fake_file=self.fake_file)
  769. return text
  770. def get_registry_key(self):
  771. """ Get the registry key for this double. """
  772. result = self.path
  773. return result
  774. class os_popen_scenario(TestDoubleFunctionScenario):
  775. """ Scenario for `os.popen` behaviour. """
  776. stream_name_by_mode = {
  777. 'w': 'stdin',
  778. 'r': 'stdout',
  779. }
  780. def _hook_success(self, cmd, mode, buffering):
  781. stream_name = self.stream_name_by_mode[mode]
  782. stream_double = getattr(
  783. self.double, stream_name + '_double')
  784. result = stream_double.fake_file
  785. return result
  786. def _hook_failure(self, cmd, mode, buffering):
  787. result = StringIO()
  788. return result
  789. def _hook_not_found(self, cmd, mode, buffering):
  790. result = StringIO()
  791. return result
  792. class os_waitpid_scenario(TestDoubleFunctionScenario):
  793. """ Scenario for `os.waitpid` behaviour. """
  794. def _hook_success(self, pid, options):
  795. result = (pid, EXIT_STATUS_SUCCESS)
  796. return result
  797. def _hook_failure(self, pid, options):
  798. result = (pid, EXIT_STATUS_FAILURE)
  799. return result
  800. def _hook_not_found(self, pid, options):
  801. error = OSError(errno.ECHILD)
  802. raise error
  803. class os_system_scenario(TestDoubleFunctionScenario):
  804. """ Scenario for `os.system` behaviour. """
  805. def _hook_success(self, command):
  806. result = EXIT_STATUS_SUCCESS
  807. return result
  808. def _hook_failure(self, command):
  809. result = EXIT_STATUS_FAILURE
  810. return result
  811. def _hook_not_found(self, command):
  812. result = EXIT_STATUS_COMMAND_NOT_FOUND
  813. return result
  814. class os_spawnv_scenario(TestDoubleFunctionScenario):
  815. """ Scenario for `os.spawnv` behaviour. """
  816. def _hook_success(self, mode, file, args):
  817. result = EXIT_STATUS_SUCCESS
  818. return result
  819. def _hook_failure(self, mode, file, args):
  820. result = EXIT_STATUS_FAILURE
  821. return result
  822. def _hook_not_found(self, mode, file, args):
  823. result = EXIT_STATUS_COMMAND_NOT_FOUND
  824. return result
# Sentinels for use as words in a registered `argv` key; they are
# compared by identity when `SubprocessDoubleRegistry` matches a key.
# `ARG_ANY` matches any word at its position.
ARG_ANY = object()
# `ARG_MORE` matches any remaining words.
ARG_MORE = object()
  827. class PopenDouble:
  828. """ A testing double for `subprocess.Popen`. """
  829. def __init__(self, args, *posargs, **kwargs):
  830. self.stdin = None
  831. self.stdout = None
  832. self.stderr = None
  833. self.pid = None
  834. self.returncode = None
  835. def set_streams(self, subprocess_double, popen_kwargs):
  836. """ Set the streams on the `PopenDouble`.
  837. :param subprocess_double: The `SubprocessDouble` from
  838. which to get existing stream doubles.
  839. :param popen_kwargs: The keyword arguments to the
  840. `subprocess.Popen` call.
  841. :return: ``None``.
  842. """
  843. for stream_name in (
  844. name for name in ['stdin', 'stdout', 'stderr']
  845. if name in popen_kwargs):
  846. stream_spec = popen_kwargs[stream_name]
  847. if stream_spec is subprocess.PIPE:
  848. stream_double = getattr(
  849. subprocess_double,
  850. "{name}_double".format(name=stream_name))
  851. stream_file = stream_double.fake_file
  852. elif stream_spec is subprocess.STDOUT:
  853. stream_file = subprocess_double.stdout_double.fake_file
  854. else:
  855. stream_file = stream_spec
  856. setattr(self, stream_name, stream_file)
  857. class subprocess_popen_scenario(TestDoubleFunctionScenario):
  858. """ Scenario for `subprocess.Popen` behaviour. """
  859. def _hook_success(self, testcase, args, *posargs, **kwargs):
  860. double = self.double.popen_double
  861. double.set_streams(self.double, kwargs)
  862. return double
  863. def patch_subprocess_popen(testcase):
  864. """ Patch `subprocess.Popen` constructor for this test case.
  865. :param testcase: The `TestCase` instance to modify.
  866. :return: None.
  867. When the patched function is called, the registry of
  868. `SubprocessDouble` instances for this test case will be used
  869. to get the instance for the program path specified.
  870. """
  871. orig_subprocess_popen = subprocess.Popen
  872. def fake_subprocess_popen(args, *posargs, **kwargs):
  873. if kwargs.get('shell', False):
  874. argv = shlex.split(args)
  875. else:
  876. argv = args
  877. registry = SubprocessDouble.get_registry_for_testcase(testcase)
  878. if argv in registry:
  879. subprocess_double = registry[argv]
  880. result = subprocess_double.subprocess_popen_scenario.call_hook(
  881. testcase, args, *posargs, **kwargs)
  882. else:
  883. result = orig_subprocess_popen(args, *posargs, **kwargs)
  884. return result
  885. func_patcher = mock.patch.object(
  886. subprocess, "Popen", side_effect=fake_subprocess_popen)
  887. func_patcher.start()
  888. testcase.addCleanup(func_patcher.stop)
  889. class SubprocessDoubleRegistry(collections_abc.MutableMapping):
  890. """ Registry of `SubprocessDouble` instances by `argv`. """
  891. def __init__(self, *args, **kwargs):
  892. items = []
  893. if args:
  894. if isinstance(args[0], collections_abc.Mapping):
  895. items = args[0].items()
  896. if isinstance(args[0], collections_abc.Iterable):
  897. items = args[0]
  898. self._mapping = dict(items)
  899. def __repr__(self):
  900. text = "<{class_name} object: {mapping}>".format(
  901. class_name=self.__class__.__name__, mapping=self._mapping)
  902. return text
  903. def _match_argv(self, argv):
  904. """ Match the specified `argv` with our registered keys. """
  905. match = None
  906. if not isinstance(argv, collections_abc.Sequence):
  907. return match
  908. candidates = iter(self._mapping)
  909. while match is None:
  910. try:
  911. candidate = next(candidates)
  912. except StopIteration:
  913. break
  914. found = None
  915. if candidate == argv:
  916. # An exact match.
  917. found = True
  918. word_iter = enumerate(candidate)
  919. while found is None:
  920. try:
  921. (word_index, candidate_word) = next(word_iter)
  922. except StopIteration:
  923. break
  924. if candidate_word is ARG_MORE:
  925. # Candiate matches any remaining words. We have a match.
  926. found = True
  927. elif word_index > len(argv):
  928. # Candidate is too long for the specified argv.
  929. found = False
  930. elif candidate_word is ARG_ANY:
  931. # Candidate matches any word at this position.
  932. continue
  933. elif candidate_word == argv[word_index]:
  934. # Candidate matches the word at this position.
  935. continue
  936. else:
  937. # This candidate does not match.
  938. found = False
  939. if found is None:
  940. # Reached the end of the candidate without a mismatch.
  941. found = True
  942. if found:
  943. match = candidate
  944. return match
  945. def __getitem__(self, key):
  946. match = self._match_argv(key)
  947. if match is None:
  948. raise KeyError(key)
  949. result = self._mapping[match]
  950. return result
  951. def __setitem__(self, key, value):
  952. if key in self:
  953. del self[key]
  954. self._mapping[key] = value
  955. def __delitem__(self, key):
  956. match = self._match_argv(key)
  957. if match is not None:
  958. del self._mapping[match]
  959. def __iter__(self):
  960. return self._mapping.__iter__()
  961. def __len__(self):
  962. return self._mapping.__len__()
  963. class SubprocessDouble(TestDoubleWithRegistry):
  964. """ A testing double for a subprocess. """
  965. registry_class = SubprocessDoubleRegistry
  966. registries = {}
  967. double_by_pid = weakref.WeakValueDictionary()
  968. function_scenario_params_by_class = {
  969. subprocess_popen_scenario: {
  970. 'default_scenario_name': 'success',
  971. 'set_scenario_method_name': 'set_subprocess_popen_scenario',
  972. },
  973. os_popen_scenario: {
  974. 'default_scenario_name': 'success',
  975. 'set_scenario_method_name': 'set_os_popen_scenario',
  976. },
  977. os_waitpid_scenario: {
  978. 'default_scenario_name': 'success',
  979. 'set_scenario_method_name': 'set_os_waitpid_scenario',
  980. },
  981. os_system_scenario: {
  982. 'default_scenario_name': 'success',
  983. 'set_scenario_method_name': 'set_os_system_scenario',
  984. },
  985. os_spawnv_scenario: {
  986. 'default_scenario_name': 'success',
  987. 'set_scenario_method_name': 'set_os_spawnv_scenario',
  988. },
  989. }
  990. def __init__(self, path=None, argv=None, *args, **kwargs):
  991. if path is None:
  992. path = tempfile.mktemp()
  993. self.path = path
  994. if argv is None:
  995. command_name = os.path.basename(path)
  996. argv = [command_name]
  997. self.argv = argv
  998. self.pid = self._make_pid()
  999. self._register_by_pid()
  1000. self.set_popen_double()
  1001. self.stdin_double = FileDouble()
  1002. self.stdout_double = FileDouble()
  1003. self.stderr_double = FileDouble()
  1004. super(SubprocessDouble, self).__init__(*args, **kwargs)
  1005. def set_popen_double(self):
  1006. """ Set the `PopenDouble` for this instance. """
  1007. double = PopenDouble(self.argv)
  1008. double.pid = self.pid
  1009. self.popen_double = double
  1010. def __repr__(self):
  1011. text = (
  1012. "<SubprocessDouble instance: {id}"
  1013. " path: {path!r},"
  1014. " argv: {argv!r}"
  1015. " stdin_double: {stdin_double!r}"
  1016. " stdout_double: {stdout_double!r}"
  1017. " stderr_double: {stderr_double!r}"
  1018. ">").format(
  1019. id=id(self),
  1020. path=self.path, argv=self.argv,
  1021. stdin_double=self.stdin_double,
  1022. stdout_double=self.stdout_double,
  1023. stderr_double=self.stderr_double)
  1024. return text
  1025. @classmethod
  1026. def _make_pid(cls):
  1027. """ Make a unique PID for a subprocess. """
  1028. for pid in itertools.count(1):
  1029. yield pid
  1030. def _register_by_pid(self):
  1031. """ Register this subprocess by its PID. """
  1032. self.__class__.double_by_pid[self.pid] = self
  1033. def get_registry_key(self):
  1034. """ Get the registry key for this double. """
  1035. result = tuple(self.argv)
  1036. return result
  1037. def set_stdin_content(self, text):
  1038. """ Set the content of the `stdin` stream for this double. """
  1039. self.stdin_double.fake_file = StringIO(text)
  1040. def set_stdout_content(self, text):
  1041. """ Set the content of the `stdout` stream for this double. """
  1042. self.stdout_double.fake_file = StringIO(text)
  1043. def set_stderr_content(self, text):
  1044. """ Set the content of the `stderr` stream for this double. """
  1045. self.stderr_double.fake_file = StringIO(text)
  1046. def make_fake_subprocess_scenarios(path=None):
  1047. """ Make a collection of scenarios for testing with fake files.
  1048. :path: The filesystem path of the fake program. If not specified,
  1049. a valid random path will be generated.
  1050. :return: A collection of scenarios for tests involving subprocesses.
  1051. The collection is a mapping from scenario name to a dictionary of
  1052. scenario attributes.
  1053. """
  1054. if path is None:
  1055. file_path = tempfile.mktemp()
  1056. else:
  1057. file_path = path
  1058. default_scenario_params = {
  1059. 'return_value': EXIT_STATUS_SUCCESS,
  1060. 'program_path': file_path,
  1061. 'argv_after_command_name': [],
  1062. }
  1063. scenarios = {
  1064. 'default': {},
  1065. 'not-found': {
  1066. 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
  1067. },
  1068. }
  1069. for (name, scenario) in scenarios.items():
  1070. params = default_scenario_params.copy()
  1071. params.update(scenario)
  1072. scenario.update(params)
  1073. program_path = params['program_path']
  1074. program_name = os.path.basename(params['program_path'])
  1075. argv = [program_name]
  1076. argv.extend(params['argv_after_command_name'])
  1077. subprocess_double_params = dict(
  1078. path=program_path,
  1079. argv=argv,
  1080. )
  1081. subprocess_double = SubprocessDouble(**subprocess_double_params)
  1082. scenario['subprocess_double'] = subprocess_double
  1083. scenario['fake_file_scenario_name'] = name
  1084. return scenarios
  1085. def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
  1086. """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
  1087. :param scenarios: Collection of fake subprocess scenarios.
  1088. :return: Collection of `SubprocessDouble` instances.
  1089. """
  1090. doubles = set(
  1091. scenario['subprocess_double']
  1092. for scenario in scenarios
  1093. if scenario['subprocess_double'] is not None)
  1094. return doubles
  1095. def setup_subprocess_double_behaviour(testcase, doubles=None):
  1096. """ Set up subprocess double instances and behaviour.
  1097. :param testcase: The `TestCase` instance to modify.
  1098. :param doubles: Collection of `SubprocessDouble` instances.
  1099. :return: None.
  1100. If `doubles` is ``None``, a default collection will be made
  1101. from the return value of `make_fake_subprocess_scenarios`.
  1102. """
  1103. if doubles is None:
  1104. scenarios = make_fake_subprocess_scenarios()
  1105. doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
  1106. scenarios.values())
  1107. for double in doubles:
  1108. double.register_for_testcase(testcase)
  1109. def setup_fake_subprocess_fixtures(testcase):
  1110. """ Set up fixtures for fake subprocess doubles.
  1111. :param testcase: The `TestCase` instance to modify.
  1112. :return: None.
  1113. """
  1114. scenarios = make_fake_subprocess_scenarios()
  1115. testcase.fake_subprocess_scenarios = scenarios
  1116. doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
  1117. scenarios.values())
  1118. setup_subprocess_double_behaviour(testcase, doubles)
  1119. def patch_os_popen(testcase):
  1120. """ Patch `os.popen` behaviour for this test case.
  1121. :param testcase: The `TestCase` instance to modify.
  1122. :return: None.
  1123. When the patched function is called, the registry of
  1124. `SubprocessDouble` instances for this test case will be used
  1125. to get the instance for the program path specified.
  1126. """
  1127. orig_os_popen = os.popen
  1128. def fake_os_popen(cmd, mode='r', buffering=-1):
  1129. registry = SubprocessDouble.get_registry_for_testcase(testcase)
  1130. command_argv = shlex.split(cmd)
  1131. if command_argv in registry:
  1132. subprocess_double = registry[command_argv]
  1133. result = subprocess_double.os_popen_scenario.call_hook(
  1134. cmd, mode, buffering)
  1135. else:
  1136. result = orig_os_popen(cmd, mode, buffering)
  1137. return result
  1138. func_patcher = mock.patch.object(
  1139. os, "popen", side_effect=fake_os_popen)
  1140. func_patcher.start()
  1141. testcase.addCleanup(func_patcher.stop)
  1142. def patch_os_waitpid(testcase):
  1143. """ Patch `os.waitpid` behaviour for this test case.
  1144. :param testcase: The `TestCase` instance to modify.
  1145. :return: None.
  1146. When the patched function is called, the registry of
  1147. `SubprocessDouble` instances for this test case will be used
  1148. to get the instance for the program path specified.
  1149. """
  1150. orig_os_waitpid = os.waitpid
  1151. def fake_os_waitpid(pid, options):
  1152. registry = SubprocessDouble.double_by_pid
  1153. if pid in registry:
  1154. subprocess_double = registry[pid]
  1155. result = subprocess_double.os_waitpid_scenario.call_hook(
  1156. pid, options)
  1157. else:
  1158. result = orig_os_waitpid(pid, options)
  1159. return result
  1160. func_patcher = mock.patch.object(
  1161. os, "waitpid", side_effect=fake_os_waitpid)
  1162. func_patcher.start()
  1163. testcase.addCleanup(func_patcher.stop)
  1164. def patch_os_system(testcase):
  1165. """ Patch `os.system` behaviour for this test case.
  1166. :param testcase: The `TestCase` instance to modify.
  1167. :return: None.
  1168. When the patched function is called, the registry of
  1169. `SubprocessDouble` instances for this test case will be used
  1170. to get the instance for the program path specified.
  1171. """
  1172. orig_os_system = os.system
  1173. def fake_os_system(command):
  1174. registry = SubprocessDouble.get_registry_for_testcase(testcase)
  1175. command_argv = shlex.split(command)
  1176. if command_argv in registry:
  1177. subprocess_double = registry[command_argv]
  1178. result = subprocess_double.os_system_scenario.call_hook(
  1179. command)
  1180. else:
  1181. result = orig_os_system(command)
  1182. return result
  1183. func_patcher = mock.patch.object(
  1184. os, "system", side_effect=fake_os_system)
  1185. func_patcher.start()
  1186. testcase.addCleanup(func_patcher.stop)
  1187. def patch_os_spawnv(testcase):
  1188. """ Patch `os.spawnv` behaviour for this test case.
  1189. :param testcase: The `TestCase` instance to modify.
  1190. :return: None.
  1191. When the patched function is called, the registry of
  1192. `SubprocessDouble` instances for this test case will be used
  1193. to get the instance for the program path specified.
  1194. """
  1195. orig_os_spawnv = os.spawnv
  1196. def fake_os_spawnv(mode, file, args):
  1197. registry = SubprocessDouble.get_registry_for_testcase(testcase)
  1198. registry_key = tuple(args)
  1199. if registry_key in registry:
  1200. subprocess_double = registry[registry_key]
  1201. result = subprocess_double.os_spawnv_scenario.call_hook(
  1202. mode, file, args)
  1203. else:
  1204. result = orig_os_spawnv(mode, file, args)
  1205. return result
  1206. func_patcher = mock.patch.object(
  1207. os, "spawnv", side_effect=fake_os_spawnv)
  1208. func_patcher.start()
  1209. testcase.addCleanup(func_patcher.stop)
  1210. # Local variables:
  1211. # coding: utf-8
  1212. # mode: python
  1213. # End:
  1214. # vim: fileencoding=utf-8 filetype=python :