scheduler.cpp 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright (c) 2015 The Bitcoin Core developers
  2. // Distributed under the MIT software license, see the accompanying
  3. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4. #include "scheduler.h"
  5. #include "reverselock.h"
  6. #include <assert.h>
  7. #include <boost/bind.hpp>
  8. #include <utility>
  9. CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
  10. {
  11. }
  12. CScheduler::~CScheduler()
  13. {
  14. assert(nThreadsServicingQueue == 0);
  15. }
  16. #if BOOST_VERSION < 105000
  17. static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
  18. {
  19. return boost::posix_time::from_time_t(boost::chrono::system_clock::to_time_t(t));
  20. }
  21. #endif
  22. void CScheduler::serviceQueue()
  23. {
  24. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  25. ++nThreadsServicingQueue;
  26. // newTaskMutex is locked throughout this loop EXCEPT
  27. // when the thread is waiting or when the user's function
  28. // is called.
  29. while (!shouldStop()) {
  30. try {
  31. while (!shouldStop() && taskQueue.empty()) {
  32. // Wait until there is something to do.
  33. newTaskScheduled.wait(lock);
  34. }
  35. // Wait until either there is a new task, or until
  36. // the time of the first item on the queue:
  37. // wait_until needs boost 1.50 or later; older versions have timed_wait:
  38. #if BOOST_VERSION < 105000
  39. while (!shouldStop() && !taskQueue.empty() &&
  40. newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
  41. // Keep waiting until timeout
  42. }
  43. #else
  44. // Some boost versions have a conflicting overload of wait_until that returns void.
  45. // Explicitly use a template here to avoid hitting that overload.
  46. while (!shouldStop() && !taskQueue.empty() &&
  47. newTaskScheduled.wait_until<>(lock, taskQueue.begin()->first) != boost::cv_status::timeout) {
  48. // Keep waiting until timeout
  49. }
  50. #endif
  51. // If there are multiple threads, the queue can empty while we're waiting (another
  52. // thread may service the task we were waiting on).
  53. if (shouldStop() || taskQueue.empty())
  54. continue;
  55. Function f = taskQueue.begin()->second;
  56. taskQueue.erase(taskQueue.begin());
  57. {
  58. // Unlock before calling f, so it can reschedule itself or another task
  59. // without deadlocking:
  60. reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
  61. f();
  62. }
  63. } catch (...) {
  64. --nThreadsServicingQueue;
  65. throw;
  66. }
  67. }
  68. --nThreadsServicingQueue;
  69. }
  70. void CScheduler::stop(bool drain)
  71. {
  72. {
  73. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  74. if (drain)
  75. stopWhenEmpty = true;
  76. else
  77. stopRequested = true;
  78. }
  79. newTaskScheduled.notify_all();
  80. }
  81. void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
  82. {
  83. {
  84. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  85. taskQueue.insert(std::make_pair(t, f));
  86. }
  87. newTaskScheduled.notify_one();
  88. }
  89. void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaSeconds)
  90. {
  91. schedule(f, boost::chrono::system_clock::now() + boost::chrono::seconds(deltaSeconds));
  92. }
  93. static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaSeconds)
  94. {
  95. f();
  96. s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaSeconds), deltaSeconds);
  97. }
  98. void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds)
  99. {
  100. scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
  101. }
  102. size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
  103. boost::chrono::system_clock::time_point &last) const
  104. {
  105. boost::unique_lock<boost::mutex> lock(newTaskMutex);
  106. size_t result = taskQueue.size();
  107. if (!taskQueue.empty()) {
  108. first = taskQueue.begin()->first;
  109. last = taskQueue.rbegin()->first;
  110. }
  111. return result;
  112. }