123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
#pragma once
#include <queue>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
/**
 * Fixed-size pool of POSIX threads sharing one mutex-protected FIFO queue.
 *
 * Producers push work items with add(); worker threads (started by run())
 * pull them with get() or getAndLock(). The pool passes its own `this`
 * pointer as the argument of the thread entry function, so the entry
 * function is expected to cast it back and loop on get()/getAndLock()
 * until one of them returns 1.
 *
 * The destructor does not stop or join the workers: call suspend() before
 * the pool is destroyed if run() has been called.
 *
 * @tparam T  Work item type; it is copied into and out of a std::queue<T>.
 */
template <typename T>
class BBS2chProxyThreadPool
{
public:
    /**
     * Creates a stopped pool.
     *
     * @param n  Number of worker threads run() will try to start
     *           (default 8). Negative values are treated as 0 so the
     *           array allocation below never sees a negative size.
     */
    BBS2chProxyThreadPool(int n = 8)
        : _numThreads(n < 0 ? 0 : n),
          _threads(new pthread_t[_numThreads]),
          _running(false),
          _numStarted(0)
    {
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_cond, NULL);
    }

    /** Releases the mutex, the condition variable and the thread-handle array. */
    ~BBS2chProxyThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        delete[] _threads;
    }

    /**
     * Appends a copy of `value` to the queue and wakes one waiting worker.
     */
    void add(const T &value)
    {
        pthread_mutex_lock(&_mutex);
        _queue.push(value);
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_cond);
    }

    /**
     * Same as getAndLock(), but releases the mutex before returning.
     *
     * @param value  Receives the dequeued item; may be NULL to discard it.
     * @return 0 if an item was dequeued, 1 if the queue is empty and the
     *         pool is not running.
     */
    int get(T *value)
    {
        int ret = getAndLock(value);
        pthread_mutex_unlock(&_mutex);
        return ret;
    }

    /**
     * Dequeues the next item, blocking while the queue is empty and the
     * pool is running. The mutex is STILL HELD when this returns (for
     * either return value); the caller must call unlock().
     *
     * Items left in the queue are still handed out after suspend() has
     * cleared the running flag; 1 is returned only once the queue is empty.
     *
     * @param value  Receives the dequeued item; may be NULL to discard it.
     * @return 0 if an item was dequeued, 1 if the queue is empty and the
     *         pool is not running.
     */
    int getAndLock(T *value)
    {
        pthread_mutex_lock(&_mutex);
        while (_queue.empty() && _running) {
            pthread_cond_wait(&_cond, &_mutex);
        }
        if (_queue.empty()) {
            // The wait loop only exits with an empty queue when the pool
            // has been stopped, so nothing was delivered.
            return 1;
        }
        if (value) *value = _queue.front();
        _queue.pop();
        return 0;
    }

    /**
     * Starts the worker threads. Does nothing if the pool is already running.
     *
     * @param func  Thread entry point; it receives this pool (`this`) as
     *              its void* argument.
     *
     * A failed pthread_create() is reported on stderr and skipped; only
     * the threads that actually started are joined later by suspend().
     */
    void run(void * (*func)(void *))
    {
        if (_running) return;
        _running = true;
        _numStarted = 0;
        for (int i = 0; i < _numThreads; i++) {
            // Handles are stored contiguously so suspend() never joins an
            // uninitialised pthread_t.
            int rc = pthread_create(&_threads[_numStarted], NULL, func, this);
            if (rc != 0) {
                // pthread_create() returns the error number instead of
                // setting errno, so hand it to perror() explicitly.
                errno = rc;
                perror("pthread_create");
                continue;
            }
            _numStarted++;
            //pthread_detach(_threads[i]);
        }
    }

    /**
     * Stops the pool: clears the running flag, wakes every waiting worker
     * and joins all started threads. Does nothing if the pool is not running.
     * The pool can be started again with run() afterwards.
     */
    void suspend()
    {
        if (!_running) return;
        // Change the predicate and broadcast while holding the mutex;
        // otherwise a worker that has just tested `_running` but has not
        // yet entered pthread_cond_wait() would miss the wakeup and the
        // join below would block forever.
        pthread_mutex_lock(&_mutex);
        _running = false;
        pthread_cond_broadcast(&_cond);
        pthread_mutex_unlock(&_mutex);
        for (int i = 0; i < _numStarted; i++) {
            pthread_join(_threads[i], NULL);
        }
        _numStarted = 0;
    }

    /** Acquires the pool mutex (the same one that guards the queue). */
    void lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    /** Releases the pool mutex, e.g. after getAndLock() or lock(). */
    void unlock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    /**
     * Returns the number of queued items. This method does not take the
     * mutex itself; hold it (lock() / getAndLock()) around the call when
     * other threads may be modifying the queue.
     */
    size_t countInQueue()
    {
        return _queue.size();
    }

private:
    // Not copyable: the pool owns a raw thread array, a mutex and a
    // condition variable. Declared but intentionally left undefined.
    BBS2chProxyThreadPool(const BBS2chProxyThreadPool &);
    BBS2chProxyThreadPool &operator=(const BBS2chProxyThreadPool &);

    int _numThreads;          // number of workers run() tries to start
    std::queue<T> _queue;     // pending work items, guarded by _mutex
    pthread_mutex_t _mutex;   // guards _queue and the _running predicate
    pthread_cond_t _cond;     // signalled on add(), broadcast on suspend()
    pthread_t *_threads;      // handles of started workers, [0, _numStarted)
    bool _running;            // true between run() and suspend()
    int _numStarted;          // how many entries of _threads are joinable
};
|