test_pqp.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from typing import Sequence, MutableMapping
  4. import pytest
  5. from draugr.multiprocessing_utilities.pooled_queue_processor import (
  6. PooledQueueProcessor,
  7. PooledQueueTask,
  8. )
  9. from warg.functions import identity
  10. __author__ = "Christian Heider Nielsen"
  11. class Square(PooledQueueTask):
  12. def call(self, i, *args: Sequence, **kwargs: MutableMapping):
  13. """description"""
  14. return i * 2
  15. class Exc(PooledQueueTask):
  16. def call(self, *args: Sequence, **kwargs: MutableMapping):
  17. """description"""
  18. raise NotImplementedError
  19. @pytest.mark.skip
  20. def test_integration_success():
  21. task = Square()
  22. with PooledQueueProcessor(
  23. task, [2], fill_at_construction=True, max_queue_size=10
  24. ) as processor:
  25. for a, _ in zip(processor, range(30)):
  26. pass
  27. # print(a)
  28. @pytest.mark.skip
  29. def test_integration_func():
  30. task = identity
  31. with PooledQueueProcessor(task, [2], max_queue_size=10) as processor:
  32. for a, _ in zip(processor, range(30)):
  33. pass
  34. # print(a)
  35. @pytest.mark.skip
  36. def test_lambda_func():
  37. task = lambda x: x
  38. with PooledQueueProcessor(task, [2], max_queue_size=10) as processor:
  39. for a, _ in zip(processor, range(30)):
  40. pass
  41. # print(a)
  42. @pytest.mark.skip
  43. def test_integration_except():
  44. task = Exc()
  45. with pytest.raises(NotImplementedError) as exc_info:
  46. task() # TODO: MP does not work in pytest
  47. processor = PooledQueueProcessor(task, [2], max_queue_size=10, blocking=True)
  48. for a, _ in zip(processor, range(30)):
  49. pass
  50. # print(a)
  51. assert exc_info.type is NotImplementedError
  52. @pytest.mark.skip
  53. #
  54. def test_integration_except_ctx():
  55. task = Exc()
  56. with pytest.raises(NotImplementedError) as exc_info:
  57. task() # TODO: MP does not work in pytest
  58. with PooledQueueProcessor(task, [2], max_queue_size=10) as processor:
  59. for a, _ in zip(processor, range(30)):
  60. pass
  61. # print(a)
  62. assert exc_info.type is NotImplementedError
  63. if __name__ == "__main__":
  64. test_lambda_func()