BBS2chProxyThreadPool.h 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. #pragma once
  2. #include <queue>
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <unistd.h>
  6. #include <pthread.h>
  7. template <typename T>
  8. class BBS2chProxyThreadPool
  9. {
  10. public:
  11. BBS2chProxyThreadPool() : _numThreads(8), _running(false) {
  12. pthread_mutex_init(&_mutex, NULL);
  13. pthread_cond_init(&_cond, NULL);
  14. _threads = new pthread_t[_numThreads];
  15. };
  16. BBS2chProxyThreadPool(int n) : _numThreads(n), _running(false) {
  17. pthread_mutex_init(&_mutex, NULL);
  18. pthread_cond_init(&_cond, NULL);
  19. _threads = new pthread_t[_numThreads];
  20. };
  21. ~BBS2chProxyThreadPool() {
  22. pthread_mutex_destroy(&_mutex);
  23. pthread_cond_destroy(&_cond);
  24. delete[] _threads;
  25. };
  26. void add(const T &value) {
  27. pthread_mutex_lock(&_mutex);
  28. _queue.push(value);
  29. pthread_mutex_unlock(&_mutex);
  30. pthread_cond_signal(&_cond);
  31. };
  32. int get(T *value) {
  33. int ret = getAndLock(value);
  34. pthread_mutex_unlock(&_mutex);
  35. return ret;
  36. };
  37. int getAndLock(T *value) {
  38. int ret = 0;
  39. pthread_mutex_lock(&_mutex);
  40. while (_queue.empty() && _running) {
  41. pthread_cond_wait(&_cond, &_mutex);
  42. }
  43. if (!_queue.empty()) {
  44. if (value) *value = _queue.front();
  45. _queue.pop();
  46. }
  47. else if (!_running) {
  48. ret = 1;
  49. }
  50. return ret;
  51. };
  52. void run(void * (*func)(void *)) {
  53. if (_running) return;
  54. _running = true;
  55. for (int i=0; i<_numThreads; i++) {
  56. if (0 != pthread_create(&_threads[i], NULL, func, this))
  57. perror("pthread_create");
  58. //pthread_detach(_threads[i]);
  59. }
  60. };
  61. void suspend() {
  62. if (!_running) return;
  63. _running = false;
  64. pthread_cond_broadcast(&_cond);
  65. for (int i=0; i<_numThreads; i++) {
  66. pthread_join(_threads[i], NULL);
  67. }
  68. };
  69. void lock() {
  70. pthread_mutex_lock(&_mutex);
  71. };
  72. void unlock() {
  73. pthread_mutex_unlock(&_mutex);
  74. };
  75. size_t countInQueue() {
  76. return _queue.size();
  77. };
  78. private:
  79. int _numThreads;
  80. std::queue<T> _queue;
  81. pthread_mutex_t _mutex;
  82. pthread_cond_t _cond;
  83. pthread_t *_threads;
  84. bool _running;
  85. };