checkqueue.h 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // Copyright (c) 2012-2014 The Bitcoin Core developers
  2. // Distributed under the MIT software license, see the accompanying
  3. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4. #ifndef BITCOIN_CHECKQUEUE_H
  5. #define BITCOIN_CHECKQUEUE_H
  6. #include <algorithm>
  7. #include <vector>
  8. #include <boost/foreach.hpp>
  9. #include <boost/thread/condition_variable.hpp>
  10. #include <boost/thread/locks.hpp>
  11. #include <boost/thread/mutex.hpp>
  12. template <typename T>
  13. class CCheckQueueControl;
  14. /**
  15. * Queue for verifications that have to be performed.
  16. * The verifications are represented by a type T, which must provide an
  17. * operator(), returning a bool.
  18. *
  19. * One thread (the master) is assumed to push batches of verifications
  20. * onto the queue, where they are processed by N-1 worker threads. When
  21. * the master is done adding work, it temporarily joins the worker pool
  22. * as an N'th worker, until all jobs are done.
  23. */
  24. template <typename T>
  25. class CCheckQueue
  26. {
  27. private:
  28. //! Mutex to protect the inner state
  29. boost::mutex mutex;
  30. //! Worker threads block on this when out of work
  31. boost::condition_variable condWorker;
  32. //! Master thread blocks on this when out of work
  33. boost::condition_variable condMaster;
  34. //! The queue of elements to be processed.
  35. //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
  36. std::vector<T> queue;
  37. //! The number of workers (including the master) that are idle.
  38. int nIdle;
  39. //! The total number of workers (including the master).
  40. int nTotal;
  41. //! The temporary evaluation result.
  42. bool fAllOk;
  43. /**
  44. * Number of verifications that haven't completed yet.
  45. * This includes elements that are no longer queued, but still in the
  46. * worker's own batches.
  47. */
  48. unsigned int nTodo;
  49. //! Whether we're shutting down.
  50. bool fQuit;
  51. //! The maximum number of elements to be processed in one batch
  52. unsigned int nBatchSize;
  53. /** Internal function that does bulk of the verification work. */
  54. bool Loop(bool fMaster = false)
  55. {
  56. boost::condition_variable& cond = fMaster ? condMaster : condWorker;
  57. std::vector<T> vChecks;
  58. vChecks.reserve(nBatchSize);
  59. unsigned int nNow = 0;
  60. bool fOk = true;
  61. do {
  62. {
  63. boost::unique_lock<boost::mutex> lock(mutex);
  64. // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
  65. if (nNow) {
  66. fAllOk &= fOk;
  67. nTodo -= nNow;
  68. if (nTodo == 0 && !fMaster)
  69. // We processed the last element; inform the master it can exit and return the result
  70. condMaster.notify_one();
  71. } else {
  72. // first iteration
  73. nTotal++;
  74. }
  75. // logically, the do loop starts here
  76. while (queue.empty()) {
  77. if ((fMaster || fQuit) && nTodo == 0) {
  78. nTotal--;
  79. bool fRet = fAllOk;
  80. // reset the status for new work later
  81. if (fMaster)
  82. fAllOk = true;
  83. // return the current status
  84. return fRet;
  85. }
  86. nIdle++;
  87. cond.wait(lock); // wait
  88. nIdle--;
  89. }
  90. // Decide how many work units to process now.
  91. // * Do not try to do everything at once, but aim for increasingly smaller batches so
  92. // all workers finish approximately simultaneously.
  93. // * Try to account for idle jobs which will instantly start helping.
  94. // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
  95. nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
  96. vChecks.resize(nNow);
  97. for (unsigned int i = 0; i < nNow; i++) {
  98. // We want the lock on the mutex to be as short as possible, so swap jobs from the global
  99. // queue to the local batch vector instead of copying.
  100. vChecks[i].swap(queue.back());
  101. queue.pop_back();
  102. }
  103. // Check whether we need to do work at all
  104. fOk = fAllOk;
  105. }
  106. // execute work
  107. BOOST_FOREACH (T& check, vChecks)
  108. if (fOk)
  109. fOk = check();
  110. vChecks.clear();
  111. } while (true);
  112. }
  113. public:
  114. //! Create a new check queue
  115. CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
  116. //! Worker thread
  117. void Thread()
  118. {
  119. Loop();
  120. }
  121. //! Wait until execution finishes, and return whether all evaluations were successful.
  122. bool Wait()
  123. {
  124. return Loop(true);
  125. }
  126. //! Add a batch of checks to the queue
  127. void Add(std::vector<T>& vChecks)
  128. {
  129. boost::unique_lock<boost::mutex> lock(mutex);
  130. BOOST_FOREACH (T& check, vChecks) {
  131. queue.push_back(T());
  132. check.swap(queue.back());
  133. }
  134. nTodo += vChecks.size();
  135. if (vChecks.size() == 1)
  136. condWorker.notify_one();
  137. else if (vChecks.size() > 1)
  138. condWorker.notify_all();
  139. }
  140. ~CCheckQueue()
  141. {
  142. }
  143. bool IsIdle()
  144. {
  145. boost::unique_lock<boost::mutex> lock(mutex);
  146. return (nTotal == nIdle && nTodo == 0 && fAllOk == true);
  147. }
  148. };
  149. /**
  150. * RAII-style controller object for a CCheckQueue that guarantees the passed
  151. * queue is finished before continuing.
  152. */
  153. template <typename T>
  154. class CCheckQueueControl
  155. {
  156. private:
  157. CCheckQueue<T>* pqueue;
  158. bool fDone;
  159. public:
  160. CCheckQueueControl(CCheckQueue<T>* pqueueIn) : pqueue(pqueueIn), fDone(false)
  161. {
  162. // passed queue is supposed to be unused, or NULL
  163. if (pqueue != NULL) {
  164. bool isIdle = pqueue->IsIdle();
  165. assert(isIdle);
  166. }
  167. }
  168. bool Wait()
  169. {
  170. if (pqueue == NULL)
  171. return true;
  172. bool fRet = pqueue->Wait();
  173. fDone = true;
  174. return fRet;
  175. }
  176. void Add(std::vector<T>& vChecks)
  177. {
  178. if (pqueue != NULL)
  179. pqueue->Add(vChecks);
  180. }
  181. ~CCheckQueueControl()
  182. {
  183. if (!fDone)
  184. Wait();
  185. }
  186. };
  187. #endif // BITCOIN_CHECKQUEUE_H