# dbtest_multiproc.py (1.2 KB)
  1. #! /usr/bin/env python3
  2. import unittest
  3. from multiprocessing import Pool
  4. from time import sleep
  5. import sqlalchemy.sql as sql
  6. from db_test import DBDakTestCase
  7. from daklib.dbconn import DBConn
  8. def read_number():
  9. session = DBConn().session()
  10. result = (
  11. session.query(sql.column("foo"))
  12. .from_statement(sql.text("select 7 as foo"))
  13. .scalar()
  14. )
  15. sleep(0.1)
  16. session.close()
  17. return result
  18. class MultiProcTestCase(DBDakTestCase):
  19. """
  20. This TestCase checks that DBConn works with multiprocessing.
  21. """
  22. def save_result(self, result):
  23. self.result += result
  24. def test_seven(self):
  25. """
  26. Test apply_async() with a database session.
  27. """
  28. self.result = 0
  29. pool = Pool()
  30. pool.apply_async(read_number, (), callback=self.save_result)
  31. pool.apply_async(read_number, (), callback=self.save_result)
  32. pool.apply_async(read_number, (), callback=self.save_result)
  33. pool.apply_async(read_number, (), callback=self.save_result)
  34. pool.apply_async(read_number, (), callback=self.save_result)
  35. pool.close()
  36. pool.join()
  37. self.assertEqual(5 * 7, self.result)
  38. if __name__ == "__main__":
  39. unittest.main()