shell_tests.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. """Tests for cement.utils.shell"""
  2. import sys
  3. import time
  4. import mock
  5. from cement.utils import shell, test
  6. from cement.core.exc import FrameworkError
  7. if sys.version_info[0] < 3:
  8. INPUT = '__builtin__.raw_input'
  9. else:
  10. INPUT = 'builtins.input'
  11. def add(a, b):
  12. return a + b
  13. class ShellUtilsTestCase(test.CementCoreTestCase):
  14. def test_exec_cmd(self):
  15. out, err, ret = shell.exec_cmd(['echo', 'KAPLA!'])
  16. self.eq(ret, 0)
  17. self.eq(out, b'KAPLA!\n')
  18. def test_exec_cmd_shell_true(self):
  19. out, err, ret = shell.exec_cmd(['echo KAPLA!'], shell=True)
  20. self.eq(ret, 0)
  21. self.eq(out, b'KAPLA!\n')
  22. def test_exec_cmd2(self):
  23. ret = shell.exec_cmd2(['echo'])
  24. self.eq(ret, 0)
  25. def test_exec_cmd2_shell_true(self):
  26. ret = shell.exec_cmd2(['echo johnny'], shell=True)
  27. self.eq(ret, 0)
  28. def test_exec_cmd_bad_command(self):
  29. out, err, ret = shell.exec_cmd(['false'])
  30. self.eq(ret, 1)
  31. def test_exec_cmd2_bad_command(self):
  32. ret = shell.exec_cmd2(['false'])
  33. self.eq(ret, 1)
  34. def test_spawn_process(self):
  35. p = shell.spawn_process(add, args=(23, 2))
  36. p.join()
  37. self.eq(p.exitcode, 0)
  38. p = shell.spawn_process(add, join=True, args=(23, 2))
  39. self.eq(p.exitcode, 0)
  40. def test_spawn_thread(self):
  41. t = shell.spawn_thread(time.sleep, args=(2,))
  42. # before joining it is alive
  43. res = t.is_alive()
  44. self.eq(res, True)
  45. t.join()
  46. # after joining it is not alive
  47. res = t.is_alive()
  48. self.eq(res, False)
  49. t = shell.spawn_thread(time.sleep, join=True, args=(2,))
  50. res = t.is_alive()
  51. self.eq(res, False)
  52. def test_prompt_simple(self):
  53. with mock.patch(INPUT, return_value='Test Input'):
  54. p = shell.Prompt("Test Prompt")
  55. self.eq(p.input, 'Test Input')
  56. def test_prompt_clear(self):
  57. # test with a non-clear command:
  58. with mock.patch(INPUT, return_value='Test Input'):
  59. p = shell.Prompt("Test Prompt",
  60. clear=True,
  61. clear_command='true',
  62. )
  63. self.eq(p.input, 'Test Input')
  64. def test_prompt_options(self):
  65. # test options (non-numbered.. user inputs actual option)
  66. with mock.patch(INPUT, return_value='y'):
  67. p = shell.Prompt("Test Prompt", options=['y', 'n'])
  68. self.eq(p.input, 'y')
  69. # test default value
  70. with mock.patch(INPUT, return_value=''):
  71. p = shell.Prompt("Test Prompt", options=['y', 'n'], default='n')
  72. self.eq(p.input, 'n')
  73. def test_prompt_numbered_options(self):
  74. # test numbered selection (user inputs number)
  75. with mock.patch(INPUT, return_value='3'):
  76. p = shell.Prompt("Test Prompt",
  77. options=['yes', 'no', 'maybe'],
  78. numbered=True,
  79. )
  80. self.eq(p.input, 'maybe')
  81. # test default value
  82. with mock.patch(INPUT, return_value=''):
  83. p = shell.Prompt(
  84. "Test Prompt",
  85. options=['yes', 'no', 'maybe'],
  86. numbered=True,
  87. default='2',
  88. )
  89. self.eq(p.input, 'no')
  90. def test_prompt_input_is_none(self):
  91. # test that self.input is none if no default, and no input
  92. with mock.patch(INPUT, return_value=''):
  93. p = shell.Prompt('Test Prompt',
  94. max_attempts=3,
  95. max_attempts_exception=False,
  96. )
  97. self.eq(p.input, None)
  98. @test.raises(FrameworkError)
  99. def test_prompt_max_attempts(self):
  100. # test that self.input is none if no default, and no input
  101. with mock.patch(INPUT, return_value=''):
  102. try:
  103. p = shell.Prompt('Test Prompt',
  104. max_attempts=3,
  105. max_attempts_exception=True,
  106. )
  107. except FrameworkError as e:
  108. self.eq(e.msg,
  109. "Maximum attempts exceeded getting valid user input",
  110. )
  111. raise
  112. def test_prompt_index_and_value_errors(self):
  113. with mock.patch(INPUT, return_value='5'):
  114. p = shell.Prompt(
  115. "Test Prompt",
  116. options=['yes', 'no', 'maybe'],
  117. numbered=True,
  118. max_attempts=3,
  119. max_attempts_exception=False,
  120. )
  121. self.eq(p.input, None)
  122. def test_prompt_case_insensitive(self):
  123. with mock.patch(INPUT, return_value='NO'):
  124. p = shell.Prompt(
  125. "Test Prompt",
  126. options=['yes', 'no', 'maybe'],
  127. case_insensitive=True,
  128. )
  129. self.eq(p.input, 'NO')
  130. with mock.patch(INPUT, return_value='NOT VALID'):
  131. p = shell.Prompt(
  132. "Test Prompt",
  133. options=['yes', 'no', 'maybe'],
  134. case_insensitive=True,
  135. max_attempts=3,
  136. max_attempts_exception=False,
  137. )
  138. self.eq(p.input, None)
  139. def test_prompt_case_sensitive(self):
  140. with mock.patch(INPUT, return_value='NO'):
  141. p = shell.Prompt(
  142. "Test Prompt",
  143. options=['yes', 'no', 'maybe'],
  144. case_insensitive=False,
  145. max_attempts=3,
  146. max_attempts_exception=False,
  147. )
  148. self.eq(p.input, None)