test_multiprocessing.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. #! /usr/bin/env python3
  2. import signal
  3. from base_test import DakTestCase
  4. from daklib.dakmultiprocessing import (
  5. PROC_STATUS_EXCEPTION,
  6. PROC_STATUS_MISCFAILURE,
  7. PROC_STATUS_SIGNALRAISED,
  8. PROC_STATUS_SUCCESS,
  9. DakProcessPool,
  10. )
  11. def async_function(num, num2):
  12. from os import getpid, kill
  13. if num == 1:
  14. sigs = [signal.SIGTERM, signal.SIGPIPE, signal.SIGALRM, signal.SIGHUP]
  15. kill(getpid(), sigs[num2])
  16. if num2 == 3:
  17. raise Exception("Test uncaught exception handling")
  18. if num == 0 and num2 == 1:
  19. return (PROC_STATUS_MISCFAILURE, "Test custom error return")
  20. return (PROC_STATUS_SUCCESS, "blah, %d, %d" % (num, num2))
  21. class DakProcessPoolTestCase(DakTestCase):
  22. def testPool(self):
  23. def alarm_handler(signum, frame):
  24. raise AssertionError("Timed out")
  25. # Shouldn't take us more than 15 seconds to run this test
  26. signal.signal(signal.SIGALRM, alarm_handler)
  27. signal.alarm(15)
  28. p = DakProcessPool()
  29. for s in range(3):
  30. for j in range(4):
  31. p.apply_async(async_function, [s, j])
  32. p.close()
  33. p.join()
  34. signal.alarm(0)
  35. signal.signal(signal.SIGALRM, signal.SIG_DFL)
  36. expected = [
  37. (PROC_STATUS_SUCCESS, "blah, 0, 0"),
  38. (PROC_STATUS_MISCFAILURE, "Test custom error return"),
  39. (PROC_STATUS_SUCCESS, "blah, 0, 2"),
  40. (PROC_STATUS_EXCEPTION, "Exception: Test uncaught exception handling"),
  41. (PROC_STATUS_SIGNALRAISED, 15),
  42. (PROC_STATUS_SIGNALRAISED, 13),
  43. (PROC_STATUS_SIGNALRAISED, 14),
  44. (PROC_STATUS_SIGNALRAISED, 1),
  45. (PROC_STATUS_SUCCESS, "blah, 2, 0"),
  46. (PROC_STATUS_SUCCESS, "blah, 2, 1"),
  47. (PROC_STATUS_SUCCESS, "blah, 2, 2"),
  48. (PROC_STATUS_EXCEPTION, "Exception: Test uncaught exception handling"),
  49. ]
  50. self.assertEqual(len(p.results), len(expected))
  51. for r in range(len(p.results)):
  52. if p.results[r] != expected[r]:
  53. code, info = p.results[r]
  54. line1 = info.splitlines()[0]
  55. self.assertEqual(code, expected[r][0])
  56. self.assertEqual(line1, expected[r][1])