util_task.cpp

/*
 * Copyright 2011-2013 Blender Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "util/util_foreach.h"
#include "util/util_logging.h"
#include "util/util_system.h"
#include "util/util_task.h"
#include "util/util_time.h"

//#define THREADING_DEBUG_ENABLED

#ifdef THREADING_DEBUG_ENABLED
#  include <stdio.h>
#  define THREADING_DEBUG(...) \
    do { \
      printf(__VA_ARGS__); \
      fflush(stdout); \
    } while (0)
#else
#  define THREADING_DEBUG(...)
#endif

CCL_NAMESPACE_BEGIN

/* Task Pool */

TaskPool::TaskPool()
{
  num_tasks_handled = 0;
  num = 0;
  do_cancel = false;
}

TaskPool::~TaskPool()
{
  stop();
}

void TaskPool::push(Task *task, bool front)
{
  TaskScheduler::Entry entry;

  entry.task = task;
  entry.pool = this;

  TaskScheduler::push(entry, front);
}

void TaskPool::push(const TaskRunFunction &run, bool front)
{
  push(new Task(run), front);
}

void TaskPool::wait_work(Summary *stats)
{
  thread_scoped_lock num_lock(num_mutex);

  while (num != 0) {
    num_lock.unlock();

    thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

    /* find task from this pool. if we get a task from another pool,
     * we can get into deadlock */
    TaskScheduler::Entry work_entry;
    bool found_entry = false;
    list<TaskScheduler::Entry>::iterator it;

    for (it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
      TaskScheduler::Entry &entry = *it;

      if (entry.pool == this) {
        work_entry = entry;
        found_entry = true;
        TaskScheduler::queue.erase(it);
        break;
      }
    }

    queue_lock.unlock();

    /* if found task, do it, otherwise wait until other tasks are done */
    if (found_entry) {
      /* run task */
      work_entry.task->run(0);

      /* delete task */
      delete work_entry.task;

      /* notify pool task was done */
      num_decrease(1);
    }

    num_lock.lock();
    if (num == 0)
      break;

    if (!found_entry) {
      THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
      num_cond.wait(num_lock);
      THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
    }
  }

  if (stats != NULL) {
    stats->time_total = time_dt() - start_time;
    stats->num_tasks_handled = num_tasks_handled;
  }
}

void TaskPool::cancel()
{
  do_cancel = true;

  TaskScheduler::clear(this);

  {
    thread_scoped_lock num_lock(num_mutex);

    while (num) {
      THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
      num_cond.wait(num_lock);
      THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
    }
  }

  do_cancel = false;
}

void TaskPool::stop()
{
  TaskScheduler::clear(this);

  assert(num == 0);
}

bool TaskPool::canceled()
{
  return do_cancel;
}

bool TaskPool::finished()
{
  thread_scoped_lock num_lock(num_mutex);
  return num == 0;
}

void TaskPool::num_decrease(int done)
{
  num_mutex.lock();
  num -= done;

  assert(num >= 0);
  if (num == 0) {
    THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
    num_cond.notify_all();
  }

  num_mutex.unlock();
}

void TaskPool::num_increase()
{
  thread_scoped_lock num_lock(num_mutex);
  if (num_tasks_handled == 0) {
    start_time = time_dt();
  }
  num++;
  num_tasks_handled++;
  THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
  num_cond.notify_all();
}
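
/* A minimal usage sketch (illustrative, not part of the original file): tasks
 * are pushed into the pool and executed by the shared scheduler threads, with
 * wait_work() helping out on this pool's own tasks until all are done. It
 * assumes TaskScheduler::init() was called beforehand (see further below),
 * and render_tile is a hypothetical worker function.
 *
 *   TaskPool pool;
 *   for (int i = 0; i < 10; i++)
 *     pool.push(function_bind(&render_tile, i));
 *   pool.wait_work();
 */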
/* Task Scheduler */

thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
vector<thread *> TaskScheduler::threads;
bool TaskScheduler::do_exit = false;

list<TaskScheduler::Entry> TaskScheduler::queue;
thread_mutex TaskScheduler::queue_mutex;
thread_condition_variable TaskScheduler::queue_cond;

namespace {

/* Get the number of processors on each of the available nodes. The result is
 * sized by the highest node index, and each element corresponds to the number
 * of processors on that node.
 * If a node is not available, its number of processors is zero. */
void get_per_node_num_processors(vector<int> *num_per_node_processors)
{
  const int num_nodes = system_cpu_num_numa_nodes();
  if (num_nodes == 0) {
    LOG(ERROR) << "Zero available NUMA nodes, this is not supposed to happen.";
    return;
  }
  num_per_node_processors->resize(num_nodes);
  for (int node = 0; node < num_nodes; ++node) {
    if (!system_cpu_is_numa_node_available(node)) {
      (*num_per_node_processors)[node] = 0;
      continue;
    }
    (*num_per_node_processors)[node] = system_cpu_num_numa_node_processors(node);
  }
}

/* Calculate the total number of processors on all available nodes.
 * This is similar to system_cpu_thread_count(), but uses the pre-calculated
 * number of processors on each node, avoiding extra system calls and checks
 * for node availability. */
int get_num_total_processors(const vector<int> &num_per_node_processors)
{
  int num_total_processors = 0;
  foreach (int num_node_processors, num_per_node_processors) {
    num_total_processors += num_node_processors;
  }
  return num_total_processors;
}

/* Compute a NUMA node for every thread to run on, for the best performance. */
vector<int> distribute_threads_on_nodes(const int num_threads)
{
  /* Start with all threads unassigned to any specific NUMA node. */
  vector<int> thread_nodes(num_threads, -1);
  const int num_active_group_processors = system_cpu_num_active_group_processors();
  VLOG(1) << "Detected " << num_active_group_processors << " processors "
          << "in active group.";
  if (num_active_group_processors >= num_threads) {
    /* If the current thread is set up in a way that its affinity allows
     * using at least the requested number of threads, we do not explicitly
     * set affinity on the worker threads.
     * This way we allow users to manually edit the affinity of the parent
     * thread, and here we follow that affinity. This makes it possible to
     * have two Cycles/Blender instances running, manually pinned to
     * different dies of a CPU. */
    VLOG(1) << "Not setting thread group affinity.";
    return thread_nodes;
  }
  vector<int> num_per_node_processors;
  get_per_node_num_processors(&num_per_node_processors);
  if (num_per_node_processors.size() == 0) {
    /* The error was already reported; we can't do anything here, so we
     * simply leave the default affinity on all worker threads. */
    return thread_nodes;
  }
  const int num_nodes = num_per_node_processors.size();
  int thread_index = 0;
  /* First pass: fill all the nodes up to their maximum.
   *
   * If there are fewer threads than the overall node capacity, some of the
   * nodes, or parts of them, will idle.
   *
   * TODO(sergey): Consider picking the fastest nodes if the number of
   * threads fits on them. For example, on a Threadripper 2 we might consider
   * using nodes 0 and 2 if the user requested 32 render threads. */
  const int num_total_node_processors = get_num_total_processors(num_per_node_processors);
  int current_node_index = 0;
  while (thread_index < num_total_node_processors && thread_index < num_threads) {
    const int num_node_processors = num_per_node_processors[current_node_index];
    for (int processor_index = 0; processor_index < num_node_processors; ++processor_index) {
      VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
      thread_nodes[thread_index] = current_node_index;
      ++thread_index;
      if (thread_index == num_threads) {
        /* All threads are scheduled on their nodes. */
        return thread_nodes;
      }
    }
    ++current_node_index;
  }
  /* Second pass: keep scheduling threads to each node one by one, filling
   * them in uniformly.
   * This is where things become tricky to predict for maximum performance:
   * on the one hand this avoids too much threading overhead on a few nodes,
   * but for final performance, putting all the overhead on one node might be
   * a better idea (since the other nodes will have a better chance of
   * rendering faster).
   * Trickier still, nodes might have different capacities, so we might want
   * to do some weighted scheduling. For example, if node 0 has 16 processors
   * and node 1 has 32 processors, we'd better schedule 1 extra thread on
   * node 0 and 2 extra threads on node 1. */
  current_node_index = 0;
  while (thread_index < num_threads) {
    /* Skip unavailable nodes. */
    /* TODO(sergey): Add sanity check against deadlock. */
    while (num_per_node_processors[current_node_index] == 0) {
      current_node_index = (current_node_index + 1) % num_nodes;
    }
    VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
    thread_nodes[thread_index] = current_node_index;
    ++thread_index;
    current_node_index = (current_node_index + 1) % num_nodes;
  }
  return thread_nodes;
}

} // namespace
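
/* A worked example of the distribution above (illustrative, assuming a
 * hypothetical machine with 2 available NUMA nodes of 4 processors each and
 * num_threads == 10): the first pass assigns threads 0..3 to node 0 and
 * threads 4..7 to node 1, exhausting the total node capacity of 8; the
 * second pass then round-robins the remaining threads, putting thread 8 on
 * node 0 and thread 9 on node 1. */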
void TaskScheduler::init(int num_threads)
{
  thread_scoped_lock lock(mutex);
  /* Multiple Cycles instances can use this task scheduler, sharing the same
   * threads, so we keep track of the number of users. */
  ++users;
  if (users != 1) {
    return;
  }
  do_exit = false;
  const bool use_auto_threads = (num_threads == 0);
  if (use_auto_threads) {
    /* Automatic number of threads. */
    num_threads = system_cpu_thread_count();
  }
  VLOG(1) << "Creating pool of " << num_threads << " threads.";

  /* Compute the thread distribution over NUMA nodes. */
  vector<int> thread_nodes = distribute_threads_on_nodes(num_threads);

  /* Launch threads that will be waiting for work. */
  threads.resize(num_threads);
  for (int thread_index = 0; thread_index < num_threads; ++thread_index) {
    threads[thread_index] = new thread(function_bind(&TaskScheduler::thread_run, thread_index + 1),
                                       thread_nodes[thread_index]);
  }
}

void TaskScheduler::exit()
{
  thread_scoped_lock lock(mutex);
  users--;
  if (users == 0) {
    VLOG(1) << "De-initializing thread pool of task scheduler.";
    /* stop all waiting threads */
    TaskScheduler::queue_mutex.lock();
    do_exit = true;
    TaskScheduler::queue_cond.notify_all();
    TaskScheduler::queue_mutex.unlock();

    /* delete threads */
    foreach (thread *t, threads) {
      t->join();
      delete t;
    }
    threads.clear();
  }
}
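
/* A lifecycle sketch (illustrative, not part of the original file): init()
 * and exit() are reference-counted, so every caller must pair them; the
 * worker threads are created on the first init() and torn down on the last
 * exit().
 *
 *   TaskScheduler::init(0);  // 0 means auto-detect the thread count
 *   {
 *     TaskPool pool;
 *     ...
 *   }
 *   TaskScheduler::exit();
 */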
void TaskScheduler::free_memory()
{
  assert(users == 0);
  threads.free_memory();
}

bool TaskScheduler::thread_wait_pop(Entry &entry)
{
  thread_scoped_lock queue_lock(queue_mutex);

  while (queue.empty() && !do_exit)
    queue_cond.wait(queue_lock);

  if (queue.empty()) {
    assert(do_exit);
    return false;
  }

  entry = queue.front();
  queue.pop_front();

  return true;
}

void TaskScheduler::thread_run(int thread_id)
{
  Entry entry;

  /* todo: test affinity/denormal mask */

  /* keep popping off tasks */
  while (thread_wait_pop(entry)) {
    /* run task */
    entry.task->run(thread_id);

    /* delete task */
    delete entry.task;

    /* notify pool task was done */
    entry.pool->num_decrease(1);
  }
}

void TaskScheduler::push(Entry &entry, bool front)
{
  entry.pool->num_increase();

  /* add entry to queue */
  TaskScheduler::queue_mutex.lock();
  if (front)
    TaskScheduler::queue.push_front(entry);
  else
    TaskScheduler::queue.push_back(entry);

  TaskScheduler::queue_cond.notify_one();
  TaskScheduler::queue_mutex.unlock();
}

void TaskScheduler::clear(TaskPool *pool)
{
  thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

  /* erase all tasks from this pool from the queue */
  list<Entry>::iterator it = queue.begin();
  int done = 0;

  while (it != queue.end()) {
    Entry &entry = *it;

    if (entry.pool == pool) {
      done++;
      delete entry.task;
      it = queue.erase(it);
    }
    else
      it++;
  }

  queue_lock.unlock();

  /* notify done */
  pool->num_decrease(done);
}
/* Dedicated Task Pool */

DedicatedTaskPool::DedicatedTaskPool()
{
  do_cancel = false;
  do_exit = false;
  num = 0;

  worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
}

DedicatedTaskPool::~DedicatedTaskPool()
{
  stop();
  worker_thread->join();
  delete worker_thread;
}

void DedicatedTaskPool::push(Task *task, bool front)
{
  num_increase();

  /* add task to queue */
  queue_mutex.lock();
  if (front)
    queue.push_front(task);
  else
    queue.push_back(task);

  queue_cond.notify_one();
  queue_mutex.unlock();
}

void DedicatedTaskPool::push(const TaskRunFunction &run, bool front)
{
  push(new Task(run), front);
}

void DedicatedTaskPool::wait()
{
  thread_scoped_lock num_lock(num_mutex);

  while (num)
    num_cond.wait(num_lock);
}

void DedicatedTaskPool::cancel()
{
  do_cancel = true;

  clear();
  wait();

  do_cancel = false;
}

void DedicatedTaskPool::stop()
{
  clear();

  do_exit = true;
  queue_cond.notify_all();

  wait();

  assert(num == 0);
}

bool DedicatedTaskPool::canceled()
{
  return do_cancel;
}

void DedicatedTaskPool::num_decrease(int done)
{
  thread_scoped_lock num_lock(num_mutex);
  num -= done;

  assert(num >= 0);
  if (num == 0)
    num_cond.notify_all();
}

void DedicatedTaskPool::num_increase()
{
  thread_scoped_lock num_lock(num_mutex);
  num++;
  num_cond.notify_all();
}

bool DedicatedTaskPool::thread_wait_pop(Task *&task)
{
  thread_scoped_lock queue_lock(queue_mutex);

  while (queue.empty() && !do_exit)
    queue_cond.wait(queue_lock);

  if (queue.empty()) {
    assert(do_exit);
    return false;
  }

  task = queue.front();
  queue.pop_front();

  return true;
}

void DedicatedTaskPool::thread_run()
{
  Task *task;

  /* keep popping off tasks */
  while (thread_wait_pop(task)) {
    /* run task */
    task->run(0);

    /* delete task */
    delete task;

    /* notify task was done */
    num_decrease(1);
  }
}

void DedicatedTaskPool::clear()
{
  thread_scoped_lock queue_lock(queue_mutex);

  /* erase all tasks from the queue */
  list<Task *>::iterator it = queue.begin();
  int done = 0;

  while (it != queue.end()) {
    done++;
    delete *it;
    it = queue.erase(it);
  }

  queue_lock.unlock();

  /* notify done */
  num_decrease(done);
}
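
/* A usage sketch for the dedicated pool (illustrative, not part of the
 * original file): unlike TaskPool, this pool owns a single worker thread
 * created in its constructor, so it needs no TaskScheduler::init() and runs
 * tasks strictly one at a time, in queue order. save_buffers is a
 * hypothetical worker function.
 *
 *   DedicatedTaskPool pool;
 *   pool.push(function_bind(&save_buffers));
 *   pool.wait();  // block until the queue drains
 */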
string TaskPool::Summary::full_report() const
{
  string report = "";
  report += string_printf("Total time: %f\n", time_total);
  report += string_printf("Tasks handled: %d\n", num_tasks_handled);
  return report;
}

CCL_NAMESPACE_END