123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603 |
- /*
- * Copyright 2011-2013 Blender Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "util/util_foreach.h"
- #include "util/util_logging.h"
- #include "util/util_system.h"
- #include "util/util_task.h"
- #include "util/util_time.h"
/* Uncomment the define below to get a trace of the condition variable activity
 * of the task pools printed to stdout. */
//#define THREADING_DEBUG_ENABLED

#ifndef THREADING_DEBUG_ENABLED
/* Tracing is disabled: the macro expands to nothing. */
#  define THREADING_DEBUG(...)
#else
#  include <stdio.h>
/* printf-style trace; stdout is flushed after every message. */
#  define THREADING_DEBUG(...) \
    do { \
      printf(__VA_ARGS__); \
      fflush(stdout); \
    } while (0)
#endif
- CCL_NAMESPACE_BEGIN
- /* Task Pool */
- TaskPool::TaskPool()
- {
- num_tasks_handled = 0;
- num = 0;
- do_cancel = false;
- }
- TaskPool::~TaskPool()
- {
- stop();
- }
- void TaskPool::push(Task *task, bool front)
- {
- TaskScheduler::Entry entry;
- entry.task = task;
- entry.pool = this;
- TaskScheduler::push(entry, front);
- }
- void TaskPool::push(const TaskRunFunction &run, bool front)
- {
- push(new Task(run), front);
- }
- void TaskPool::wait_work(Summary *stats)
- {
- thread_scoped_lock num_lock(num_mutex);
- while (num != 0) {
- num_lock.unlock();
- thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
- /* find task from this pool. if we get a task from another pool,
- * we can get into deadlock */
- TaskScheduler::Entry work_entry;
- bool found_entry = false;
- list<TaskScheduler::Entry>::iterator it;
- for (it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
- TaskScheduler::Entry &entry = *it;
- if (entry.pool == this) {
- work_entry = entry;
- found_entry = true;
- TaskScheduler::queue.erase(it);
- break;
- }
- }
- queue_lock.unlock();
- /* if found task, do it, otherwise wait until other tasks are done */
- if (found_entry) {
- /* run task */
- work_entry.task->run(0);
- /* delete task */
- delete work_entry.task;
- /* notify pool task was done */
- num_decrease(1);
- }
- num_lock.lock();
- if (num == 0)
- break;
- if (!found_entry) {
- THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
- num_cond.wait(num_lock);
- THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
- }
- }
- if (stats != NULL) {
- stats->time_total = time_dt() - start_time;
- stats->num_tasks_handled = num_tasks_handled;
- }
- }
- void TaskPool::cancel()
- {
- do_cancel = true;
- TaskScheduler::clear(this);
- {
- thread_scoped_lock num_lock(num_mutex);
- while (num) {
- THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
- num_cond.wait(num_lock);
- THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
- }
- }
- do_cancel = false;
- }
- void TaskPool::stop()
- {
- TaskScheduler::clear(this);
- assert(num == 0);
- }
- bool TaskPool::canceled()
- {
- return do_cancel;
- }
- bool TaskPool::finished()
- {
- thread_scoped_lock num_lock(num_mutex);
- return num == 0;
- }
- void TaskPool::num_decrease(int done)
- {
- num_mutex.lock();
- num -= done;
- assert(num >= 0);
- if (num == 0) {
- THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
- num_cond.notify_all();
- }
- num_mutex.unlock();
- }
- void TaskPool::num_increase()
- {
- thread_scoped_lock num_lock(num_mutex);
- if (num_tasks_handled == 0) {
- start_time = time_dt();
- }
- num++;
- num_tasks_handled++;
- THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
- num_cond.notify_all();
- }
- /* Task Scheduler */
- thread_mutex TaskScheduler::mutex;
- int TaskScheduler::users = 0;
- vector<thread *> TaskScheduler::threads;
- bool TaskScheduler::do_exit = false;
- list<TaskScheduler::Entry> TaskScheduler::queue;
- thread_mutex TaskScheduler::queue_mutex;
- thread_condition_variable TaskScheduler::queue_cond;
- namespace {
- /* Get number of processors on each of the available nodes. The result is sized
- * by the highest node index, and element corresponds to number of processors on
- * that node.
- * If node is not available, then the corresponding number of processors is
- * zero. */
- void get_per_node_num_processors(vector<int> *num_per_node_processors)
- {
- const int num_nodes = system_cpu_num_numa_nodes();
- if (num_nodes == 0) {
- LOG(ERROR) << "Zero available NUMA nodes, is not supposed to happen.";
- return;
- }
- num_per_node_processors->resize(num_nodes);
- for (int node = 0; node < num_nodes; ++node) {
- if (!system_cpu_is_numa_node_available(node)) {
- (*num_per_node_processors)[node] = 0;
- continue;
- }
- (*num_per_node_processors)[node] = system_cpu_num_numa_node_processors(node);
- }
- }
- /* Calculate total number of processors on all available nodes.
- * This is similar to system_cpu_thread_count(), but uses pre-calculated number
- * of processors on each of the node, avoiding extra system calls and checks for
- * the node availability. */
- int get_num_total_processors(const vector<int> &num_per_node_processors)
- {
- int num_total_processors = 0;
- foreach (int num_node_processors, num_per_node_processors) {
- num_total_processors += num_node_processors;
- }
- return num_total_processors;
- }
- /* Compute NUMA node for every thread to run on, for the best performance. */
- vector<int> distribute_threads_on_nodes(const int num_threads)
- {
- /* Start with all threads unassigned to any specific NUMA node. */
- vector<int> thread_nodes(num_threads, -1);
- const int num_active_group_processors = system_cpu_num_active_group_processors();
- VLOG(1) << "Detected " << num_active_group_processors << " processors "
- << "in active group.";
- if (num_active_group_processors >= num_threads) {
- /* If the current thread is set up in a way that its affinity allows to
- * use at least requested number of threads we do not explicitly set
- * affinity to the worker threads.
- * This way we allow users to manually edit affinity of the parent
- * thread, and here we follow that affinity. This way it's possible to
- * have two Cycles/Blender instances running manually set to a different
- * dies on a CPU. */
- VLOG(1) << "Not setting thread group affinity.";
- return thread_nodes;
- }
- vector<int> num_per_node_processors;
- get_per_node_num_processors(&num_per_node_processors);
- if (num_per_node_processors.size() == 0) {
- /* Error was already reported, here we can't do anything, so we simply
- * leave default affinity to all the worker threads. */
- return thread_nodes;
- }
- const int num_nodes = num_per_node_processors.size();
- int thread_index = 0;
- /* First pass: fill in all the nodes to their maximum.
- *
- * If there is less threads than the overall nodes capacity, some of the
- * nodes or parts of them will idle.
- *
- * TODO(sergey): Consider picking up fastest nodes if number of threads
- * fits on them. For example, on Threadripper2 we might consider using nodes
- * 0 and 2 if user requested 32 render threads. */
- const int num_total_node_processors = get_num_total_processors(num_per_node_processors);
- int current_node_index = 0;
- while (thread_index < num_total_node_processors && thread_index < num_threads) {
- const int num_node_processors = num_per_node_processors[current_node_index];
- for (int processor_index = 0; processor_index < num_node_processors; ++processor_index) {
- VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
- thread_nodes[thread_index] = current_node_index;
- ++thread_index;
- if (thread_index == num_threads) {
- /* All threads are scheduled on their nodes. */
- return thread_nodes;
- }
- }
- ++current_node_index;
- }
- /* Second pass: keep scheduling threads to each node one by one, uniformly
- * fillign them in.
- * This is where things becomes tricky to predict for the maximum
- * performance: on the one hand this avoids too much threading overhead on
- * few nodes, but for the final performance having all the overhead on one
- * node might be better idea (since other nodes will have better chance of
- * rendering faster).
- * But more tricky is that nodes might have difference capacity, so we might
- * want to do some weighted scheduling. For example, if node 0 has 16
- * processors and node 1 has 32 processors, we'd better schedule 1 extra
- * thread on node 0 and 2 extra threads on node 1. */
- current_node_index = 0;
- while (thread_index < num_threads) {
- /* Skip unavailable nodes. */
- /* TODO(sergey): Add sanity check against deadlock. */
- while (num_per_node_processors[current_node_index] == 0) {
- current_node_index = (current_node_index + 1) % num_nodes;
- }
- VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
- ++thread_index;
- current_node_index = (current_node_index + 1) % num_nodes;
- }
- return thread_nodes;
- }
- } // namespace
- void TaskScheduler::init(int num_threads)
- {
- thread_scoped_lock lock(mutex);
- /* Multiple cycles instances can use this task scheduler, sharing the same
- * threads, so we keep track of the number of users. */
- ++users;
- if (users != 1) {
- return;
- }
- do_exit = false;
- const bool use_auto_threads = (num_threads == 0);
- if (use_auto_threads) {
- /* Automatic number of threads. */
- num_threads = system_cpu_thread_count();
- }
- VLOG(1) << "Creating pool of " << num_threads << " threads.";
- /* Compute distribution on NUMA nodes. */
- vector<int> thread_nodes = distribute_threads_on_nodes(num_threads);
- /* Launch threads that will be waiting for work. */
- threads.resize(num_threads);
- for (int thread_index = 0; thread_index < num_threads; ++thread_index) {
- threads[thread_index] = new thread(function_bind(&TaskScheduler::thread_run, thread_index + 1),
- thread_nodes[thread_index]);
- }
- }
- void TaskScheduler::exit()
- {
- thread_scoped_lock lock(mutex);
- users--;
- if (users == 0) {
- VLOG(1) << "De-initializing thread pool of task scheduler.";
- /* stop all waiting threads */
- TaskScheduler::queue_mutex.lock();
- do_exit = true;
- TaskScheduler::queue_cond.notify_all();
- TaskScheduler::queue_mutex.unlock();
- /* delete threads */
- foreach (thread *t, threads) {
- t->join();
- delete t;
- }
- threads.clear();
- }
- }
- void TaskScheduler::free_memory()
- {
- assert(users == 0);
- threads.free_memory();
- }
- bool TaskScheduler::thread_wait_pop(Entry &entry)
- {
- thread_scoped_lock queue_lock(queue_mutex);
- while (queue.empty() && !do_exit)
- queue_cond.wait(queue_lock);
- if (queue.empty()) {
- assert(do_exit);
- return false;
- }
- entry = queue.front();
- queue.pop_front();
- return true;
- }
- void TaskScheduler::thread_run(int thread_id)
- {
- Entry entry;
- /* todo: test affinity/denormal mask */
- /* keep popping off tasks */
- while (thread_wait_pop(entry)) {
- /* run task */
- entry.task->run(thread_id);
- /* delete task */
- delete entry.task;
- /* notify pool task was done */
- entry.pool->num_decrease(1);
- }
- }
- void TaskScheduler::push(Entry &entry, bool front)
- {
- entry.pool->num_increase();
- /* add entry to queue */
- TaskScheduler::queue_mutex.lock();
- if (front)
- TaskScheduler::queue.push_front(entry);
- else
- TaskScheduler::queue.push_back(entry);
- TaskScheduler::queue_cond.notify_one();
- TaskScheduler::queue_mutex.unlock();
- }
- void TaskScheduler::clear(TaskPool *pool)
- {
- thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
- /* erase all tasks from this pool from the queue */
- list<Entry>::iterator it = queue.begin();
- int done = 0;
- while (it != queue.end()) {
- Entry &entry = *it;
- if (entry.pool == pool) {
- done++;
- delete entry.task;
- it = queue.erase(it);
- }
- else
- it++;
- }
- queue_lock.unlock();
- /* notify done */
- pool->num_decrease(done);
- }
- /* Dedicated Task Pool */
- DedicatedTaskPool::DedicatedTaskPool()
- {
- do_cancel = false;
- do_exit = false;
- num = 0;
- worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
- }
- DedicatedTaskPool::~DedicatedTaskPool()
- {
- stop();
- worker_thread->join();
- delete worker_thread;
- }
- void DedicatedTaskPool::push(Task *task, bool front)
- {
- num_increase();
- /* add task to queue */
- queue_mutex.lock();
- if (front)
- queue.push_front(task);
- else
- queue.push_back(task);
- queue_cond.notify_one();
- queue_mutex.unlock();
- }
- void DedicatedTaskPool::push(const TaskRunFunction &run, bool front)
- {
- push(new Task(run), front);
- }
- void DedicatedTaskPool::wait()
- {
- thread_scoped_lock num_lock(num_mutex);
- while (num)
- num_cond.wait(num_lock);
- }
- void DedicatedTaskPool::cancel()
- {
- do_cancel = true;
- clear();
- wait();
- do_cancel = false;
- }
- void DedicatedTaskPool::stop()
- {
- clear();
- do_exit = true;
- queue_cond.notify_all();
- wait();
- assert(num == 0);
- }
- bool DedicatedTaskPool::canceled()
- {
- return do_cancel;
- }
- void DedicatedTaskPool::num_decrease(int done)
- {
- thread_scoped_lock num_lock(num_mutex);
- num -= done;
- assert(num >= 0);
- if (num == 0)
- num_cond.notify_all();
- }
- void DedicatedTaskPool::num_increase()
- {
- thread_scoped_lock num_lock(num_mutex);
- num++;
- num_cond.notify_all();
- }
- bool DedicatedTaskPool::thread_wait_pop(Task *&task)
- {
- thread_scoped_lock queue_lock(queue_mutex);
- while (queue.empty() && !do_exit)
- queue_cond.wait(queue_lock);
- if (queue.empty()) {
- assert(do_exit);
- return false;
- }
- task = queue.front();
- queue.pop_front();
- return true;
- }
- void DedicatedTaskPool::thread_run()
- {
- Task *task;
- /* keep popping off tasks */
- while (thread_wait_pop(task)) {
- /* run task */
- task->run(0);
- /* delete task */
- delete task;
- /* notify task was done */
- num_decrease(1);
- }
- }
- void DedicatedTaskPool::clear()
- {
- thread_scoped_lock queue_lock(queue_mutex);
- /* erase all tasks from the queue */
- list<Task *>::iterator it = queue.begin();
- int done = 0;
- while (it != queue.end()) {
- done++;
- delete *it;
- it = queue.erase(it);
- }
- queue_lock.unlock();
- /* notify done */
- num_decrease(done);
- }
- string TaskPool::Summary::full_report() const
- {
- string report = "";
- report += string_printf("Total time: %f\n", time_total);
- report += string_printf("Tasks handled: %d\n", num_tasks_handled);
- return report;
- }
- CCL_NAMESPACE_END
|