// btTaskScheduler.cpp

#include "LinearMath/btMinMax.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "LinearMath/btQuickprof.h"
#include <stdio.h>
#include <algorithm>

#if BT_THREADSAFE

#include "btThreadSupportInterface.h"

#if defined(_WIN32)
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#endif

typedef unsigned long long btU64;
static const int kCacheLineSize = 64;

void btSpinPause()
{
#if defined(_WIN32)
    YieldProcessor();
#endif
}
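// Note: on non-Windows builds btSpinPause() is currently a no-op; a CPU
// pause/yield hint (e.g. _mm_pause() on x86 or __yield() on ARM) could be
// slotted in here to make the spin-waits below friendlier to the core.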
struct WorkerThreadStatus
{
    enum Type
    {
        kInvalid,
        kWaitingForWork,
        kWorking,
        kSleeping,
    };
};
ATTRIBUTE_ALIGNED64(class)
WorkerThreadDirectives
{
    static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;

    // directives for all worker threads packed into a single cacheline
    char m_threadDirs[kMaxThreadCount];

public:
    enum Type
    {
        kInvalid,
        kGoToSleep,         // go to sleep
        kStayAwakeButIdle,  // stay awake, but don't scan the job queue
        kScanForJobs,       // actively scan the job queue for jobs
    };

    WorkerThreadDirectives()
    {
        for (int i = 0; i < kMaxThreadCount; ++i)
        {
            m_threadDirs[i] = 0;
        }
    }

    Type getDirective(int threadId)
    {
        btAssert(threadId < kMaxThreadCount);
        return static_cast<Type>(m_threadDirs[threadId]);
    }

    void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
    {
        btAssert(threadBegin < threadEnd);
        btAssert(threadEnd <= kMaxThreadCount);
        char dirChar = static_cast<char>(dir);
        for (int i = threadBegin; i < threadEnd; ++i)
        {
            m_threadDirs[i] = dirChar;
        }
    }
};
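// Only the main thread writes directives; each worker just polls its own
// byte. For example, setDirectiveByRange(1, numThreads, kScanForJobs) tells
// all workers to start pulling jobs, and the same call with kGoToSleep winds
// them down again (see btTaskSchedulerDefault::setWorkerDirectives below).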
class JobQueue;

ATTRIBUTE_ALIGNED64(struct)
ThreadLocalStorage
{
    int m_threadId;
    WorkerThreadStatus::Type m_status;
    int m_numJobsFinished;
    btSpinMutex m_mutex;
    btScalar m_sumResult;
    WorkerThreadDirectives* m_directive;
    JobQueue* m_queue;
    btClock* m_clock;
    unsigned int m_cooldownTime;
};

struct IJob
{
    virtual void executeJob(int threadId) = 0;
};

class ParallelForJob : public IJob
{
    const btIParallelForBody* m_body;
    int m_begin;
    int m_end;

public:
    ParallelForJob(int iBegin, int iEnd, const btIParallelForBody& body)
    {
        m_body = &body;
        m_begin = iBegin;
        m_end = iEnd;
    }

    virtual void executeJob(int threadId) BT_OVERRIDE
    {
        BT_PROFILE("executeJob");
        // call the functor body to do the work
        m_body->forLoop(m_begin, m_end);
    }
};
class ParallelSumJob : public IJob
{
    const btIParallelSumBody* m_body;
    ThreadLocalStorage* m_threadLocalStoreArray;
    int m_begin;
    int m_end;

public:
    ParallelSumJob(int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls)
    {
        m_body = &body;
        m_threadLocalStoreArray = tls;
        m_begin = iBegin;
        m_end = iEnd;
    }

    virtual void executeJob(int threadId) BT_OVERRIDE
    {
        BT_PROFILE("executeJob");
        // call the functor body to do the work
        btScalar val = m_body->sumLoop(m_begin, m_end);
#if BT_PARALLEL_SUM_DETERMINISM
        // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
        const float TRUNC_SCALE = float(1 << 19);
        val = floor(val * TRUNC_SCALE + 0.5f) / TRUNC_SCALE;  // truncate some bits
#endif
        m_threadLocalStoreArray[threadId].m_sumResult += val;
    }
};
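// Determinism note: with TRUNC_SCALE = 2^19 = 524288, each partial sum is
// rounded to the nearest multiple of 1/524288 (about 1.9e-6). Discarding the
// low bits this way makes the order in which chunks get summed much less
// likely to change the final result, at the cost of precision.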
ATTRIBUTE_ALIGNED64(class)
JobQueue
{
    btThreadSupportInterface* m_threadSupport;
    btCriticalSection* m_queueLock;
    btSpinMutex m_mutex;

    btAlignedObjectArray<IJob*> m_jobQueue;
    char* m_jobMem;
    int m_jobMemSize;
    bool m_queueIsEmpty;
    int m_tailIndex;
    int m_headIndex;
    int m_allocSize;
    bool m_useSpinMutex;
    btAlignedObjectArray<JobQueue*> m_neighborContexts;
    char m_cachePadding[kCacheLineSize];  // prevent false sharing

    void freeJobMem()
    {
        if (m_jobMem)
        {
            // free old
            btAlignedFree(m_jobMem);
            m_jobMem = NULL;
        }
    }

    void resizeJobMem(int newSize)
    {
        if (newSize > m_jobMemSize)
        {
            freeJobMem();
            m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
            m_jobMemSize = newSize;
        }
    }

public:
    JobQueue()
    {
        m_jobMem = NULL;
        m_jobMemSize = 0;
        m_threadSupport = NULL;
        m_queueLock = NULL;
        m_headIndex = 0;
        m_tailIndex = 0;
        m_useSpinMutex = false;
        m_queueIsEmpty = true;  // queue starts empty (set again by clearQueue before use)
    }

    ~JobQueue()
    {
        exit();
    }

    void exit()
    {
        freeJobMem();
        if (m_queueLock && m_threadSupport)
        {
            m_threadSupport->deleteCriticalSection(m_queueLock);
            m_queueLock = NULL;
            m_threadSupport = 0;
        }
    }

    void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
    {
        m_threadSupport = threadSup;
        if (threadSup)
        {
            m_queueLock = m_threadSupport->createCriticalSection();
        }
        setupJobStealing(contextArray, contextArray->size());
    }

    void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
    {
        btAlignedObjectArray<JobQueue>& contexts = *contextArray;
        int selfIndex = 0;
        for (int i = 0; i < contexts.size(); ++i)
        {
            if (this == &contexts[i])
            {
                selfIndex = i;
                break;
            }
        }
        int numNeighbors = btMin(2, contexts.size() - 1);
        int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
        int numOffsets = sizeof(neighborOffsets) / sizeof(neighborOffsets[0]);
        m_neighborContexts.reserve(numNeighbors);
        m_neighborContexts.resizeNoInitialize(0);
        for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
        {
            int neighborIndex = selfIndex + neighborOffsets[i];
            if (neighborIndex >= 0 && neighborIndex < numActiveContexts)
            {
                m_neighborContexts.push_back(&contexts[neighborIndex]);
            }
        }
    }

    bool isQueueEmpty() const { return m_queueIsEmpty; }

    void lockQueue()
    {
        if (m_useSpinMutex)
        {
            m_mutex.lock();
        }
        else
        {
            m_queueLock->lock();
        }
    }

    void unlockQueue()
    {
        if (m_useSpinMutex)
        {
            m_mutex.unlock();
        }
        else
        {
            m_queueLock->unlock();
        }
    }

    void clearQueue(int jobCount, int jobSize)
    {
        lockQueue();
        m_headIndex = 0;
        m_tailIndex = 0;
        m_allocSize = 0;
        m_queueIsEmpty = true;
        int jobBufSize = jobSize * jobCount;
        // make sure we have enough memory allocated to store jobs
        if (jobBufSize > m_jobMemSize)
        {
            resizeJobMem(jobBufSize);
        }
        // make sure job queue is big enough
        if (jobCount > m_jobQueue.capacity())
        {
            m_jobQueue.reserve(jobCount);
        }
        unlockQueue();
        m_jobQueue.resizeNoInitialize(0);
    }

    void* allocJobMem(int jobSize)
    {
        btAssert(m_jobMemSize >= (m_allocSize + jobSize));
        void* jobMem = &m_jobMem[m_allocSize];
        m_allocSize += jobSize;
        return jobMem;
    }

    void submitJob(IJob* job)
    {
        btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
        m_jobQueue.push_back(job);
        lockQueue();
        m_tailIndex++;
        m_queueIsEmpty = false;
        unlockQueue();
    }

    IJob* consumeJobFromOwnQueue()
    {
        if (m_queueIsEmpty)
        {
            // lock-free early-out; even if it's taken erroneously it isn't harmful
            return NULL;
        }
        IJob* job = NULL;
        lockQueue();
        if (!m_queueIsEmpty)
        {
            job = m_jobQueue[m_headIndex++];
            btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
            if (m_headIndex == m_tailIndex)
            {
                m_queueIsEmpty = true;
            }
        }
        unlockQueue();
        return job;
    }

    IJob* consumeJob()
    {
        if (IJob* job = consumeJobFromOwnQueue())
        {
            return job;
        }
        // own queue is empty, try to steal from a neighbor
        for (int i = 0; i < m_neighborContexts.size(); ++i)
        {
            JobQueue* otherContext = m_neighborContexts[i];
            if (IJob* job = otherContext->consumeJobFromOwnQueue())
            {
                return job;
            }
        }
        return NULL;
    }
};
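// Job-stealing layout: with offsets {-1, 1, -2, 2, -3, 3} each queue links to
// at most two of its nearest in-range neighbors. E.g. with 4 active queues,
// queue 0 steals from queues 1 and 2, while queue 2 steals from queues 1 and
// 3. Keeping steal targets index-adjacent tends to keep stealing between
// threads running on nearby cores.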
static void WorkerThreadFunc(void* userPtr)
{
    BT_PROFILE("WorkerThreadFunc");
    ThreadLocalStorage* localStorage = (ThreadLocalStorage*)userPtr;
    JobQueue* jobQueue = localStorage->m_queue;

    bool shouldSleep = false;
    int threadId = localStorage->m_threadId;
    while (!shouldSleep)
    {
        // do work
        localStorage->m_mutex.lock();
        while (IJob* job = jobQueue->consumeJob())
        {
            localStorage->m_status = WorkerThreadStatus::kWorking;
            job->executeJob(threadId);
            localStorage->m_numJobsFinished++;
        }
        localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
        localStorage->m_mutex.unlock();
        btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
        // while queue is empty,
        while (jobQueue->isQueueEmpty())
        {
            // todo: spin wait a bit to avoid hammering the empty queue
            btSpinPause();
            if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep)
            {
                shouldSleep = true;
                break;
            }
            // if jobs are incoming,
            if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs)
            {
                clockStart = localStorage->m_clock->getTimeMicroseconds();  // reset clock
            }
            else
            {
                for (int i = 0; i < 50; ++i)
                {
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
                    {
                        break;
                    }
                }
                // if no jobs are incoming and the queue has been empty for the cooldown time, sleep
                btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
                if (timeElapsed > localStorage->m_cooldownTime)
                {
                    shouldSleep = true;
                    break;
                }
            }
        }
    }
    {
        BT_PROFILE("sleep");
        // go to sleep
        localStorage->m_mutex.lock();
        localStorage->m_status = WorkerThreadStatus::kSleeping;
        localStorage->m_mutex.unlock();
    }
}
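// Worker lifecycle: runTask() enters WorkerThreadFunc, which drains jobs
// (kWorking) and then spin-waits on the empty queue (kWaitingForWork). While
// the directive is kScanForJobs the cooldown clock keeps resetting; otherwise
// the worker exits and goes to sleep (kSleeping) after m_cooldownTime
// microseconds, or immediately on a kGoToSleep directive. A sleeping worker
// must be re-launched via another runTask() call from wakeWorkers().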
class btTaskSchedulerDefault : public btITaskScheduler
{
    btThreadSupportInterface* m_threadSupport;
    WorkerThreadDirectives* m_workerDirective;
    btAlignedObjectArray<JobQueue> m_jobQueues;
    btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
    btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
    btSpinMutex m_antiNestingLock;  // prevent nested parallel-for
    btClock m_clock;
    int m_numThreads;
    int m_numWorkerThreads;
    int m_numActiveJobQueues;
    int m_maxNumThreads;
    int m_numJobs;
    static const int kFirstWorkerThreadId = 1;

public:
    btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
    {
        m_threadSupport = NULL;
        m_workerDirective = NULL;
    }

    virtual ~btTaskSchedulerDefault()
    {
        waitForWorkersToSleep();

        for (int i = 0; i < m_jobQueues.size(); ++i)
        {
            m_jobQueues[i].exit();
        }

        if (m_threadSupport)
        {
            delete m_threadSupport;
            m_threadSupport = NULL;
        }
        if (m_workerDirective)
        {
            btAlignedFree(m_workerDirective);
            m_workerDirective = NULL;
        }
    }

    void init()
    {
        btThreadSupportInterface::ConstructionInfo constructionInfo("TaskScheduler", WorkerThreadFunc);
        m_threadSupport = btThreadSupportInterface::create(constructionInfo);
        m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));

        m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
        m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
        m_numThreads = m_maxNumThreads;
        // ideal to have one job queue for each physical processor (except for the main thread, which needs no queue)
        int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
        int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue);
        m_jobQueues.resize(numJobQueues);
        m_numActiveJobQueues = numJobQueues;
        for (int i = 0; i < m_jobQueues.size(); ++i)
        {
            m_jobQueues[i].init(m_threadSupport, &m_jobQueues);
        }
        m_perThreadJobQueues.resize(m_numThreads);
        for (int i = 0; i < m_numThreads; i++)
        {
            JobQueue* jq = NULL;
            // only worker threads get a job queue
            if (i > 0)
            {
                if (numThreadsPerQueue == 1)
                {
                    // one queue per worker thread
                    jq = &m_jobQueues[i - kFirstWorkerThreadId];
                }
                else
                {
                    // 2 threads share each queue
                    jq = &m_jobQueues[i / numThreadsPerQueue];
                }
            }
            m_perThreadJobQueues[i] = jq;
        }
        m_threadLocalStorage.resize(m_numThreads);
        for (int i = 0; i < m_numThreads; i++)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_threadId = i;
            storage.m_directive = m_workerDirective;
            storage.m_status = WorkerThreadStatus::kSleeping;
            storage.m_cooldownTime = 100;  // 100 microseconds: threads go to sleep after this long if they have nothing to do
            storage.m_clock = &m_clock;
            storage.m_queue = m_perThreadJobQueues[i];
        }
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);  // no work for them yet
        setNumThreads(m_threadSupport->getCacheFriendlyNumThreads());
    }
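    // Queue assignment example: on a machine with a logical-to-physical core
    // ratio of 2 and m_maxNumThreads == 8, init() creates 8 / 2 = 4 queues,
    // and worker threads 1..7 map to queues i / 2 = 0, 1, 1, 2, 2, 3, 3, so
    // each pair of hyperthread siblings shares one queue.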
    void setWorkerDirectives(WorkerThreadDirectives::Type dir)
    {
        m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
    }

    virtual int getMaxNumThreads() const BT_OVERRIDE
    {
        return m_maxNumThreads;
    }

    virtual int getNumThreads() const BT_OVERRIDE
    {
        return m_numThreads;
    }

    virtual void setNumThreads(int numThreads) BT_OVERRIDE
    {
        m_numThreads = btMax(btMin(numThreads, int(m_maxNumThreads)), 1);
        m_numWorkerThreads = m_numThreads - 1;
        m_numActiveJobQueues = 0;
        // if there is at least 1 worker,
        if (m_numWorkerThreads > 0)
        {
            // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
            JobQueue* lastActiveContext = m_perThreadJobQueues[m_numThreads - 1];
            int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
            m_numActiveJobQueues = iLastActiveContext + 1;
            for (int i = 0; i < m_jobQueues.size(); ++i)
            {
                m_jobQueues[i].setupJobStealing(&m_jobQueues, m_numActiveJobQueues);
            }
        }
        m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
    }
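    // For example, with one queue per worker, setNumThreads(4) leaves workers
    // 1..3 active: the last active worker's queue has index 2, so
    // m_numActiveJobQueues becomes 3 and every queue rebuilds its steal
    // neighbors against just those 3 queues. Threads 4 and up are directed to
    // go to sleep.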
    void waitJobs()
    {
        BT_PROFILE("waitJobs");
        // have the main thread work until the job queues are empty
        int numMainThreadJobsFinished = 0;
        for (int i = 0; i < m_numActiveJobQueues; ++i)
        {
            while (IJob* job = m_jobQueues[i].consumeJob())
            {
                job->executeJob(0);
                numMainThreadJobsFinished++;
            }
        }

        // done with jobs for now, tell workers to rest (but not sleep)
        setWorkerDirectives(WorkerThreadDirectives::kStayAwakeButIdle);

        btU64 clockStart = m_clock.getTimeMicroseconds();
        // wait for workers to finish any jobs in progress
        while (true)
        {
            int numWorkerJobsFinished = 0;
            for (int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread)
            {
                ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
                storage->m_mutex.lock();
                numWorkerJobsFinished += storage->m_numJobsFinished;
                storage->m_mutex.unlock();
            }
            if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
            {
                break;
            }
            btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
            btAssert(timeElapsed < 1000);
            if (timeElapsed > 100000)
            {
                break;
            }
            btSpinPause();
        }
    }
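    // The completion check works because prepareWorkerThreads() zeroes every
    // worker's m_numJobsFinished before jobs are submitted, so once the
    // per-worker counts plus the main thread's count add up to m_numJobs,
    // every submitted job has run. The assert flags waits over 1 ms in debug
    // builds; the 100 ms timeout is a safety valve against hanging forever.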
    void wakeWorkers(int numWorkersToWake)
    {
        BT_PROFILE("wakeWorkers");
        btAssert(m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs);
        int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
        int numActiveWorkers = 0;
        for (int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker)
        {
            // note this count of active workers is not necessarily totally reliable, because a worker thread could be
            // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
            ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
            if (storage.m_status != WorkerThreadStatus::kSleeping)
            {
                numActiveWorkers++;
            }
        }
        for (int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
            if (storage.m_status == WorkerThreadStatus::kSleeping)
            {
                m_threadSupport->runTask(iWorker, &storage);
                numActiveWorkers++;
            }
        }
    }

    void waitForWorkersToSleep()
    {
        BT_PROFILE("waitForWorkersToSleep");
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
        m_threadSupport->waitForAllTasks();
        for (int i = kFirstWorkerThreadId; i < m_numThreads; i++)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            btAssert(storage.m_status == WorkerThreadStatus::kSleeping);
        }
    }
    virtual void sleepWorkerThreadsHint() BT_OVERRIDE
    {
        BT_PROFILE("sleepWorkerThreadsHint");
        // hint the task scheduler that we may not be using these threads for a little while
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
    }

    void prepareWorkerThreads()
    {
        for (int i = kFirstWorkerThreadId; i < m_numThreads; ++i)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_mutex.lock();
            storage.m_numJobsFinished = 0;
            storage.m_mutex.unlock();
        }
        setWorkerDirectives(WorkerThreadDirectives::kScanForJobs);
    }
    virtual void parallelFor(int iBegin, int iEnd, int grainSize, const btIParallelForBody& body) BT_OVERRIDE
    {
        BT_PROFILE("parallelFor_ThreadSupport");
        btAssert(iEnd >= iBegin);
        btAssert(grainSize >= 1);
        int iterationCount = iEnd - iBegin;
        if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
        {
            typedef ParallelForJob JobType;
            int jobCount = (iterationCount + grainSize - 1) / grainSize;
            m_numJobs = jobCount;
            btAssert(jobCount >= 2);  // need more than one job for multithreading
            int jobSize = sizeof(JobType);

            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue(jobCount, jobSize);
            }
            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for (int i = iBegin; i < iEnd; i += grainSize)
            {
                btAssert(iJob < jobCount);
                int iE = btMin(i + grainSize, iEnd);
                JobQueue* jq = m_perThreadJobQueues[iThread];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new (jobMem) ParallelForJob(i, iE, body);  // placement new
                jq->submitJob(job);
                iJob++;
                iThread++;
                if (iThread >= m_numThreads)
                {
                    iThread = kFirstWorkerThreadId;  // wrap around to the first worker thread
                }
            }
            wakeWorkers(jobCount - 1);

            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();
            m_antiNestingLock.unlock();
        }
        else
        {
            BT_PROFILE("parallelFor_mainThread");
            // just run on main thread
            body.forLoop(iBegin, iEnd);
        }
    }
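    // Chunking example: parallelFor(0, 100, 10, body) yields
    // jobCount = (100 + 10 - 1) / 10 = 10 jobs covering [0,10), [10,20), ...,
    // [90,100), dealt round-robin onto the worker queues; the main thread then
    // joins in via waitJobs(), so all threads drain the queues together.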
    virtual btScalar parallelSum(int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body) BT_OVERRIDE
    {
        BT_PROFILE("parallelSum_ThreadSupport");
        btAssert(iEnd >= iBegin);
        btAssert(grainSize >= 1);
        int iterationCount = iEnd - iBegin;
        if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
        {
            typedef ParallelSumJob JobType;
            int jobCount = (iterationCount + grainSize - 1) / grainSize;
            m_numJobs = jobCount;
            btAssert(jobCount >= 2);  // need more than one job for multithreading
            int jobSize = sizeof(JobType);
            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue(jobCount, jobSize);
            }

            // initialize summation
            for (int iThread = 0; iThread < m_numThreads; ++iThread)
            {
                m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
            }

            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for (int i = iBegin; i < iEnd; i += grainSize)
            {
                btAssert(iJob < jobCount);
                int iE = btMin(i + grainSize, iEnd);
                JobQueue* jq = m_perThreadJobQueues[iThread];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new (jobMem) ParallelSumJob(i, iE, body, &m_threadLocalStorage[0]);  // placement new
                jq->submitJob(job);
                iJob++;
                iThread++;
                if (iThread >= m_numThreads)
                {
                    iThread = kFirstWorkerThreadId;  // wrap around to the first worker thread
                }
            }
            wakeWorkers(jobCount - 1);

            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();

            // add up all the thread sums
            btScalar sum = btScalar(0);
            for (int iThread = 0; iThread < m_numThreads; ++iThread)
            {
                sum += m_threadLocalStorage[iThread].m_sumResult;
            }
            m_antiNestingLock.unlock();
            return sum;
        }
        else
        {
            BT_PROFILE("parallelSum_mainThread");
            // just run on main thread
            return body.sumLoop(iBegin, iEnd);
        }
    }
};
btITaskScheduler* btCreateDefaultTaskScheduler()
{
    btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
    ts->init();
    return ts;
}
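// Usage sketch (illustrative only, not part of this file): clients typically
// create the scheduler once, install it with btSetTaskScheduler() from
// LinearMath/btThreads.h, and then call btParallelFor() with a body object.
// The body type "ScaleBody" below is a hypothetical example.
//
//     struct ScaleBody : public btIParallelForBody
//     {
//         btScalar* m_data;
//         btScalar m_factor;
//         virtual void forLoop(int iBegin, int iEnd) const BT_OVERRIDE
//         {
//             for (int i = iBegin; i < iEnd; ++i)
//                 m_data[i] *= m_factor;
//         }
//     };
//
//     btITaskScheduler* ts = btCreateDefaultTaskScheduler();
//     btSetTaskScheduler(ts);
//     ScaleBody body;
//     body.m_data = myArray;           // myArray: caller-provided buffer
//     body.m_factor = btScalar(2);
//     btParallelFor(0, n, 64, body);   // grainSize of 64 iterations per job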
#else  // #if BT_THREADSAFE

btITaskScheduler* btCreateDefaultTaskScheduler()
{
    return NULL;
}

#endif  // #else // #if BT_THREADSAFE