daemon_tests.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. """Tests for cement.ext.ext_daemon."""
  2. # NOTE: A large portion of ext_daemon is tested, but not included in
  3. # Coverage report because nose/coverage lose sight of things after the
  4. # sub-process is forked.
  5. import os
  6. import tempfile
  7. from random import random
  8. from cement.core import handler, backend, log, hook, exc
  9. from cement.utils import shell
  10. from cement.utils import test
  11. from cement.utils.misc import rando
  12. from cement.ext import ext_daemon
  13. APP = rando()[:12]
  14. class DaemonExtTestCase(test.CementExtTestCase):
  15. def setUp(self):
  16. self.app = self.make_app()
  17. def test_switch(self):
  18. env = ext_daemon.Environment()
  19. env.switch()
  20. def test_switch_with_pid(self):
  21. (_, tmpfile) = tempfile.mkstemp()
  22. os.remove(tmpfile)
  23. env = ext_daemon.Environment(pid_file=tmpfile)
  24. env.switch()
  25. try:
  26. self.ok(os.path.exists(tmpfile))
  27. finally:
  28. os.remove(tmpfile)
  29. @test.raises(exc.FrameworkError)
  30. def test_pid_exists(self):
  31. (_, tmpfile) = tempfile.mkstemp()
  32. env = ext_daemon.Environment(pid_file=tmpfile)
  33. env.switch()
  34. try:
  35. self.ok(os.path.exists(tmpfile))
  36. except exc.FrameworkError as e:
  37. self.ok(e.msg.startswith('Process already running'))
  38. raise
  39. finally:
  40. env = ext_daemon.Environment()
  41. env.switch()
  42. os.remove(tmpfile)
  43. @test.raises(exc.FrameworkError)
  44. def test_bogus_user(self):
  45. rand = random()
  46. try:
  47. env = ext_daemon.Environment(user='cement_test_user%s' % rand)
  48. except exc.FrameworkError as e:
  49. self.ok(e.msg.startswith('Daemon user'))
  50. raise
  51. finally:
  52. env = ext_daemon.Environment()
  53. env.switch()
  54. @test.raises(exc.FrameworkError)
  55. def test_bogus_group(self):
  56. rand = random()
  57. try:
  58. env = ext_daemon.Environment(group='cement_test_group%s' % rand)
  59. except exc.FrameworkError as e:
  60. self.ok(e.msg.startswith('Daemon group'))
  61. raise
  62. finally:
  63. env = ext_daemon.Environment()
  64. env.switch()
  65. def test_daemon(self):
  66. (_, tmpfile) = tempfile.mkstemp()
  67. os.remove(tmpfile)
  68. from cement.utils import shell
  69. # Test in a sub-process to avoid Nose hangup
  70. def target():
  71. app = self.make_app('test', argv=['--daemon'],
  72. extensions=['daemon'])
  73. app.setup()
  74. app.config.set('daemon', 'pid_file', tmpfile)
  75. try:
  76. # FIX ME: Can't daemonize, because nose loses sight of it
  77. app.daemonize()
  78. app.run()
  79. finally:
  80. app.close()
  81. ext_daemon.cleanup(app)
  82. p = shell.spawn_process(target)
  83. p.join()
  84. self.eq(p.exitcode, 0)
  85. def test_daemon_not_passed(self):
  86. app = self.make_app(APP, extensions=['daemon'])
  87. app.setup()
  88. app.config.set('daemon', 'pid_file', None)
  89. try:
  90. app.run()
  91. finally:
  92. ext_daemon.cleanup(app)