CryptoWorker.h 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. #ifndef CRYPTO_WORKER_H_
  2. #define CRYPTO_WORKER_H_
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
  9. namespace i2p
  10. {
  11. namespace worker
  12. {
  /**
   * Fixed-size pool of worker threads that executes queued jobs and hands
   * each job's result back to the object that submitted it.
   *
   * A Job pairs a std::shared_ptr<Caller> with a WorkFunc. A worker thread
   * invokes the WorkFunc and passes the ResultFunc it returns to
   * caller->GetService().post(...). Caller must therefore expose
   * GetService() returning an object whose post() accepts a ResultFunc.
   *
   * Notes:
   *  - The WorkFunc is invoked directly on the worker thread with no
   *    exception handler around it, so a WorkFunc that throws ends the
   *    process through std::terminate.
   *  - A pool constructed with workers <= 0 starts no threads; jobs offered
   *    to it are queued but never executed.
   *
   * @tparam Caller type of the object that receives the job's result.
   */
  template<typename Caller>
  struct ThreadPool
  {
    using ResultFunc = std::function<void(void)>;
    using WorkFunc = std::function<ResultFunc(void)>;
    using Job = std::pair<std::shared_ptr<Caller>, WorkFunc>;
    using mtx_t = std::mutex;
    using lock_t = std::unique_lock<mtx_t>;
    using cond_t = std::condition_variable;

    /**
     * Starts the worker threads.
     * @param workers number of threads to start; values <= 0 start none.
     */
    ThreadPool(int workers)
    {
      if (workers <= 0) return;
      threads.reserve(static_cast<std::size_t>(workers));
      for (int started = 0; started < workers; ++started)
        threads.emplace_back([this] { RunWorker(); });
    }

    // Worker threads hold a pointer to this object, and the mutex member is
    // not copyable, so the pool cannot be copied.
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool & operator=(const ThreadPool &) = delete;

    /**
     * Queues a copy of `job` and wakes one worker.
     *
     * The job is silently dropped when the pool is already stopping, or when
     * it has a null caller or an empty work function: a worker could neither
     * run such a job nor deliver its result, and attempting to would crash
     * the worker thread.
     *
     * @param job caller / work-function pair to enqueue.
     */
    void Offer(const Job & job)
    {
      if (!job.first || !job.second) return;
      {
        lock_t lock(queue_mutex);
        if (stop) return;
        jobs.emplace_back(job);
      }
      condition.notify_one();
    }

    /**
     * Flags the pool as stopping, wakes every worker and joins them.
     * Workers finish the jobs still in the queue before they exit.
     */
    ~ThreadPool()
    {
      {
        lock_t lock(queue_mutex);
        stop = true;
      }
      condition.notify_all();
      for (auto & worker : threads) worker.join();
    }

    std::vector<std::thread> threads; // worker threads, joined by the destructor
    std::deque<Job> jobs;             // pending jobs, guarded by queue_mutex
    mtx_t queue_mutex;                // protects jobs and stop
    cond_t condition;                 // signalled when a job arrives or stop is set
    bool stop = false;                // set by the destructor; guarded by queue_mutex

  private:
    /// Worker thread body: take one job at a time until the pool is stopped
    /// and the queue is empty.
    void RunWorker()
    {
      for (;;)
      {
        Job job;
        {
          lock_t lock(queue_mutex);
          condition.wait(lock, [this] { return stop || !jobs.empty(); });
          if (stop && jobs.empty()) return;
          job = std::move(jobs.front());
          jobs.pop_front();
        }
        // The work runs outside the lock so other workers can keep dequeuing.
        ResultFunc result = job.second();
        job.first->GetService().post(result);
      }
    }
  };
  72. }
  73. }
  74. #endif