ParallelJobList.cpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297
  1. /*
  2. ===========================================================================
  3. Doom 3 BFG Edition GPL Source Code
  4. Copyright (C) 1993-2012 id Software LLC, a ZeniMax Media company.
  5. This file is part of the Doom 3 BFG Edition GPL Source Code ("Doom 3 BFG Edition Source Code").
  6. Doom 3 BFG Edition Source Code is free software: you can redistribute it and/or modify
  7. it under the terms of the GNU General Public License as published by
  8. the Free Software Foundation, either version 3 of the License, or
  9. (at your option) any later version.
  10. Doom 3 BFG Edition Source Code is distributed in the hope that it will be useful,
  11. but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. GNU General Public License for more details.
  14. You should have received a copy of the GNU General Public License
  15. along with Doom 3 BFG Edition Source Code. If not, see <http://www.gnu.org/licenses/>.
  16. In addition, the Doom 3 BFG Edition Source Code is also subject to certain additional terms. You should have received a copy of these additional terms immediately following the terms and conditions of the GNU General Public License which accompanied the Doom 3 BFG Edition Source Code. If not, please request a copy in writing from id Software at the address below.
  17. If you have questions concerning this license or the applicable additional terms, you may contact in writing id Software LLC, c/o ZeniMax Media Inc., Suite 120, Rockville, Maryland 20850 USA.
  18. ===========================================================================
  19. */
  20. #pragma hdrstop
  21. #include "precompiled.h"
  22. #include "ParallelJobList.h"
  23. /*
  24. ================================================================================================
  25. Job and Job-List names
  26. ================================================================================================
  27. */
  // Printable names of the job lists, returned by GetJobListName().
  // Each ASSERT_ENUM_STRING entry pairs a jobListId_t enumerator with a number; presumably the
  // macro (defined elsewhere) checks that number against the enumerator's value -- confirm.
  // NOTE(review): the entries are stored densely; the last one is tagged 9 but occupies slot 2.
  const char * jobNames[] = {
      ASSERT_ENUM_STRING( JOBLIST_RENDERER_FRONTEND, 0 ),
      ASSERT_ENUM_STRING( JOBLIST_RENDERER_BACKEND, 1 ),
      ASSERT_ENUM_STRING( JOBLIST_UTILITY, 9 ),
  };

  // Capacity of the registeredJobs table below.
  static const int MAX_REGISTERED_JOBS = 128;

  // One registered job: the job function and the name GetJobName() reports for it.
  struct registeredJob {
      jobRun_t function;
      const char * name;
  } registeredJobs[MAX_REGISTERED_JOBS];

  // Number of registeredJobs entries in use; RegisterJob() appends at this index.
  static int numRegisteredJobs;
  39. const char * GetJobListName( jobListId_t id ) {
  40. return jobNames[id];
  41. }
  42. /*
  43. ========================
  44. IsRegisteredJob
  45. ========================
  46. */
  47. static bool IsRegisteredJob( jobRun_t function ) {
  48. for ( int i = 0; i < numRegisteredJobs; i++ ) {
  49. if ( registeredJobs[i].function == function ) {
  50. return true;
  51. }
  52. }
  53. return false;
  54. }
  55. /*
  56. ========================
  57. RegisterJob
  58. ========================
  59. */
  60. void RegisterJob( jobRun_t function, const char * name ) {
  61. if ( IsRegisteredJob( function ) ) {
  62. return;
  63. }
  64. registeredJobs[numRegisteredJobs].function = function;
  65. registeredJobs[numRegisteredJobs].name = name;
  66. numRegisteredJobs++;
  67. }
  68. /*
  69. ========================
  70. GetJobName
  71. ========================
  72. */
  73. const char * GetJobName( jobRun_t function ) {
  74. for ( int i = 0; i < numRegisteredJobs; i++ ) {
  75. if ( registeredJobs[i].function == function ) {
  76. return registeredJobs[i].name;
  77. }
  78. }
  79. return "unknown";
  80. }
  81. /*
  82. ========================
  83. idParallelJobRegistration::idParallelJobRegistration
  84. ========================
  85. */
  86. idParallelJobRegistration::idParallelJobRegistration( jobRun_t function, const char * name ) {
  87. RegisterJob( function, name );
  88. }
  // Global SPU local-store bookkeeping: an "active" value, a start/end pointer pair and a mutex.
  // NOTE(review): nothing in the visible part of this file reads or writes these; presumably
  // they are used by job code elsewhere -- confirm before removing.
  int globalSpuLocalStoreActive;
  void * globalSpuLocalStoreStart;
  void * globalSpuLocalStoreEnd;
  idSysMutex globalSpuLocalStoreMutex;
  93. /*
  94. ================================================================================================
  95. PS3
  96. ================================================================================================
  97. */
  98. /*
  99. ================================================================================================
  100. idParallelJobList_Threads
  101. ================================================================================================
  102. */
  // Threshold (microseconds) above which RunJobsInternal() prints a long-job warning in
  // non-debug builds; a value <= 0 disables the check.
  static idCVar jobs_longJobMicroSec( "jobs_longJobMicroSec", "10000", CVAR_INTEGER, "print a warning for jobs that take more than this number of microseconds" );
  // Size of the per-thread timing arrays in threadStats_t; RunJobsInternal() asserts threadNum is below it.
  const static int MAX_THREADS = 32;
  105. struct threadJobListState_t {
  106. threadJobListState_t() :
  107. jobList( NULL ),
  108. version( 0xFFFFFFFF ),
  109. signalIndex( 0 ),
  110. lastJobIndex( 0 ),
  111. nextJobIndex( -1 ) {}
  112. threadJobListState_t( int _version ) :
  113. jobList( NULL ),
  114. version( _version ),
  115. signalIndex( 0 ),
  116. lastJobIndex( 0 ),
  117. nextJobIndex( -1 ) {}
  118. idParallelJobList_Threads * jobList;
  119. int version;
  120. int signalIndex;
  121. int lastJobIndex;
  122. int nextJobIndex;
  123. };
  // Timing and count statistics for one submission of a job list.
  // Times come from Sys_Microseconds(); the arrays are indexed by thread number.
  struct threadStats_t {
      unsigned int numExecutedJobs;            // Submit(): list entries minus two per sync point
      unsigned int numExecutedSyncs;           // Submit(): number of sync points in the list
      uint64 submitTime;                       // when Submit() was called
      uint64 startTime;                        // first time any thread entered RunJobsInternal() for this submission
      uint64 endTime;                          // when the last job of the list finished
      uint64 waitTime;                         // time Wait() spent yielding, 0 if it never had to yield
      uint64 threadExecTime[MAX_THREADS];      // per thread: time spent inside job functions
      uint64 threadTotalTime[MAX_THREADS];     // per thread: time spent inside RunJobs()
  };
  // Job list implementation executed by job threads.
  // One managing thread fills the list (AddJob / InsertSyncPoint), submits it and later waits
  // for it; job threads pull work from it through RunJobs().
  class idParallelJobList_Threads {
  public:
      idParallelJobList_Threads( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs );
      ~idParallelJobList_Threads();
      //------------------------
      // These are called from the one thread that manages this list.
      //------------------------
      ID_INLINE void AddJob( jobRun_t function, void * data );
      ID_INLINE void InsertSyncPoint( jobSyncType_t syncType );
      void Submit( idParallelJobList_Threads * waitForJobList_, int parallelism );
      void Wait();
      bool TryWait();
      bool IsSubmitted() const;
      // The getters below read threadStats, the snapshot Wait() copies from deferredThreadStats.
      unsigned int GetNumExecutedJobs() const { return threadStats.numExecutedJobs; }
      unsigned int GetNumSyncs() const { return threadStats.numExecutedSyncs; }
      uint64 GetSubmitTimeMicroSec() const { return threadStats.submitTime; }
      uint64 GetStartTimeMicroSec() const { return threadStats.startTime; }
      uint64 GetFinishTimeMicroSec() const { return threadStats.endTime; }
      uint64 GetWaitTimeMicroSec() const { return threadStats.waitTime; }
      uint64 GetTotalProcessingTimeMicroSec() const;
      uint64 GetTotalWastedTimeMicroSec() const;
      uint64 GetUnitProcessingTimeMicroSec( int unit ) const;
      uint64 GetUnitWastedTimeMicroSec( int unit ) const;
      jobListId_t GetId() const { return listId; }
      jobListPriority_t GetPriority() const { return listPriority; }
      int GetVersion() { return version.GetValue(); }
      // True while the list passed to Submit() as waitForJobList still has its done guard raised.
      bool WaitForOtherJobList();
      //------------------------
      // This is thread safe and called from the job threads.
      //------------------------
      // Bit flags combined into the return value of RunJobs().
      enum runResult_t {
          RUN_OK = 0,
          RUN_PROGRESS = BIT( 0 ),    // at least one job was executed
          RUN_DONE = BIT( 1 ),        // nothing left to fetch, or the state belongs to an old version
          RUN_STALLED = BIT( 2 )      // blocked on a sync point or on another thread holding the fetch lock
      };
      int RunJobs( unsigned int threadNum, threadJobListState_t & state, bool singleJob );
  private:
      static const int NUM_DONE_GUARDS = 4; // cycle through 4 guards so we can cyclically chain job lists
      bool threaded;                  // when false, Submit() runs the jobs on the calling thread
      bool done;                      // false between Submit() and the end of Wait()
      bool hasSignal;                 // a SYNC_SIGNAL was inserted and not yet matched by SYNC_SYNCHRONIZE
      jobListId_t listId;
      jobListPriority_t listPriority;
      unsigned int maxJobs;
      unsigned int maxSyncs;
      unsigned int numSyncs;          // number of SYNC_SYNCHRONIZE points inserted since the last Wait()
      int lastSignalJob;              // jobList index at which the most recent signal was inserted
      idSysInterlockedInteger * waitForGuard;                 // done guard of the list this one waits for, or NULL
      idSysInterlockedInteger doneGuards[NUM_DONE_GUARDS];    // set to 1 by Submit(), decremented when the list-done marker is passed
      int currentDoneGuard;           // index of the guard used by the current submission
      idSysInterlockedInteger version;                        // incremented by Wait(); invalidates older thread states
      struct job_t {
          jobRun_t function;
          void * data;
          int executed;               // set to 1 after the job function has returned
      };
      idList< job_t, TAG_JOBLIST > jobList;                           // jobs plus the dummy signal / synchronize / done entries
      idList< idSysInterlockedInteger, TAG_JOBLIST > signalJobCount;  // outstanding jobs per signal group; the last entry covers the end of the list
      idSysInterlockedInteger currentJob;             // index of the next job to hand out
      idSysInterlockedInteger fetchLock;              // guards the fetch section in RunJobsInternal()
      idSysInterlockedInteger numThreadsExecuting;    // threads currently inside RunJobs()
      threadStats_t deferredThreadStats;              // statistics being gathered for the current submission
      threadStats_t threadStats;                      // snapshot copied from deferredThreadStats by Wait()
      int RunJobsInternal( unsigned int threadNum, threadJobListState_t & state, bool singleJob );
      // Job function used for the dummy marker entries.
      static void Nop( void * data ) {}
      // Only the addresses of these are used: a job whose data pointer equals one of them is a marker.
      static int JOB_SIGNAL;
      static int JOB_SYNCHRONIZE;
      static int JOB_LIST_DONE;
  };
  // Storage for the marker statics declared in the class.
  int idParallelJobList_Threads::JOB_SIGNAL;
  int idParallelJobList_Threads::JOB_SYNCHRONIZE;
  int idParallelJobList_Threads::JOB_LIST_DONE;
  207. /*
  208. ========================
  209. idParallelJobList_Threads::idParallelJobList_Threads
  210. ========================
  211. */
  212. idParallelJobList_Threads::idParallelJobList_Threads( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs ) :
  213. threaded( true ),
  214. done( true ),
  215. hasSignal( false ),
  216. listId( id ),
  217. listPriority( priority ),
  218. numSyncs( 0 ),
  219. lastSignalJob( 0 ),
  220. waitForGuard( NULL ),
  221. currentDoneGuard( 0 ),
  222. jobList() {
  223. assert( listPriority != JOBLIST_PRIORITY_NONE );
  224. this->maxJobs = maxJobs;
  225. this->maxSyncs = maxSyncs;
  226. jobList.AssureSize( maxJobs + maxSyncs * 2 + 1 ); // syncs go in as dummy jobs and one more to update the doneCount
  227. jobList.SetNum( 0 );
  228. signalJobCount.AssureSize( maxSyncs + 1 ); // need one extra for submit
  229. signalJobCount.SetNum( 0 );
  230. memset( &deferredThreadStats, 0, sizeof( threadStats_t ) );
  231. memset( &threadStats, 0, sizeof( threadStats_t ) );
  232. }
  233. /*
  234. ========================
  235. idParallelJobList_Threads::~idParallelJobList_Threads
  236. ========================
  237. */
  238. idParallelJobList_Threads::~idParallelJobList_Threads() {
  239. Wait();
  240. }
  241. /*
  242. ========================
  243. idParallelJobList_Threads::AddJob
  244. ========================
  245. */
  246. ID_INLINE void idParallelJobList_Threads::AddJob( jobRun_t function, void * data ) {
  247. assert( done );
  248. #if defined( _DEBUG )
  249. // make sure there isn't already a job with the same function and data in the list
  250. if ( jobList.Num() < 1000 ) { // don't do this N^2 slow check on big lists
  251. for ( int i = 0; i < jobList.Num(); i++ ) {
  252. assert( jobList[i].function != function || jobList[i].data != data );
  253. }
  254. }
  255. #endif
  256. if ( 1 ) { // JDC: this never worked in tech5! !jobList.IsFull() ) {
  257. job_t & job = jobList.Alloc();
  258. job.function = function;
  259. job.data = data;
  260. job.executed = 0;
  261. } else {
  262. // debug output to show us what is overflowing
  263. int currentJobCount[MAX_REGISTERED_JOBS] = {};
  264. for ( int i = 0; i < jobList.Num(); ++i ) {
  265. const char * jobName = GetJobName( jobList[ i ].function );
  266. for ( int j = 0; j < numRegisteredJobs; ++j ) {
  267. if ( jobName == registeredJobs[ j ].name ) {
  268. currentJobCount[ j ]++;
  269. break;
  270. }
  271. }
  272. }
  273. // print the quantity of each job type
  274. for ( int i = 0; i < numRegisteredJobs; ++i ) {
  275. if ( currentJobCount[ i ] > 0 ) {
  276. idLib::Printf( "Job: %s, # %d", registeredJobs[ i ].name, currentJobCount[ i ] );
  277. }
  278. }
  279. idLib::Error( "Can't add job '%s', too many jobs %d", GetJobName( function ), jobList.Num() );
  280. }
  281. }
  282. /*
  283. ========================
  284. idParallelJobList_Threads::InsertSyncPoint
  285. ========================
  286. */
  287. ID_INLINE void idParallelJobList_Threads::InsertSyncPoint( jobSyncType_t syncType ) {
  288. assert( done );
  289. switch( syncType ) {
  290. case SYNC_SIGNAL: {
  291. assert( !hasSignal );
  292. if ( jobList.Num() ) {
  293. assert( !hasSignal );
  294. signalJobCount.Alloc();
  295. signalJobCount[signalJobCount.Num() - 1].SetValue( jobList.Num() - lastSignalJob );
  296. lastSignalJob = jobList.Num();
  297. job_t & job = jobList.Alloc();
  298. job.function = Nop;
  299. job.data = & JOB_SIGNAL;
  300. hasSignal = true;
  301. }
  302. break;
  303. }
  304. case SYNC_SYNCHRONIZE: {
  305. if ( hasSignal ) {
  306. job_t & job = jobList.Alloc();
  307. job.function = Nop;
  308. job.data = & JOB_SYNCHRONIZE;
  309. hasSignal = false;
  310. numSyncs++;
  311. }
  312. break;
  313. }
  314. }
  315. }
  316. /*
  317. ========================
  318. idParallelJobList_Threads::Submit
  319. ========================
  320. */
  321. void idParallelJobList_Threads::Submit( idParallelJobList_Threads * waitForJobList, int parallelism ) {
  322. assert( done );
  323. assert( numSyncs <= maxSyncs );
  324. assert( (unsigned int) jobList.Num() <= maxJobs + numSyncs * 2 );
  325. assert( fetchLock.GetValue() == 0 );
  326. done = false;
  327. currentJob.SetValue( 0 );
  328. memset( &deferredThreadStats, 0, sizeof( deferredThreadStats ) );
  329. deferredThreadStats.numExecutedJobs = jobList.Num() - numSyncs * 2;
  330. deferredThreadStats.numExecutedSyncs = numSyncs;
  331. deferredThreadStats.submitTime = Sys_Microseconds();
  332. deferredThreadStats.startTime = 0;
  333. deferredThreadStats.endTime = 0;
  334. deferredThreadStats.waitTime = 0;
  335. if ( jobList.Num() == 0 ) {
  336. return;
  337. }
  338. if ( waitForJobList != NULL ) {
  339. waitForGuard = & waitForJobList->doneGuards[waitForJobList->currentDoneGuard];
  340. } else {
  341. waitForGuard = NULL;
  342. }
  343. currentDoneGuard = ( currentDoneGuard + 1 ) & ( NUM_DONE_GUARDS - 1 );
  344. doneGuards[currentDoneGuard].SetValue( 1 );
  345. signalJobCount.Alloc();
  346. signalJobCount[signalJobCount.Num() - 1].SetValue( jobList.Num() - lastSignalJob );
  347. job_t & job = jobList.Alloc();
  348. job.function = Nop;
  349. job.data = & JOB_LIST_DONE;
  350. if ( threaded ) {
  351. // hand over to the manager
  352. void SubmitJobList( idParallelJobList_Threads * jobList, int parallelism );
  353. SubmitJobList( this, parallelism );
  354. } else {
  355. // run all the jobs right here
  356. threadJobListState_t state( GetVersion() );
  357. RunJobs( 0, state, false );
  358. }
  359. }
  360. /*
  361. ========================
  362. idParallelJobList_Threads::Wait
  363. ========================
  364. */
  365. void idParallelJobList_Threads::Wait() {
  366. if ( jobList.Num() > 0 ) {
  367. // don't lock up but return if the job list was never properly submitted
  368. if ( !verify( !done && signalJobCount.Num() > 0 ) ) {
  369. return;
  370. }
  371. bool waited = false;
  372. uint64 waitStart = Sys_Microseconds();
  373. while ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
  374. Sys_Yield();
  375. waited = true;
  376. }
  377. version.Increment();
  378. while ( numThreadsExecuting.GetValue() > 0 ) {
  379. Sys_Yield();
  380. waited = true;
  381. }
  382. jobList.SetNum( 0 );
  383. signalJobCount.SetNum( 0 );
  384. numSyncs = 0;
  385. lastSignalJob = 0;
  386. uint64 waitEnd = Sys_Microseconds();
  387. deferredThreadStats.waitTime = waited ? ( waitEnd - waitStart ) : 0;
  388. }
  389. memcpy( & threadStats, & deferredThreadStats, sizeof( threadStats ) );
  390. done = true;
  391. }
  392. /*
  393. ========================
  394. idParallelJobList_Threads::TryWait
  395. ========================
  396. */
  397. bool idParallelJobList_Threads::TryWait() {
  398. if ( jobList.Num() == 0 || signalJobCount[signalJobCount.Num() - 1].GetValue() <= 0 ) {
  399. Wait();
  400. return true;
  401. }
  402. return false;
  403. }
  404. /*
  405. ========================
  406. idParallelJobList_Threads::IsSubmitted
  407. ========================
  408. */
  409. bool idParallelJobList_Threads::IsSubmitted() const {
  410. return !done;
  411. }
  412. /*
  413. ========================
  414. idParallelJobList_Threads::GetTotalProcessingTimeMicroSec
  415. ========================
  416. */
  417. uint64 idParallelJobList_Threads::GetTotalProcessingTimeMicroSec() const {
  418. uint64 total = 0;
  419. for ( int unit = 0; unit < MAX_THREADS; unit++ ) {
  420. total += threadStats.threadExecTime[unit];
  421. }
  422. return total;
  423. }
  424. /*
  425. ========================
  426. idParallelJobList_Threads::GetTotalWastedTimeMicroSec
  427. ========================
  428. */
  429. uint64 idParallelJobList_Threads::GetTotalWastedTimeMicroSec() const {
  430. uint64 total = 0;
  431. for ( int unit = 0; unit < MAX_THREADS; unit++ ) {
  432. total += threadStats.threadTotalTime[unit] - threadStats.threadExecTime[unit];
  433. }
  434. return total;
  435. }
  436. /*
  437. ========================
  438. idParallelJobList_Threads::GetUnitProcessingTimeMicroSec
  439. ========================
  440. */
  441. uint64 idParallelJobList_Threads::GetUnitProcessingTimeMicroSec( int unit ) const {
  442. if ( unit < 0 || unit >= MAX_THREADS ) {
  443. return 0;
  444. }
  445. return threadStats.threadExecTime[unit];
  446. }
  447. /*
  448. ========================
  449. idParallelJobList_Threads::GetUnitWastedTimeMicroSec
  450. ========================
  451. */
  452. uint64 idParallelJobList_Threads::GetUnitWastedTimeMicroSec( int unit ) const {
  453. if ( unit < 0 || unit >= MAX_THREADS ) {
  454. return 0;
  455. }
  456. return threadStats.threadTotalTime[unit] - threadStats.threadExecTime[unit];
  457. }
  // Details of the most recent job that exceeded jobs_longJobMicroSec, written by
  // RunJobsInternal() in non-debug builds: duration in milliseconds, job function and job data.
  #ifndef _DEBUG
  volatile float longJobTime;
  volatile jobRun_t longJobFunc;
  volatile void * longJobData;
  #endif
  /*
  ========================
  idParallelJobList_Threads::RunJobsInternal

  Fetches and executes jobs from this list on the calling thread.

  threadNum - slot used for the per-thread timing arrays, must be below MAX_THREADS
  state     - the calling thread's progress through this list; advanced by this function
  singleJob - when true at most one job is executed per call

  Returns a combination of runResult_t flags: RUN_PROGRESS if a job was executed, RUN_DONE if
  the list has nothing more to hand out (or the state is from an older version of the list),
  RUN_STALLED if a sync point is not yet satisfied or another thread holds the fetch lock.
  ========================
  */
  int idParallelJobList_Threads::RunJobsInternal( unsigned int threadNum, threadJobListState_t & state, bool singleJob ) {
      // Wait() increments the list version, so a state carrying another version is stale
      if ( state.version != version.GetValue() ) {
          // trying to run an old version of this list that is already done
          return RUN_DONE;
      }
      assert( threadNum < MAX_THREADS );
      if ( deferredThreadStats.startTime == 0 ) {
          deferredThreadStats.startTime = Sys_Microseconds(); // first time any thread is running jobs from this list
      }
      int result = RUN_OK;
      do {
          // run through all signals and syncs before the last job that has been or is being executed
          // this loop is really an optimization to minimize the time spent in the fetchLock section below
          for ( ; state.lastJobIndex < (int) currentJob.GetValue() && state.lastJobIndex < jobList.Num(); state.lastJobIndex++ ) {
              if ( jobList[state.lastJobIndex].data == & JOB_SIGNAL ) {
                  // passed a signal marker: move on to the next signal group
                  state.signalIndex++;
                  assert( state.signalIndex < signalJobCount.Num() );
              } else if ( jobList[state.lastJobIndex].data == & JOB_SYNCHRONIZE ) {
                  assert( state.signalIndex > 0 );
                  // the previous signal group still has outstanding jobs
                  if ( signalJobCount[state.signalIndex - 1].GetValue() > 0 ) {
                      // stalled on a synchronization point
                      return ( result | RUN_STALLED );
                  }
              } else if ( jobList[state.lastJobIndex].data == & JOB_LIST_DONE ) {
                  // the final signal group still has outstanding jobs
                  if ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
                      // stalled on a synchronization point
                      return ( result | RUN_STALLED );
                  }
              }
          }
          // try to lock to fetch a new job
          // (the lock is taken when Increment() yields 1; every path below decrements it again)
          if ( fetchLock.Increment() == 1 ) {
              // grab a new job
              state.nextJobIndex = currentJob.Increment() - 1;
              // run through any remaining signals and syncs (this should rarely iterate more than once)
              for ( ; state.lastJobIndex <= state.nextJobIndex && state.lastJobIndex < jobList.Num(); state.lastJobIndex++ ) {
                  if ( jobList[state.lastJobIndex].data == & JOB_SIGNAL ) {
                      state.signalIndex++;
                      assert( state.signalIndex < signalJobCount.Num() );
                  } else if ( jobList[state.lastJobIndex].data == & JOB_SYNCHRONIZE ) {
                      assert( state.signalIndex > 0 );
                      if ( signalJobCount[state.signalIndex - 1].GetValue() > 0 ) {
                          // return this job to the list
                          currentJob.Decrement();
                          // release the fetch lock
                          fetchLock.Decrement();
                          // stalled on a synchronization point
                          return ( result | RUN_STALLED );
                      }
                  } else if ( jobList[state.lastJobIndex].data == & JOB_LIST_DONE ) {
                      if ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
                          // return this job to the list
                          currentJob.Decrement();
                          // release the fetch lock
                          fetchLock.Decrement();
                          // stalled on a synchronization point
                          return ( result | RUN_STALLED );
                      }
                      // decrement the done count
                      // (this is the guard WaitForOtherJobList() of a dependent list polls)
                      doneGuards[currentDoneGuard].Decrement();
                  }
              }
              // release the fetch lock
              fetchLock.Decrement();
          } else {
              // release the fetch lock
              fetchLock.Decrement();
              // another thread is fetching right now so consider stalled
              return ( result | RUN_STALLED );
          }
          // if at the end of the job list we're done
          if ( state.nextJobIndex >= jobList.Num() ) {
              return ( result | RUN_DONE );
          }
          // execute the next job
          // (marker entries run Nop here as well)
          {
              uint64 jobStart = Sys_Microseconds();
              jobList[state.nextJobIndex].function( jobList[state.nextJobIndex].data );
              jobList[state.nextJobIndex].executed = 1;
              uint64 jobEnd = Sys_Microseconds();
              deferredThreadStats.threadExecTime[threadNum] += jobEnd - jobStart;
  #ifndef _DEBUG
              // non-debug builds only: report jobs slower than the jobs_longJobMicroSec cvar,
              // except for jobs of the utility list
              if ( jobs_longJobMicroSec.GetInteger() > 0 ) {
                  if ( jobEnd - jobStart > jobs_longJobMicroSec.GetInteger()
                      && GetId() != JOBLIST_UTILITY ) {
                      longJobTime = ( jobEnd - jobStart ) * ( 1.0f / 1000.0f );
                      longJobFunc = jobList[state.nextJobIndex].function;
                      longJobData = jobList[state.nextJobIndex].data;
                      const char * jobName = GetJobName( jobList[state.nextJobIndex].function );
                      const char * jobListName = GetJobListName( GetId() );
                      idLib::Printf( "%1.1f milliseconds for a single '%s' job from job list %s on thread %d\n", longJobTime, jobName, jobListName, threadNum );
                  }
              }
  #endif
          }
          result |= RUN_PROGRESS;
          // decrease the job count for the current signal
          if ( signalJobCount[state.signalIndex].Decrement() == 0 ) {
              // if this was the very last job of the job list
              if ( state.signalIndex == signalJobCount.Num() - 1 ) {
                  deferredThreadStats.endTime = Sys_Microseconds();
                  return ( result | RUN_DONE );
              }
          }
      } while( ! singleJob );
      return result;
  }
  575. /*
  576. ========================
  577. idParallelJobList_Threads::RunJobs
  578. ========================
  579. */
  580. int idParallelJobList_Threads::RunJobs( unsigned int threadNum, threadJobListState_t & state, bool singleJob ) {
  581. uint64 start = Sys_Microseconds();
  582. numThreadsExecuting.Increment();
  583. int result = RunJobsInternal( threadNum, state, singleJob );
  584. numThreadsExecuting.Decrement();
  585. deferredThreadStats.threadTotalTime[threadNum] += Sys_Microseconds() - start;
  586. return result;
  587. }
  588. /*
  589. ========================
  590. idParallelJobList_Threads::WaitForOtherJobList
  591. ========================
  592. */
  593. bool idParallelJobList_Threads::WaitForOtherJobList() {
  594. if ( waitForGuard != NULL ) {
  595. if ( waitForGuard->GetValue() > 0 ) {
  596. return true;
  597. }
  598. }
  599. return false;
  600. }
  601. /*
  602. ================================================================================================
  603. idParallelJobList
  604. ================================================================================================
  605. */
  606. /*
  607. ========================
  608. idParallelJobList::idParallelJobList
  609. ========================
  610. */
  611. idParallelJobList::idParallelJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color ) {
  612. assert( priority > JOBLIST_PRIORITY_NONE );
  613. this->jobListThreads = new (TAG_JOBLIST) idParallelJobList_Threads( id, priority, maxJobs, maxSyncs );
  614. this->color = color;
  615. }
  616. /*
  617. ========================
  618. idParallelJobList::~idParallelJobList
  619. ========================
  620. */
  621. idParallelJobList::~idParallelJobList() {
  622. delete jobListThreads;
  623. }
  624. /*
  625. ========================
  626. idParallelJobList::AddJob
  627. ========================
  628. */
  629. void idParallelJobList::AddJob( jobRun_t function, void * data ) {
  630. assert( IsRegisteredJob( function ) );
  631. jobListThreads->AddJob( function, data );
  632. }
  633. /*
  634. ========================
  635. idParallelJobList::AddJobSPURS
  636. ========================
  637. */
  638. CellSpursJob128 * idParallelJobList::AddJobSPURS() {
  639. return NULL;
  640. }
  641. /*
  642. ========================
  643. idParallelJobList::InsertSyncPoint
  644. ========================
  645. */
  646. void idParallelJobList::InsertSyncPoint( jobSyncType_t syncType ) {
  647. jobListThreads->InsertSyncPoint( syncType );
  648. }
  649. /*
  650. ========================
  651. idParallelJobList::Wait
  652. ========================
  653. */
  654. void idParallelJobList::Wait() {
  655. if ( jobListThreads != NULL ) {
  656. jobListThreads->Wait();
  657. }
  658. }
  659. /*
  660. ========================
  661. idParallelJobList::TryWait
  662. ========================
  663. */
  664. bool idParallelJobList::TryWait() {
  665. bool done = true;
  666. if ( jobListThreads != NULL ) {
  667. done &= jobListThreads->TryWait();
  668. }
  669. return done;
  670. }
  671. /*
  672. ========================
  673. idParallelJobList::Submit
  674. ========================
  675. */
  676. void idParallelJobList::Submit( idParallelJobList * waitForJobList, int parallelism ) {
  677. assert( waitForJobList != this );
  678. jobListThreads->Submit( ( waitForJobList != NULL ) ? waitForJobList->jobListThreads : NULL, parallelism );
  679. }
  680. /*
  681. ========================
  682. idParallelJobList::IsSubmitted
  683. ========================
  684. */
  685. bool idParallelJobList::IsSubmitted() const {
  686. return jobListThreads->IsSubmitted();
  687. }
  688. /*
  689. ========================
  690. idParallelJobList::GetNumExecutedJobs
  691. ========================
  692. */
  693. unsigned int idParallelJobList::GetNumExecutedJobs() const {
  694. return jobListThreads->GetNumExecutedJobs();
  695. }
  696. /*
  697. ========================
  698. idParallelJobList::GetNumSyncs
  699. ========================
  700. */
  701. unsigned int idParallelJobList::GetNumSyncs() const {
  702. return jobListThreads->GetNumSyncs();
  703. }
  704. /*
  705. ========================
  706. idParallelJobList::GetSubmitTimeMicroSec
  707. ========================
  708. */
  709. uint64 idParallelJobList::GetSubmitTimeMicroSec() const {
  710. return jobListThreads->GetSubmitTimeMicroSec();
  711. }
  712. /*
  713. ========================
  714. idParallelJobList::GetStartTimeMicroSec
  715. ========================
  716. */
  717. uint64 idParallelJobList::GetStartTimeMicroSec() const {
  718. return jobListThreads->GetStartTimeMicroSec();
  719. }
  720. /*
  721. ========================
  722. idParallelJobList::GetFinishTimeMicroSec
  723. ========================
  724. */
  725. uint64 idParallelJobList::GetFinishTimeMicroSec() const {
  726. return jobListThreads->GetFinishTimeMicroSec();
  727. }
  728. /*
  729. ========================
  730. idParallelJobList::GetWaitTimeMicroSec
  731. ========================
  732. */
  733. uint64 idParallelJobList::GetWaitTimeMicroSec() const {
  734. return jobListThreads->GetWaitTimeMicroSec();
  735. }
  736. /*
  737. ========================
  738. idParallelJobList::GetTotalProcessingTimeMicroSec
  739. ========================
  740. */
  741. uint64 idParallelJobList::GetTotalProcessingTimeMicroSec() const {
  742. return jobListThreads->GetTotalProcessingTimeMicroSec();
  743. }
  744. /*
  745. ========================
  746. idParallelJobList::GetTotalWastedTimeMicroSec
  747. ========================
  748. */
  749. uint64 idParallelJobList::GetTotalWastedTimeMicroSec() const {
  750. return jobListThreads->GetTotalWastedTimeMicroSec();
  751. }
  752. /*
  753. ========================
  754. idParallelJobList::GetUnitProcessingTimeMicroSec
  755. ========================
  756. */
  757. uint64 idParallelJobList::GetUnitProcessingTimeMicroSec( int unit ) const {
  758. return jobListThreads->GetUnitProcessingTimeMicroSec( unit );
  759. }
  760. /*
  761. ========================
  762. idParallelJobList::GetUnitWastedTimeMicroSec
  763. ========================
  764. */
  765. uint64 idParallelJobList::GetUnitWastedTimeMicroSec( int unit ) const {
  766. return jobListThreads->GetUnitWastedTimeMicroSec( unit );
  767. }
  768. /*
  769. ========================
  770. idParallelJobList::GetId
  771. ========================
  772. */
  773. jobListId_t idParallelJobList::GetId() const {
  774. return jobListThreads->GetId();
  775. }
  776. /*
  777. ================================================================================================
  778. idJobThread
  779. ================================================================================================
  780. */
  781. const int JOB_THREAD_STACK_SIZE = 256 * 1024; // same size as the SPU local store
  782. struct threadJobList_t {
  783. idParallelJobList_Threads * jobList;
  784. int version;
  785. };
  786. static idCVar jobs_prioritize( "jobs_prioritize", "1", CVAR_BOOL | CVAR_NOCHEAT, "prioritize job lists" );
  787. class idJobThread : public idSysThread {
  788. public:
  789. idJobThread();
  790. ~idJobThread();
  791. void Start( core_t core, unsigned int threadNum );
  792. void AddJobList( idParallelJobList_Threads * jobList );
  793. private:
  794. threadJobList_t jobLists[MAX_JOBLISTS]; // cyclic buffer with job lists
  795. unsigned int firstJobList; // index of the last job list the thread grabbed
  796. unsigned int lastJobList; // index where the next job list to work on will be added
  797. idSysMutex addJobMutex;
  798. unsigned int threadNum;
  799. virtual int Run();
  800. };
  801. /*
  802. ========================
  803. idJobThread::idJobThread
  804. ========================
  805. */
  806. idJobThread::idJobThread() :
  807. firstJobList( 0 ),
  808. lastJobList( 0 ),
  809. threadNum( 0 ) {
  810. }
  811. /*
  812. ========================
  813. idJobThread::~idJobThread
  814. ========================
  815. */
  816. idJobThread::~idJobThread() {
  817. }
  818. /*
  819. ========================
  820. idJobThread::Start
  821. ========================
  822. */
  823. void idJobThread::Start( core_t core, unsigned int threadNum ) {
  824. this->threadNum = threadNum;
  825. StartWorkerThread( va( "JobListProcessor_%d", threadNum ), core, THREAD_NORMAL, JOB_THREAD_STACK_SIZE );
  826. }
  827. /*
  828. ========================
  829. idJobThread::AddJobList
  830. ========================
  831. */
  832. void idJobThread::AddJobList( idParallelJobList_Threads * jobList ) {
  833. // must lock because multiple threads may try to add new job lists at the same time
  834. addJobMutex.Lock();
  835. // wait until there is space available because in rare cases multiple versions of the same job lists may still be queued
  836. while( lastJobList - firstJobList >= MAX_JOBLISTS ) {
  837. Sys_Yield();
  838. }
  839. assert( lastJobList - firstJobList < MAX_JOBLISTS );
  840. jobLists[lastJobList & ( MAX_JOBLISTS - 1 )].jobList = jobList;
  841. jobLists[lastJobList & ( MAX_JOBLISTS - 1 )].version = jobList->GetVersion();
  842. lastJobList++;
  843. addJobMutex.Unlock();
  844. }
  845. /*
  846. ========================
  847. idJobThread::Run
  848. ========================
  849. */
  850. int idJobThread::Run() {
  851. threadJobListState_t threadJobListState[MAX_JOBLISTS];
  852. int numJobLists = 0;
  853. int lastStalledJobList = -1;
  854. while ( !IsTerminating() ) {
  855. // fetch any new job lists and add them to the local list
  856. if ( numJobLists < MAX_JOBLISTS && firstJobList < lastJobList ) {
  857. threadJobListState[numJobLists].jobList = jobLists[firstJobList & ( MAX_JOBLISTS - 1 )].jobList;
  858. threadJobListState[numJobLists].version = jobLists[firstJobList & ( MAX_JOBLISTS - 1 )].version;
  859. threadJobListState[numJobLists].signalIndex = 0;
  860. threadJobListState[numJobLists].lastJobIndex = 0;
  861. threadJobListState[numJobLists].nextJobIndex = -1;
  862. numJobLists++;
  863. firstJobList++;
  864. }
  865. if ( numJobLists == 0 ) {
  866. break;
  867. }
  868. int currentJobList = 0;
  869. jobListPriority_t priority = JOBLIST_PRIORITY_NONE;
  870. if ( lastStalledJobList < 0 ) {
  871. // find the job list with the highest priority
  872. for ( int i = 0; i < numJobLists; i++ ) {
  873. if ( threadJobListState[i].jobList->GetPriority() > priority && !threadJobListState[i].jobList->WaitForOtherJobList() ) {
  874. priority = threadJobListState[i].jobList->GetPriority();
  875. currentJobList = i;
  876. }
  877. }
  878. } else {
  879. // try to hide the stall with a job from a list that has equal or higher priority
  880. currentJobList = lastStalledJobList;
  881. priority = threadJobListState[lastStalledJobList].jobList->GetPriority();
  882. for ( int i = 0; i < numJobLists; i++ ) {
  883. if ( i != lastStalledJobList && threadJobListState[i].jobList->GetPriority() >= priority && !threadJobListState[i].jobList->WaitForOtherJobList() ) {
  884. priority = threadJobListState[i].jobList->GetPriority();
  885. currentJobList = i;
  886. }
  887. }
  888. }
  889. // if the priority is high then try to run through the whole list to reduce the overhead
  890. // otherwise run a single job and re-evaluate priorities for the next job
  891. bool singleJob = ( priority == JOBLIST_PRIORITY_HIGH ) ? false : jobs_prioritize.GetBool();
  892. // try running one or more jobs from the current job list
  893. int result = threadJobListState[currentJobList].jobList->RunJobs( threadNum, threadJobListState[currentJobList], singleJob );
  894. if ( ( result & idParallelJobList_Threads::RUN_DONE ) != 0 ) {
  895. // done with this job list so remove it from the local list
  896. for ( int i = currentJobList; i < numJobLists - 1; i++ ) {
  897. threadJobListState[i] = threadJobListState[i + 1];
  898. }
  899. numJobLists--;
  900. lastStalledJobList = -1;
  901. } else if ( ( result & idParallelJobList_Threads::RUN_STALLED ) != 0 ) {
  902. // yield when stalled on the same job list again without making any progress
  903. if ( currentJobList == lastStalledJobList ) {
  904. if ( ( result & idParallelJobList_Threads::RUN_PROGRESS ) == 0 ) {
  905. Sys_Yield();
  906. }
  907. }
  908. lastStalledJobList = currentJobList;
  909. } else {
  910. lastStalledJobList = -1;
  911. }
  912. }
  913. return 0;
  914. }
  915. /*
  916. ================================================================================================
  917. idParallelJobManagerLocal
  918. ================================================================================================
  919. */
  920. extern void Sys_CPUCount( int & logicalNum, int & coreNum, int & packageNum );
  921. // WINDOWS LOGICAL PROCESSOR LIMITS:
  922. //
  923. // http://download.microsoft.com/download/5/7/7/577a5684-8a83-43ae-9272-ff260a9c20e2/Hyper-thread_Windows.doc
  924. //
//                                            Physical    Logical (Cores + HT)
// Windows XP Home Edition                    1           2
// Windows XP Professional                    2           4
// Windows Server 2003, Standard Edition      4           8
// Windows Server 2003, Enterprise Edition    8           16
// Windows Server 2003, Datacenter Edition    32          32
//
// Windows Vista                              ?           ?
//
// Windows 7 Starter                          1           32/64
// Windows 7 Home Basic                       1           32/64
// Windows 7 Professional                     2           32/64
  937. //
  938. //
  939. // Hyperthreading is not dead yet. Intel's Core i7 Processor is quad-core with HT for 8 logicals.
  940. // DOOM3: We don't have that many jobs, so just set this fairly low so we don't spin up a ton of idle threads
  941. #define MAX_JOB_THREADS 2
  942. #define NUM_JOB_THREADS "2"
  943. #define JOB_THREAD_CORES { CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
  944. CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
  945. CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
  946. CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
  947. CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
  948. CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
  949. CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
  950. CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY }
  951. idCVar jobs_numThreads( "jobs_numThreads", NUM_JOB_THREADS, CVAR_INTEGER | CVAR_NOCHEAT, "number of threads used to crunch through jobs", 0, MAX_JOB_THREADS );
  952. class idParallelJobManagerLocal : public idParallelJobManager {
  953. public:
  954. virtual ~idParallelJobManagerLocal() {}
  955. virtual void Init();
  956. virtual void Shutdown();
  957. virtual idParallelJobList * AllocJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color );
  958. virtual void FreeJobList( idParallelJobList * jobList );
  959. virtual int GetNumJobLists() const;
  960. virtual int GetNumFreeJobLists() const;
  961. virtual idParallelJobList * GetJobList( int index );
  962. virtual int GetNumProcessingUnits();
  963. virtual void WaitForAllJobLists();
  964. void Submit( idParallelJobList_Threads * jobList, int parallelism );
  965. private:
  966. idJobThread threads[MAX_JOB_THREADS];
  967. unsigned int maxThreads;
  968. int numPhysicalCpuCores;
  969. int numLogicalCpuCores;
  970. int numCpuPackages;
  971. idStaticList< idParallelJobList *, MAX_JOBLISTS > jobLists;
  972. };
  973. idParallelJobManagerLocal parallelJobManagerLocal;
  974. idParallelJobManager * parallelJobManager = &parallelJobManagerLocal;
  975. /*
  976. ========================
  977. SubmitJobList
  978. ========================
  979. */
  980. void SubmitJobList( idParallelJobList_Threads * jobList, int parallelism ) {
  981. parallelJobManagerLocal.Submit( jobList, parallelism );
  982. }
  983. /*
  984. ========================
  985. idParallelJobManagerLocal::Init
  986. ========================
  987. */
  988. void idParallelJobManagerLocal::Init() {
  989. // on consoles this will have specific cores for the threads, but on PC they will all be CORE_ANY
  990. core_t cores[] = JOB_THREAD_CORES;
  991. assert( sizeof( cores ) / sizeof( cores[0] ) >= MAX_JOB_THREADS );
  992. for ( int i = 0; i < MAX_JOB_THREADS; i++ ) {
  993. threads[i].Start( cores[i], i );
  994. }
  995. maxThreads = jobs_numThreads.GetInteger();
  996. Sys_CPUCount( numPhysicalCpuCores, numLogicalCpuCores, numCpuPackages );
  997. }
  998. /*
  999. ========================
  1000. idParallelJobManagerLocal::Shutdown
  1001. ========================
  1002. */
  1003. void idParallelJobManagerLocal::Shutdown() {
  1004. for ( int i = 0; i < MAX_JOB_THREADS; i++ ) {
  1005. threads[i].StopThread();
  1006. }
  1007. }
  1008. /*
  1009. ========================
  1010. idParallelJobManagerLocal::AllocJobList
  1011. ========================
  1012. */
  1013. idParallelJobList * idParallelJobManagerLocal::AllocJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color ) {
  1014. for ( int i = 0; i < jobLists.Num(); i++ ) {
  1015. if ( jobLists[i]->GetId() == id ) {
  1016. // idStudio may cause job lists to be allocated multiple times
  1017. }
  1018. }
  1019. idParallelJobList * jobList = new (TAG_JOBLIST) idParallelJobList( id, priority, maxJobs, maxSyncs, color );
  1020. jobLists.Append( jobList );
  1021. return jobList;
  1022. }
  1023. /*
  1024. ========================
  1025. idParallelJobManagerLocal::FreeJobList
  1026. ========================
  1027. */
  1028. void idParallelJobManagerLocal::FreeJobList( idParallelJobList * jobList ) {
  1029. if ( jobList == NULL ) {
  1030. return;
  1031. }
  1032. // wait for all job threads to finish because job list deletion is not thread safe
  1033. for ( unsigned int i = 0; i < maxThreads; i++ ) {
  1034. threads[i].WaitForThread();
  1035. }
  1036. int index = jobLists.FindIndex( jobList );
  1037. assert( index >= 0 && jobLists[index] == jobList );
  1038. jobLists[index]->Wait();
  1039. delete jobLists[index];
  1040. jobLists.RemoveIndexFast( index );
  1041. }
  1042. /*
  1043. ========================
  1044. idParallelJobManagerLocal::GetNumJobLists
  1045. ========================
  1046. */
  1047. int idParallelJobManagerLocal::GetNumJobLists() const {
  1048. return jobLists.Num();
  1049. }
  1050. /*
  1051. ========================
  1052. idParallelJobManagerLocal::GetNumFreeJobLists
  1053. ========================
  1054. */
  1055. int idParallelJobManagerLocal::GetNumFreeJobLists() const {
  1056. return MAX_JOBLISTS - jobLists.Num();
  1057. }
  1058. /*
  1059. ========================
  1060. idParallelJobManagerLocal::GetJobList
  1061. ========================
  1062. */
  1063. idParallelJobList * idParallelJobManagerLocal::GetJobList( int index ) {
  1064. return jobLists[index];
  1065. }
  1066. /*
  1067. ========================
  1068. idParallelJobManagerLocal::GetNumProcessingUnits
  1069. ========================
  1070. */
  1071. int idParallelJobManagerLocal::GetNumProcessingUnits() {
  1072. return maxThreads;
  1073. }
  1074. /*
  1075. ========================
  1076. idParallelJobManagerLocal::WaitForAllJobLists
  1077. ========================
  1078. */
  1079. void idParallelJobManagerLocal::WaitForAllJobLists() {
  1080. // wait for all job lists to complete
  1081. for ( int i = 0; i < jobLists.Num(); i++ ) {
  1082. jobLists[i]->Wait();
  1083. }
  1084. }
  1085. /*
  1086. ========================
  1087. idParallelJobManagerLocal::Submit
  1088. ========================
  1089. */
  1090. void idParallelJobManagerLocal::Submit( idParallelJobList_Threads * jobList, int parallelism ) {
  1091. if ( jobs_numThreads.IsModified() ) {
  1092. maxThreads = idMath::ClampInt( 0, MAX_JOB_THREADS, jobs_numThreads.GetInteger() );
  1093. jobs_numThreads.ClearModified();
  1094. }
  1095. // determine the number of threads to use
  1096. int numThreads = maxThreads;
  1097. if ( parallelism == JOBLIST_PARALLELISM_DEFAULT ) {
  1098. numThreads = maxThreads;
  1099. } else if ( parallelism == JOBLIST_PARALLELISM_MAX_CORES ) {
  1100. numThreads = numLogicalCpuCores;
  1101. } else if ( parallelism == JOBLIST_PARALLELISM_MAX_THREADS ) {
  1102. numThreads = MAX_JOB_THREADS;
  1103. } else if ( parallelism > MAX_JOB_THREADS ) {
  1104. numThreads = MAX_JOB_THREADS;
  1105. } else {
  1106. numThreads = parallelism;
  1107. }
  1108. if ( numThreads <= 0 ) {
  1109. threadJobListState_t state( jobList->GetVersion() );
  1110. jobList->RunJobs( 0, state, false );
  1111. return;
  1112. }
  1113. for ( int i = 0; i < numThreads; i++ ) {
  1114. threads[i].AddJobList( jobList );
  1115. threads[i].SignalWork();
  1116. }
  1117. }