123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- #! /usr/bin/env python3
- import unittest
- from multiprocessing import Pool
- from time import sleep
- import sqlalchemy.sql as sql
- from db_test import DBDakTestCase
- from daklib.dbconn import DBConn
def read_number():
    """
    Run ``select 7 as foo`` on a fresh DBConn session and return the scalar.

    The session is always closed, even when the query raises, so a failing
    call does not leave a session open behind it.
    """
    session = DBConn().session()
    try:
        result = (
            session.query(sql.column("foo"))
            .from_statement(sql.text("select 7 as foo"))
            .scalar()
        )
        # Keep the session open for a moment before closing it; presumably
        # this makes the sessions of concurrent callers overlap in time.
        sleep(0.1)
    finally:
        session.close()
    return result
class MultiProcTestCase(DBDakTestCase):
    """
    This TestCase checks that DBConn works with multiprocessing.
    """

    def save_result(self, result):
        """
        Add one task's return value to the running total in self.result.

        Passed as the apply_async() callback in test_seven().
        """
        self.result += result

    def test_seven(self):
        """
        Test apply_async() with a database session.

        Submits read_number() several times to a process pool and checks
        that the callbacks summed up one 7 per submitted task.
        """
        task_count = 5
        self.result = 0
        # The with-block terminates the pool even if submitting a task
        # raises, so no worker processes are left behind.
        with Pool() as pool:
            pending = [
                pool.apply_async(read_number, (), callback=self.save_result)
                for _ in range(task_count)
            ]
            pool.close()
            pool.join()
        # The callback is only invoked for successful tasks; get() re-raises
        # an exception from a worker so the real cause is reported instead
        # of just a wrong sum.
        for async_result in pending:
            async_result.get()
        self.assertEqual(task_count * 7, self.result)
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()
|