123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603 |
- #include "util/util_foreach.h"
- #include "util/util_logging.h"
- #include "util/util_system.h"
- #include "util/util_task.h"
- #include "util/util_time.h"
- #ifdef THREADING_DEBUG_ENABLED
- # include <stdio.h>
- # define THREADING_DEBUG(...) \
- do { \
- printf(__VA_ARGS__); \
- fflush(stdout); \
- } while (0)
- #else
- # define THREADING_DEBUG(...)
- #endif
- CCL_NAMESPACE_BEGIN
- TaskPool::TaskPool()
- {
- num_tasks_handled = 0;
- num = 0;
- do_cancel = false;
- }
- TaskPool::~TaskPool()
- {
- stop();
- }
- void TaskPool::push(Task *task, bool front)
- {
- TaskScheduler::Entry entry;
- entry.task = task;
- entry.pool = this;
- TaskScheduler::push(entry, front);
- }
- void TaskPool::push(const TaskRunFunction &run, bool front)
- {
- push(new Task(run), front);
- }
- void TaskPool::wait_work(Summary *stats)
- {
- thread_scoped_lock num_lock(num_mutex);
- while (num != 0) {
- num_lock.unlock();
- thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
-
- TaskScheduler::Entry work_entry;
- bool found_entry = false;
- list<TaskScheduler::Entry>::iterator it;
- for (it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
- TaskScheduler::Entry &entry = *it;
- if (entry.pool == this) {
- work_entry = entry;
- found_entry = true;
- TaskScheduler::queue.erase(it);
- break;
- }
- }
- queue_lock.unlock();
-
- if (found_entry) {
-
- work_entry.task->run(0);
-
- delete work_entry.task;
-
- num_decrease(1);
- }
- num_lock.lock();
- if (num == 0)
- break;
- if (!found_entry) {
- THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
- num_cond.wait(num_lock);
- THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
- }
- }
- if (stats != NULL) {
- stats->time_total = time_dt() - start_time;
- stats->num_tasks_handled = num_tasks_handled;
- }
- }
- void TaskPool::cancel()
- {
- do_cancel = true;
- TaskScheduler::clear(this);
- {
- thread_scoped_lock num_lock(num_mutex);
- while (num) {
- THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
- num_cond.wait(num_lock);
- THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
- }
- }
- do_cancel = false;
- }
- void TaskPool::stop()
- {
- TaskScheduler::clear(this);
- assert(num == 0);
- }
- bool TaskPool::canceled()
- {
- return do_cancel;
- }
- bool TaskPool::finished()
- {
- thread_scoped_lock num_lock(num_mutex);
- return num == 0;
- }
- void TaskPool::num_decrease(int done)
- {
- num_mutex.lock();
- num -= done;
- assert(num >= 0);
- if (num == 0) {
- THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
- num_cond.notify_all();
- }
- num_mutex.unlock();
- }
- void TaskPool::num_increase()
- {
- thread_scoped_lock num_lock(num_mutex);
- if (num_tasks_handled == 0) {
- start_time = time_dt();
- }
- num++;
- num_tasks_handled++;
- THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
- num_cond.notify_all();
- }
- thread_mutex TaskScheduler::mutex;
- int TaskScheduler::users = 0;
- vector<thread *> TaskScheduler::threads;
- bool TaskScheduler::do_exit = false;
- list<TaskScheduler::Entry> TaskScheduler::queue;
- thread_mutex TaskScheduler::queue_mutex;
- thread_condition_variable TaskScheduler::queue_cond;
- namespace {
- void get_per_node_num_processors(vector<int> *num_per_node_processors)
- {
- const int num_nodes = system_cpu_num_numa_nodes();
- if (num_nodes == 0) {
- LOG(ERROR) << "Zero available NUMA nodes, is not supposed to happen.";
- return;
- }
- num_per_node_processors->resize(num_nodes);
- for (int node = 0; node < num_nodes; ++node) {
- if (!system_cpu_is_numa_node_available(node)) {
- (*num_per_node_processors)[node] = 0;
- continue;
- }
- (*num_per_node_processors)[node] = system_cpu_num_numa_node_processors(node);
- }
- }
- int get_num_total_processors(const vector<int> &num_per_node_processors)
- {
- int num_total_processors = 0;
- foreach (int num_node_processors, num_per_node_processors) {
- num_total_processors += num_node_processors;
- }
- return num_total_processors;
- }
- vector<int> distribute_threads_on_nodes(const int num_threads)
- {
-
- vector<int> thread_nodes(num_threads, -1);
- const int num_active_group_processors = system_cpu_num_active_group_processors();
- VLOG(1) << "Detected " << num_active_group_processors << " processors "
- << "in active group.";
- if (num_active_group_processors >= num_threads) {
-
- VLOG(1) << "Not setting thread group affinity.";
- return thread_nodes;
- }
- vector<int> num_per_node_processors;
- get_per_node_num_processors(&num_per_node_processors);
- if (num_per_node_processors.size() == 0) {
-
- return thread_nodes;
- }
- const int num_nodes = num_per_node_processors.size();
- int thread_index = 0;
-
- const int num_total_node_processors = get_num_total_processors(num_per_node_processors);
- int current_node_index = 0;
- while (thread_index < num_total_node_processors && thread_index < num_threads) {
- const int num_node_processors = num_per_node_processors[current_node_index];
- for (int processor_index = 0; processor_index < num_node_processors; ++processor_index) {
- VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
- thread_nodes[thread_index] = current_node_index;
- ++thread_index;
- if (thread_index == num_threads) {
-
- return thread_nodes;
- }
- }
- ++current_node_index;
- }
-
- current_node_index = 0;
- while (thread_index < num_threads) {
-
-
- while (num_per_node_processors[current_node_index] == 0) {
- current_node_index = (current_node_index + 1) % num_nodes;
- }
- VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
- ++thread_index;
- current_node_index = (current_node_index + 1) % num_nodes;
- }
- return thread_nodes;
- }
- }
- void TaskScheduler::init(int num_threads)
- {
- thread_scoped_lock lock(mutex);
-
- ++users;
- if (users != 1) {
- return;
- }
- do_exit = false;
- const bool use_auto_threads = (num_threads == 0);
- if (use_auto_threads) {
-
- num_threads = system_cpu_thread_count();
- }
- VLOG(1) << "Creating pool of " << num_threads << " threads.";
-
- vector<int> thread_nodes = distribute_threads_on_nodes(num_threads);
-
- threads.resize(num_threads);
- for (int thread_index = 0; thread_index < num_threads; ++thread_index) {
- threads[thread_index] = new thread(function_bind(&TaskScheduler::thread_run, thread_index + 1),
- thread_nodes[thread_index]);
- }
- }
- void TaskScheduler::exit()
- {
- thread_scoped_lock lock(mutex);
- users--;
- if (users == 0) {
- VLOG(1) << "De-initializing thread pool of task scheduler.";
-
- TaskScheduler::queue_mutex.lock();
- do_exit = true;
- TaskScheduler::queue_cond.notify_all();
- TaskScheduler::queue_mutex.unlock();
-
- foreach (thread *t, threads) {
- t->join();
- delete t;
- }
- threads.clear();
- }
- }
- void TaskScheduler::free_memory()
- {
- assert(users == 0);
- threads.free_memory();
- }
- bool TaskScheduler::thread_wait_pop(Entry &entry)
- {
- thread_scoped_lock queue_lock(queue_mutex);
- while (queue.empty() && !do_exit)
- queue_cond.wait(queue_lock);
- if (queue.empty()) {
- assert(do_exit);
- return false;
- }
- entry = queue.front();
- queue.pop_front();
- return true;
- }
- void TaskScheduler::thread_run(int thread_id)
- {
- Entry entry;
-
-
- while (thread_wait_pop(entry)) {
-
- entry.task->run(thread_id);
-
- delete entry.task;
-
- entry.pool->num_decrease(1);
- }
- }
- void TaskScheduler::push(Entry &entry, bool front)
- {
- entry.pool->num_increase();
-
- TaskScheduler::queue_mutex.lock();
- if (front)
- TaskScheduler::queue.push_front(entry);
- else
- TaskScheduler::queue.push_back(entry);
- TaskScheduler::queue_cond.notify_one();
- TaskScheduler::queue_mutex.unlock();
- }
- void TaskScheduler::clear(TaskPool *pool)
- {
- thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
-
- list<Entry>::iterator it = queue.begin();
- int done = 0;
- while (it != queue.end()) {
- Entry &entry = *it;
- if (entry.pool == pool) {
- done++;
- delete entry.task;
- it = queue.erase(it);
- }
- else
- it++;
- }
- queue_lock.unlock();
-
- pool->num_decrease(done);
- }
- DedicatedTaskPool::DedicatedTaskPool()
- {
- do_cancel = false;
- do_exit = false;
- num = 0;
- worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
- }
- DedicatedTaskPool::~DedicatedTaskPool()
- {
- stop();
- worker_thread->join();
- delete worker_thread;
- }
- void DedicatedTaskPool::push(Task *task, bool front)
- {
- num_increase();
-
- queue_mutex.lock();
- if (front)
- queue.push_front(task);
- else
- queue.push_back(task);
- queue_cond.notify_one();
- queue_mutex.unlock();
- }
- void DedicatedTaskPool::push(const TaskRunFunction &run, bool front)
- {
- push(new Task(run), front);
- }
- void DedicatedTaskPool::wait()
- {
- thread_scoped_lock num_lock(num_mutex);
- while (num)
- num_cond.wait(num_lock);
- }
- void DedicatedTaskPool::cancel()
- {
- do_cancel = true;
- clear();
- wait();
- do_cancel = false;
- }
- void DedicatedTaskPool::stop()
- {
- clear();
- do_exit = true;
- queue_cond.notify_all();
- wait();
- assert(num == 0);
- }
- bool DedicatedTaskPool::canceled()
- {
- return do_cancel;
- }
- void DedicatedTaskPool::num_decrease(int done)
- {
- thread_scoped_lock num_lock(num_mutex);
- num -= done;
- assert(num >= 0);
- if (num == 0)
- num_cond.notify_all();
- }
- void DedicatedTaskPool::num_increase()
- {
- thread_scoped_lock num_lock(num_mutex);
- num++;
- num_cond.notify_all();
- }
- bool DedicatedTaskPool::thread_wait_pop(Task *&task)
- {
- thread_scoped_lock queue_lock(queue_mutex);
- while (queue.empty() && !do_exit)
- queue_cond.wait(queue_lock);
- if (queue.empty()) {
- assert(do_exit);
- return false;
- }
- task = queue.front();
- queue.pop_front();
- return true;
- }
- void DedicatedTaskPool::thread_run()
- {
- Task *task;
-
- while (thread_wait_pop(task)) {
-
- task->run(0);
-
- delete task;
-
- num_decrease(1);
- }
- }
- void DedicatedTaskPool::clear()
- {
- thread_scoped_lock queue_lock(queue_mutex);
-
- list<Task *>::iterator it = queue.begin();
- int done = 0;
- while (it != queue.end()) {
- done++;
- delete *it;
- it = queue.erase(it);
- }
- queue_lock.unlock();
-
- num_decrease(done);
- }
- string TaskPool::Summary::full_report() const
- {
- string report = "";
- report += string_printf("Total time: %f\n", time_total);
- report += string_printf("Tasks handled: %d\n", num_tasks_handled);
- return report;
- }
- CCL_NAMESPACE_END
|