1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297 |
- /*
- ===========================================================================
- Doom 3 BFG Edition GPL Source Code
- Copyright (C) 1993-2012 id Software LLC, a ZeniMax Media company.
- This file is part of the Doom 3 BFG Edition GPL Source Code ("Doom 3 BFG Edition Source Code").
- Doom 3 BFG Edition Source Code is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- Doom 3 BFG Edition Source Code is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with Doom 3 BFG Edition Source Code. If not, see <http://www.gnu.org/licenses/>.
- In addition, the Doom 3 BFG Edition Source Code is also subject to certain additional terms. You should have received a copy of these additional terms immediately following the terms and conditions of the GNU General Public License which accompanied the Doom 3 BFG Edition Source Code. If not, please request a copy in writing from id Software at the address below.
- If you have questions concerning this license or the applicable additional terms, you may contact in writing id Software LLC, c/o ZeniMax Media Inc., Suite 120, Rockville, Maryland 20850 USA.
- ===========================================================================
- */
- #pragma hdrstop
- #include "precompiled.h"
- #include "ParallelJobList.h"
- /*
- ================================================================================================
- Job and Job-List names
- ================================================================================================
- */
- const char * jobNames[] = {
- ASSERT_ENUM_STRING( JOBLIST_RENDERER_FRONTEND, 0 ),
- ASSERT_ENUM_STRING( JOBLIST_RENDERER_BACKEND, 1 ),
- ASSERT_ENUM_STRING( JOBLIST_UTILITY, 9 ),
- };
- static const int MAX_REGISTERED_JOBS = 128;
- struct registeredJob {
- jobRun_t function;
- const char * name;
- } registeredJobs[MAX_REGISTERED_JOBS];
- static int numRegisteredJobs;
- const char * GetJobListName( jobListId_t id ) {
- return jobNames[id];
- }
- /*
- ========================
- IsRegisteredJob
- ========================
- */
- static bool IsRegisteredJob( jobRun_t function ) {
- for ( int i = 0; i < numRegisteredJobs; i++ ) {
- if ( registeredJobs[i].function == function ) {
- return true;
- }
- }
- return false;
- }
- /*
- ========================
- RegisterJob
- ========================
- */
- void RegisterJob( jobRun_t function, const char * name ) {
- if ( IsRegisteredJob( function ) ) {
- return;
- }
- registeredJobs[numRegisteredJobs].function = function;
- registeredJobs[numRegisteredJobs].name = name;
- numRegisteredJobs++;
- }
- /*
- ========================
- GetJobName
- ========================
- */
- const char * GetJobName( jobRun_t function ) {
- for ( int i = 0; i < numRegisteredJobs; i++ ) {
- if ( registeredJobs[i].function == function ) {
- return registeredJobs[i].name;
- }
- }
- return "unknown";
- }
- /*
- ========================
- idParallelJobRegistration::idParallelJobRegistration
- ========================
- */
- idParallelJobRegistration::idParallelJobRegistration( jobRun_t function, const char * name ) {
- RegisterJob( function, name );
- }
- int globalSpuLocalStoreActive;
- void * globalSpuLocalStoreStart;
- void * globalSpuLocalStoreEnd;
- idSysMutex globalSpuLocalStoreMutex;
- /*
- ================================================================================================
- PS3
- ================================================================================================
- */
- /*
- ================================================================================================
- idParallelJobList_Threads
- ================================================================================================
- */
- static idCVar jobs_longJobMicroSec( "jobs_longJobMicroSec", "10000", CVAR_INTEGER, "print a warning for jobs that take more than this number of microseconds" );
- const static int MAX_THREADS = 32;
- struct threadJobListState_t {
- threadJobListState_t() :
- jobList( NULL ),
- version( 0xFFFFFFFF ),
- signalIndex( 0 ),
- lastJobIndex( 0 ),
- nextJobIndex( -1 ) {}
- threadJobListState_t( int _version ) :
- jobList( NULL ),
- version( _version ),
- signalIndex( 0 ),
- lastJobIndex( 0 ),
- nextJobIndex( -1 ) {}
- idParallelJobList_Threads * jobList;
- int version;
- int signalIndex;
- int lastJobIndex;
- int nextJobIndex;
- };
- struct threadStats_t {
- unsigned int numExecutedJobs;
- unsigned int numExecutedSyncs;
- uint64 submitTime;
- uint64 startTime;
- uint64 endTime;
- uint64 waitTime;
- uint64 threadExecTime[MAX_THREADS];
- uint64 threadTotalTime[MAX_THREADS];
- };
- class idParallelJobList_Threads {
- public:
- idParallelJobList_Threads( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs );
- ~idParallelJobList_Threads();
- //------------------------
- // These are called from the one thread that manages this list.
- //------------------------
- ID_INLINE void AddJob( jobRun_t function, void * data );
- ID_INLINE void InsertSyncPoint( jobSyncType_t syncType );
- void Submit( idParallelJobList_Threads * waitForJobList_, int parallelism );
- void Wait();
- bool TryWait();
- bool IsSubmitted() const;
- unsigned int GetNumExecutedJobs() const { return threadStats.numExecutedJobs; }
- unsigned int GetNumSyncs() const { return threadStats.numExecutedSyncs; }
- uint64 GetSubmitTimeMicroSec() const { return threadStats.submitTime; }
- uint64 GetStartTimeMicroSec() const { return threadStats.startTime; }
- uint64 GetFinishTimeMicroSec() const { return threadStats.endTime; }
- uint64 GetWaitTimeMicroSec() const { return threadStats.waitTime; }
- uint64 GetTotalProcessingTimeMicroSec() const;
- uint64 GetTotalWastedTimeMicroSec() const;
- uint64 GetUnitProcessingTimeMicroSec( int unit ) const;
- uint64 GetUnitWastedTimeMicroSec( int unit ) const;
- jobListId_t GetId() const { return listId; }
- jobListPriority_t GetPriority() const { return listPriority; }
- int GetVersion() { return version.GetValue(); }
- bool WaitForOtherJobList();
- //------------------------
- // This is thread safe and called from the job threads.
- //------------------------
- enum runResult_t {
- RUN_OK = 0,
- RUN_PROGRESS = BIT( 0 ),
- RUN_DONE = BIT( 1 ),
- RUN_STALLED = BIT( 2 )
- };
- int RunJobs( unsigned int threadNum, threadJobListState_t & state, bool singleJob );
- private:
- static const int NUM_DONE_GUARDS = 4; // cycle through 4 guards so we can cyclicly chain job lists
- bool threaded;
- bool done;
- bool hasSignal;
- jobListId_t listId;
- jobListPriority_t listPriority;
- unsigned int maxJobs;
- unsigned int maxSyncs;
- unsigned int numSyncs;
- int lastSignalJob;
- idSysInterlockedInteger * waitForGuard;
- idSysInterlockedInteger doneGuards[NUM_DONE_GUARDS];
- int currentDoneGuard;
- idSysInterlockedInteger version;
- struct job_t {
- jobRun_t function;
- void * data;
- int executed;
- };
- idList< job_t, TAG_JOBLIST > jobList;
- idList< idSysInterlockedInteger, TAG_JOBLIST > signalJobCount;
- idSysInterlockedInteger currentJob;
- idSysInterlockedInteger fetchLock;
- idSysInterlockedInteger numThreadsExecuting;
- threadStats_t deferredThreadStats;
- threadStats_t threadStats;
- int RunJobsInternal( unsigned int threadNum, threadJobListState_t & state, bool singleJob );
- static void Nop( void * data ) {}
- static int JOB_SIGNAL;
- static int JOB_SYNCHRONIZE;
- static int JOB_LIST_DONE;
- };
- int idParallelJobList_Threads::JOB_SIGNAL;
- int idParallelJobList_Threads::JOB_SYNCHRONIZE;
- int idParallelJobList_Threads::JOB_LIST_DONE;
- /*
- ========================
- idParallelJobList_Threads::idParallelJobList_Threads
- ========================
- */
- idParallelJobList_Threads::idParallelJobList_Threads( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs ) :
- threaded( true ),
- done( true ),
- hasSignal( false ),
- listId( id ),
- listPriority( priority ),
- numSyncs( 0 ),
- lastSignalJob( 0 ),
- waitForGuard( NULL ),
- currentDoneGuard( 0 ),
- jobList() {
- assert( listPriority != JOBLIST_PRIORITY_NONE );
- this->maxJobs = maxJobs;
- this->maxSyncs = maxSyncs;
- jobList.AssureSize( maxJobs + maxSyncs * 2 + 1 ); // syncs go in as dummy jobs and one more to update the doneCount
- jobList.SetNum( 0 );
- signalJobCount.AssureSize( maxSyncs + 1 ); // need one extra for submit
- signalJobCount.SetNum( 0 );
- memset( &deferredThreadStats, 0, sizeof( threadStats_t ) );
- memset( &threadStats, 0, sizeof( threadStats_t ) );
- }
- /*
- ========================
- idParallelJobList_Threads::~idParallelJobList_Threads
- ========================
- */
- idParallelJobList_Threads::~idParallelJobList_Threads() {
- Wait();
- }
- /*
- ========================
- idParallelJobList_Threads::AddJob
- ========================
- */
- ID_INLINE void idParallelJobList_Threads::AddJob( jobRun_t function, void * data ) {
- assert( done );
- #if defined( _DEBUG )
- // make sure there isn't already a job with the same function and data in the list
- if ( jobList.Num() < 1000 ) { // don't do this N^2 slow check on big lists
- for ( int i = 0; i < jobList.Num(); i++ ) {
- assert( jobList[i].function != function || jobList[i].data != data );
- }
- }
- #endif
- if ( 1 ) { // JDC: this never worked in tech5! !jobList.IsFull() ) {
- job_t & job = jobList.Alloc();
- job.function = function;
- job.data = data;
- job.executed = 0;
- } else {
- // debug output to show us what is overflowing
- int currentJobCount[MAX_REGISTERED_JOBS] = {};
-
- for ( int i = 0; i < jobList.Num(); ++i ) {
- const char * jobName = GetJobName( jobList[ i ].function );
- for ( int j = 0; j < numRegisteredJobs; ++j ) {
- if ( jobName == registeredJobs[ j ].name ) {
- currentJobCount[ j ]++;
- break;
- }
- }
- }
- // print the quantity of each job type
- for ( int i = 0; i < numRegisteredJobs; ++i ) {
- if ( currentJobCount[ i ] > 0 ) {
- idLib::Printf( "Job: %s, # %d", registeredJobs[ i ].name, currentJobCount[ i ] );
- }
- }
- idLib::Error( "Can't add job '%s', too many jobs %d", GetJobName( function ), jobList.Num() );
- }
- }
- /*
- ========================
- idParallelJobList_Threads::InsertSyncPoint
- ========================
- */
- ID_INLINE void idParallelJobList_Threads::InsertSyncPoint( jobSyncType_t syncType ) {
- assert( done );
- switch( syncType ) {
- case SYNC_SIGNAL: {
- assert( !hasSignal );
- if ( jobList.Num() ) {
- assert( !hasSignal );
- signalJobCount.Alloc();
- signalJobCount[signalJobCount.Num() - 1].SetValue( jobList.Num() - lastSignalJob );
- lastSignalJob = jobList.Num();
- job_t & job = jobList.Alloc();
- job.function = Nop;
- job.data = & JOB_SIGNAL;
- hasSignal = true;
- }
- break;
- }
- case SYNC_SYNCHRONIZE: {
- if ( hasSignal ) {
- job_t & job = jobList.Alloc();
- job.function = Nop;
- job.data = & JOB_SYNCHRONIZE;
- hasSignal = false;
- numSyncs++;
- }
- break;
- }
- }
- }
- /*
- ========================
- idParallelJobList_Threads::Submit
- ========================
- */
- void idParallelJobList_Threads::Submit( idParallelJobList_Threads * waitForJobList, int parallelism ) {
- assert( done );
- assert( numSyncs <= maxSyncs );
- assert( (unsigned int) jobList.Num() <= maxJobs + numSyncs * 2 );
- assert( fetchLock.GetValue() == 0 );
- done = false;
- currentJob.SetValue( 0 );
- memset( &deferredThreadStats, 0, sizeof( deferredThreadStats ) );
- deferredThreadStats.numExecutedJobs = jobList.Num() - numSyncs * 2;
- deferredThreadStats.numExecutedSyncs = numSyncs;
- deferredThreadStats.submitTime = Sys_Microseconds();
- deferredThreadStats.startTime = 0;
- deferredThreadStats.endTime = 0;
- deferredThreadStats.waitTime = 0;
- if ( jobList.Num() == 0 ) {
- return;
- }
- if ( waitForJobList != NULL ) {
- waitForGuard = & waitForJobList->doneGuards[waitForJobList->currentDoneGuard];
- } else {
- waitForGuard = NULL;
- }
- currentDoneGuard = ( currentDoneGuard + 1 ) & ( NUM_DONE_GUARDS - 1 );
- doneGuards[currentDoneGuard].SetValue( 1 );
- signalJobCount.Alloc();
- signalJobCount[signalJobCount.Num() - 1].SetValue( jobList.Num() - lastSignalJob );
- job_t & job = jobList.Alloc();
- job.function = Nop;
- job.data = & JOB_LIST_DONE;
- if ( threaded ) {
- // hand over to the manager
- void SubmitJobList( idParallelJobList_Threads * jobList, int parallelism );
- SubmitJobList( this, parallelism );
- } else {
- // run all the jobs right here
- threadJobListState_t state( GetVersion() );
- RunJobs( 0, state, false );
- }
- }
- /*
- ========================
- idParallelJobList_Threads::Wait
- ========================
- */
- void idParallelJobList_Threads::Wait() {
- if ( jobList.Num() > 0 ) {
- // don't lock up but return if the job list was never properly submitted
- if ( !verify( !done && signalJobCount.Num() > 0 ) ) {
- return;
- }
- bool waited = false;
- uint64 waitStart = Sys_Microseconds();
- while ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
- Sys_Yield();
- waited = true;
- }
- version.Increment();
- while ( numThreadsExecuting.GetValue() > 0 ) {
- Sys_Yield();
- waited = true;
- }
- jobList.SetNum( 0 );
- signalJobCount.SetNum( 0 );
- numSyncs = 0;
- lastSignalJob = 0;
- uint64 waitEnd = Sys_Microseconds();
- deferredThreadStats.waitTime = waited ? ( waitEnd - waitStart ) : 0;
- }
- memcpy( & threadStats, & deferredThreadStats, sizeof( threadStats ) );
- done = true;
- }
- /*
- ========================
- idParallelJobList_Threads::TryWait
- ========================
- */
- bool idParallelJobList_Threads::TryWait() {
- if ( jobList.Num() == 0 || signalJobCount[signalJobCount.Num() - 1].GetValue() <= 0 ) {
- Wait();
- return true;
- }
- return false;
- }
- /*
- ========================
- idParallelJobList_Threads::IsSubmitted
- ========================
- */
- bool idParallelJobList_Threads::IsSubmitted() const {
- return !done;
- }
- /*
- ========================
- idParallelJobList_Threads::GetTotalProcessingTimeMicroSec
- ========================
- */
- uint64 idParallelJobList_Threads::GetTotalProcessingTimeMicroSec() const {
- uint64 total = 0;
- for ( int unit = 0; unit < MAX_THREADS; unit++ ) {
- total += threadStats.threadExecTime[unit];
- }
- return total;
- }
- /*
- ========================
- idParallelJobList_Threads::GetTotalWastedTimeMicroSec
- ========================
- */
- uint64 idParallelJobList_Threads::GetTotalWastedTimeMicroSec() const {
- uint64 total = 0;
- for ( int unit = 0; unit < MAX_THREADS; unit++ ) {
- total += threadStats.threadTotalTime[unit] - threadStats.threadExecTime[unit];
- }
- return total;
- }
- /*
- ========================
- idParallelJobList_Threads::GetUnitProcessingTimeMicroSec
- ========================
- */
- uint64 idParallelJobList_Threads::GetUnitProcessingTimeMicroSec( int unit ) const {
- if ( unit < 0 || unit >= MAX_THREADS ) {
- return 0;
- }
- return threadStats.threadExecTime[unit];
- }
- /*
- ========================
- idParallelJobList_Threads::GetUnitWastedTimeMicroSec
- ========================
- */
- uint64 idParallelJobList_Threads::GetUnitWastedTimeMicroSec( int unit ) const {
- if ( unit < 0 || unit >= MAX_THREADS ) {
- return 0;
- }
- return threadStats.threadTotalTime[unit] - threadStats.threadExecTime[unit];
- }
- #ifndef _DEBUG
- volatile float longJobTime;
- volatile jobRun_t longJobFunc;
- volatile void * longJobData;
- #endif
- /*
- ========================
- idParallelJobList_Threads::RunJobsInternal
- ========================
- */
- int idParallelJobList_Threads::RunJobsInternal( unsigned int threadNum, threadJobListState_t & state, bool singleJob ) {
- if ( state.version != version.GetValue() ) {
- // trying to run an old version of this list that is already done
- return RUN_DONE;
- }
- assert( threadNum < MAX_THREADS );
- if ( deferredThreadStats.startTime == 0 ) {
- deferredThreadStats.startTime = Sys_Microseconds(); // first time any thread is running jobs from this list
- }
- int result = RUN_OK;
- do {
- // run through all signals and syncs before the last job that has been or is being executed
- // this loop is really an optimization to minimize the time spent in the fetchLock section below
- for ( ; state.lastJobIndex < (int) currentJob.GetValue() && state.lastJobIndex < jobList.Num(); state.lastJobIndex++ ) {
- if ( jobList[state.lastJobIndex].data == & JOB_SIGNAL ) {
- state.signalIndex++;
- assert( state.signalIndex < signalJobCount.Num() );
- } else if ( jobList[state.lastJobIndex].data == & JOB_SYNCHRONIZE ) {
- assert( state.signalIndex > 0 );
- if ( signalJobCount[state.signalIndex - 1].GetValue() > 0 ) {
- // stalled on a synchronization point
- return ( result | RUN_STALLED );
- }
- } else if ( jobList[state.lastJobIndex].data == & JOB_LIST_DONE ) {
- if ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
- // stalled on a synchronization point
- return ( result | RUN_STALLED );
- }
- }
- }
- // try to lock to fetch a new job
- if ( fetchLock.Increment() == 1 ) {
- // grab a new job
- state.nextJobIndex = currentJob.Increment() - 1;
- // run through any remaining signals and syncs (this should rarely iterate more than once)
- for ( ; state.lastJobIndex <= state.nextJobIndex && state.lastJobIndex < jobList.Num(); state.lastJobIndex++ ) {
- if ( jobList[state.lastJobIndex].data == & JOB_SIGNAL ) {
- state.signalIndex++;
- assert( state.signalIndex < signalJobCount.Num() );
- } else if ( jobList[state.lastJobIndex].data == & JOB_SYNCHRONIZE ) {
- assert( state.signalIndex > 0 );
- if ( signalJobCount[state.signalIndex - 1].GetValue() > 0 ) {
- // return this job to the list
- currentJob.Decrement();
- // release the fetch lock
- fetchLock.Decrement();
- // stalled on a synchronization point
- return ( result | RUN_STALLED );
- }
- } else if ( jobList[state.lastJobIndex].data == & JOB_LIST_DONE ) {
- if ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
- // return this job to the list
- currentJob.Decrement();
- // release the fetch lock
- fetchLock.Decrement();
- // stalled on a synchronization point
- return ( result | RUN_STALLED );
- }
- // decrement the done count
- doneGuards[currentDoneGuard].Decrement();
- }
- }
- // release the fetch lock
- fetchLock.Decrement();
- } else {
- // release the fetch lock
- fetchLock.Decrement();
- // another thread is fetching right now so consider stalled
- return ( result | RUN_STALLED );
- }
- // if at the end of the job list we're done
- if ( state.nextJobIndex >= jobList.Num() ) {
- return ( result | RUN_DONE );
- }
- // execute the next job
- {
- uint64 jobStart = Sys_Microseconds();
- jobList[state.nextJobIndex].function( jobList[state.nextJobIndex].data );
- jobList[state.nextJobIndex].executed = 1;
- uint64 jobEnd = Sys_Microseconds();
- deferredThreadStats.threadExecTime[threadNum] += jobEnd - jobStart;
- #ifndef _DEBUG
- if ( jobs_longJobMicroSec.GetInteger() > 0 ) {
- if ( jobEnd - jobStart > jobs_longJobMicroSec.GetInteger()
- && GetId() != JOBLIST_UTILITY ) {
- longJobTime = ( jobEnd - jobStart ) * ( 1.0f / 1000.0f );
- longJobFunc = jobList[state.nextJobIndex].function;
- longJobData = jobList[state.nextJobIndex].data;
- const char * jobName = GetJobName( jobList[state.nextJobIndex].function );
- const char * jobListName = GetJobListName( GetId() );
- idLib::Printf( "%1.1f milliseconds for a single '%s' job from job list %s on thread %d\n", longJobTime, jobName, jobListName, threadNum );
- }
- }
- #endif
- }
- result |= RUN_PROGRESS;
- // decrease the job count for the current signal
- if ( signalJobCount[state.signalIndex].Decrement() == 0 ) {
- // if this was the very last job of the job list
- if ( state.signalIndex == signalJobCount.Num() - 1 ) {
- deferredThreadStats.endTime = Sys_Microseconds();
- return ( result | RUN_DONE );
- }
- }
- } while( ! singleJob );
- return result;
- }
- /*
- ========================
- idParallelJobList_Threads::RunJobs
- ========================
- */
- int idParallelJobList_Threads::RunJobs( unsigned int threadNum, threadJobListState_t & state, bool singleJob ) {
- uint64 start = Sys_Microseconds();
- numThreadsExecuting.Increment();
- int result = RunJobsInternal( threadNum, state, singleJob );
- numThreadsExecuting.Decrement();
- deferredThreadStats.threadTotalTime[threadNum] += Sys_Microseconds() - start;
- return result;
- }
- /*
- ========================
- idParallelJobList_Threads::WaitForOtherJobList
- ========================
- */
- bool idParallelJobList_Threads::WaitForOtherJobList() {
- if ( waitForGuard != NULL ) {
- if ( waitForGuard->GetValue() > 0 ) {
- return true;
- }
- }
- return false;
- }
- /*
- ================================================================================================
- idParallelJobList
- ================================================================================================
- */
- /*
- ========================
- idParallelJobList::idParallelJobList
- ========================
- */
- idParallelJobList::idParallelJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color ) {
- assert( priority > JOBLIST_PRIORITY_NONE );
- this->jobListThreads = new (TAG_JOBLIST) idParallelJobList_Threads( id, priority, maxJobs, maxSyncs );
- this->color = color;
- }
- /*
- ========================
- idParallelJobList::~idParallelJobList
- ========================
- */
- idParallelJobList::~idParallelJobList() {
- delete jobListThreads;
- }
- /*
- ========================
- idParallelJobList::AddJob
- ========================
- */
- void idParallelJobList::AddJob( jobRun_t function, void * data ) {
- assert( IsRegisteredJob( function ) );
- jobListThreads->AddJob( function, data );
- }
- /*
- ========================
- idParallelJobList::AddJobSPURS
- ========================
- */
- CellSpursJob128 * idParallelJobList::AddJobSPURS() {
- return NULL;
- }
- /*
- ========================
- idParallelJobList::InsertSyncPoint
- ========================
- */
- void idParallelJobList::InsertSyncPoint( jobSyncType_t syncType ) {
- jobListThreads->InsertSyncPoint( syncType );
- }
- /*
- ========================
- idParallelJobList::Wait
- ========================
- */
- void idParallelJobList::Wait() {
- if ( jobListThreads != NULL ) {
- jobListThreads->Wait();
- }
- }
- /*
- ========================
- idParallelJobList::TryWait
- ========================
- */
- bool idParallelJobList::TryWait() {
- bool done = true;
- if ( jobListThreads != NULL ) {
- done &= jobListThreads->TryWait();
- }
- return done;
- }
- /*
- ========================
- idParallelJobList::Submit
- ========================
- */
- void idParallelJobList::Submit( idParallelJobList * waitForJobList, int parallelism ) {
- assert( waitForJobList != this );
- jobListThreads->Submit( ( waitForJobList != NULL ) ? waitForJobList->jobListThreads : NULL, parallelism );
- }
- /*
- ========================
- idParallelJobList::IsSubmitted
- ========================
- */
- bool idParallelJobList::IsSubmitted() const {
- return jobListThreads->IsSubmitted();
- }
- /*
- ========================
- idParallelJobList::GetNumExecutedJobs
- ========================
- */
- unsigned int idParallelJobList::GetNumExecutedJobs() const {
- return jobListThreads->GetNumExecutedJobs();
- }
- /*
- ========================
- idParallelJobList::GetNumSyncs
- ========================
- */
- unsigned int idParallelJobList::GetNumSyncs() const {
- return jobListThreads->GetNumSyncs();
- }
- /*
- ========================
- idParallelJobList::GetSubmitTimeMicroSec
- ========================
- */
- uint64 idParallelJobList::GetSubmitTimeMicroSec() const {
- return jobListThreads->GetSubmitTimeMicroSec();
- }
- /*
- ========================
- idParallelJobList::GetStartTimeMicroSec
- ========================
- */
- uint64 idParallelJobList::GetStartTimeMicroSec() const {
- return jobListThreads->GetStartTimeMicroSec();
- }
- /*
- ========================
- idParallelJobList::GetFinishTimeMicroSec
- ========================
- */
- uint64 idParallelJobList::GetFinishTimeMicroSec() const {
- return jobListThreads->GetFinishTimeMicroSec();
- }
- /*
- ========================
- idParallelJobList::GetWaitTimeMicroSec
- ========================
- */
- uint64 idParallelJobList::GetWaitTimeMicroSec() const {
- return jobListThreads->GetWaitTimeMicroSec();
- }
- /*
- ========================
- idParallelJobList::GetTotalProcessingTimeMicroSec
- ========================
- */
- uint64 idParallelJobList::GetTotalProcessingTimeMicroSec() const {
- return jobListThreads->GetTotalProcessingTimeMicroSec();
- }
- /*
- ========================
- idParallelJobList::GetTotalWastedTimeMicroSec
- ========================
- */
- uint64 idParallelJobList::GetTotalWastedTimeMicroSec() const {
- return jobListThreads->GetTotalWastedTimeMicroSec();
- }
- /*
- ========================
- idParallelJobList::GetUnitProcessingTimeMicroSec
- ========================
- */
- uint64 idParallelJobList::GetUnitProcessingTimeMicroSec( int unit ) const {
- return jobListThreads->GetUnitProcessingTimeMicroSec( unit );
- }
- /*
- ========================
- idParallelJobList::GetUnitWastedTimeMicroSec
- ========================
- */
- uint64 idParallelJobList::GetUnitWastedTimeMicroSec( int unit ) const {
- return jobListThreads->GetUnitWastedTimeMicroSec( unit );
- }
- /*
- ========================
- idParallelJobList::GetId
- ========================
- */
- jobListId_t idParallelJobList::GetId() const {
- return jobListThreads->GetId();
- }
- /*
- ================================================================================================
- idJobThread
- ================================================================================================
- */
- const int JOB_THREAD_STACK_SIZE = 256 * 1024; // same size as the SPU local store
- struct threadJobList_t {
- idParallelJobList_Threads * jobList;
- int version;
- };
- static idCVar jobs_prioritize( "jobs_prioritize", "1", CVAR_BOOL | CVAR_NOCHEAT, "prioritize job lists" );
- class idJobThread : public idSysThread {
- public:
- idJobThread();
- ~idJobThread();
- void Start( core_t core, unsigned int threadNum );
- void AddJobList( idParallelJobList_Threads * jobList );
- private:
- threadJobList_t jobLists[MAX_JOBLISTS]; // cyclic buffer with job lists
- unsigned int firstJobList; // index of the last job list the thread grabbed
- unsigned int lastJobList; // index where the next job list to work on will be added
- idSysMutex addJobMutex;
- unsigned int threadNum;
- virtual int Run();
- };
- /*
- ========================
- idJobThread::idJobThread
- ========================
- */
- idJobThread::idJobThread() :
- firstJobList( 0 ),
- lastJobList( 0 ),
- threadNum( 0 ) {
- }
- /*
- ========================
- idJobThread::~idJobThread
- ========================
- */
- idJobThread::~idJobThread() {
- }
- /*
- ========================
- idJobThread::Start
- ========================
- */
- void idJobThread::Start( core_t core, unsigned int threadNum ) {
- this->threadNum = threadNum;
- StartWorkerThread( va( "JobListProcessor_%d", threadNum ), core, THREAD_NORMAL, JOB_THREAD_STACK_SIZE );
- }
- /*
- ========================
- idJobThread::AddJobList
- ========================
- */
- void idJobThread::AddJobList( idParallelJobList_Threads * jobList ) {
- // must lock because multiple threads may try to add new job lists at the same time
- addJobMutex.Lock();
- // wait until there is space available because in rare cases multiple versions of the same job lists may still be queued
- while( lastJobList - firstJobList >= MAX_JOBLISTS ) {
- Sys_Yield();
- }
- assert( lastJobList - firstJobList < MAX_JOBLISTS );
- jobLists[lastJobList & ( MAX_JOBLISTS - 1 )].jobList = jobList;
- jobLists[lastJobList & ( MAX_JOBLISTS - 1 )].version = jobList->GetVersion();
- lastJobList++;
- addJobMutex.Unlock();
- }
- /*
- ========================
- idJobThread::Run
- ========================
- */
- int idJobThread::Run() {
- threadJobListState_t threadJobListState[MAX_JOBLISTS];
- int numJobLists = 0;
- int lastStalledJobList = -1;
- while ( !IsTerminating() ) {
- // fetch any new job lists and add them to the local list
- if ( numJobLists < MAX_JOBLISTS && firstJobList < lastJobList ) {
- threadJobListState[numJobLists].jobList = jobLists[firstJobList & ( MAX_JOBLISTS - 1 )].jobList;
- threadJobListState[numJobLists].version = jobLists[firstJobList & ( MAX_JOBLISTS - 1 )].version;
- threadJobListState[numJobLists].signalIndex = 0;
- threadJobListState[numJobLists].lastJobIndex = 0;
- threadJobListState[numJobLists].nextJobIndex = -1;
- numJobLists++;
- firstJobList++;
- }
- if ( numJobLists == 0 ) {
- break;
- }
- int currentJobList = 0;
- jobListPriority_t priority = JOBLIST_PRIORITY_NONE;
- if ( lastStalledJobList < 0 ) {
- // find the job list with the highest priority
- for ( int i = 0; i < numJobLists; i++ ) {
- if ( threadJobListState[i].jobList->GetPriority() > priority && !threadJobListState[i].jobList->WaitForOtherJobList() ) {
- priority = threadJobListState[i].jobList->GetPriority();
- currentJobList = i;
- }
- }
- } else {
- // try to hide the stall with a job from a list that has equal or higher priority
- currentJobList = lastStalledJobList;
- priority = threadJobListState[lastStalledJobList].jobList->GetPriority();
- for ( int i = 0; i < numJobLists; i++ ) {
- if ( i != lastStalledJobList && threadJobListState[i].jobList->GetPriority() >= priority && !threadJobListState[i].jobList->WaitForOtherJobList() ) {
- priority = threadJobListState[i].jobList->GetPriority();
- currentJobList = i;
- }
- }
- }
- // if the priority is high then try to run through the whole list to reduce the overhead
- // otherwise run a single job and re-evaluate priorities for the next job
- bool singleJob = ( priority == JOBLIST_PRIORITY_HIGH ) ? false : jobs_prioritize.GetBool();
- // try running one or more jobs from the current job list
- int result = threadJobListState[currentJobList].jobList->RunJobs( threadNum, threadJobListState[currentJobList], singleJob );
- if ( ( result & idParallelJobList_Threads::RUN_DONE ) != 0 ) {
- // done with this job list so remove it from the local list
- for ( int i = currentJobList; i < numJobLists - 1; i++ ) {
- threadJobListState[i] = threadJobListState[i + 1];
- }
- numJobLists--;
- lastStalledJobList = -1;
- } else if ( ( result & idParallelJobList_Threads::RUN_STALLED ) != 0 ) {
- // yield when stalled on the same job list again without making any progress
- if ( currentJobList == lastStalledJobList ) {
- if ( ( result & idParallelJobList_Threads::RUN_PROGRESS ) == 0 ) {
- Sys_Yield();
- }
- }
- lastStalledJobList = currentJobList;
- } else {
- lastStalledJobList = -1;
- }
- }
- return 0;
- }
- /*
- ================================================================================================
- idParallelJobManagerLocal
- ================================================================================================
- */
- extern void Sys_CPUCount( int & logicalNum, int & coreNum, int & packageNum );
- // WINDOWS LOGICAL PROCESSOR LIMITS:
- //
- // http://download.microsoft.com/download/5/7/7/577a5684-8a83-43ae-9272-ff260a9c20e2/Hyper-thread_Windows.doc
- //
- // Physical Logical (Cores + HT)
- // Windows XP Home Edition 1 2
- // Windows XP Professional 2 4
- // Windows Server 2003, Standard Edition 4 8
- // Windows Server 2003, Enterprise Edition 8 16
- // Windows Server 2003, Datacenter Edition 32 32
- //
- // Windows Vista ? ?
- //
- // Windows 7 Starter 1 32/64
- // Windows 7 Home Basic 1 32/64
- // Windows 7 Professional 2 32/64
- //
- //
- // Hyperthreading is not dead yet. Intel's Core i7 Processor is quad-core with HT for 8 logicals.
- // DOOM3: We don't have that many jobs, so just set this fairly low so we don't spin up a ton of idle threads
- #define MAX_JOB_THREADS 2
- #define NUM_JOB_THREADS "2"
- #define JOB_THREAD_CORES { CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
- CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
- CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
- CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
- CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
- CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
- CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
- CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY }
- idCVar jobs_numThreads( "jobs_numThreads", NUM_JOB_THREADS, CVAR_INTEGER | CVAR_NOCHEAT, "number of threads used to crunch through jobs", 0, MAX_JOB_THREADS );
- class idParallelJobManagerLocal : public idParallelJobManager {
- public:
- virtual ~idParallelJobManagerLocal() {}
- virtual void Init();
- virtual void Shutdown();
- virtual idParallelJobList * AllocJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color );
- virtual void FreeJobList( idParallelJobList * jobList );
- virtual int GetNumJobLists() const;
- virtual int GetNumFreeJobLists() const;
- virtual idParallelJobList * GetJobList( int index );
- virtual int GetNumProcessingUnits();
- virtual void WaitForAllJobLists();
- void Submit( idParallelJobList_Threads * jobList, int parallelism );
- private:
- idJobThread threads[MAX_JOB_THREADS];
- unsigned int maxThreads;
- int numPhysicalCpuCores;
- int numLogicalCpuCores;
- int numCpuPackages;
- idStaticList< idParallelJobList *, MAX_JOBLISTS > jobLists;
- };
- idParallelJobManagerLocal parallelJobManagerLocal;
- idParallelJobManager * parallelJobManager = ¶llelJobManagerLocal;
- /*
- ========================
- SubmitJobList
- ========================
- */
- void SubmitJobList( idParallelJobList_Threads * jobList, int parallelism ) {
- parallelJobManagerLocal.Submit( jobList, parallelism );
- }
- /*
- ========================
- idParallelJobManagerLocal::Init
- ========================
- */
- void idParallelJobManagerLocal::Init() {
- // on consoles this will have specific cores for the threads, but on PC they will all be CORE_ANY
- core_t cores[] = JOB_THREAD_CORES;
- assert( sizeof( cores ) / sizeof( cores[0] ) >= MAX_JOB_THREADS );
- for ( int i = 0; i < MAX_JOB_THREADS; i++ ) {
- threads[i].Start( cores[i], i );
- }
- maxThreads = jobs_numThreads.GetInteger();
- Sys_CPUCount( numPhysicalCpuCores, numLogicalCpuCores, numCpuPackages );
- }
- /*
- ========================
- idParallelJobManagerLocal::Shutdown
- ========================
- */
- void idParallelJobManagerLocal::Shutdown() {
- for ( int i = 0; i < MAX_JOB_THREADS; i++ ) {
- threads[i].StopThread();
- }
- }
- /*
- ========================
- idParallelJobManagerLocal::AllocJobList
- ========================
- */
- idParallelJobList * idParallelJobManagerLocal::AllocJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color ) {
- for ( int i = 0; i < jobLists.Num(); i++ ) {
- if ( jobLists[i]->GetId() == id ) {
- // idStudio may cause job lists to be allocated multiple times
- }
- }
- idParallelJobList * jobList = new (TAG_JOBLIST) idParallelJobList( id, priority, maxJobs, maxSyncs, color );
- jobLists.Append( jobList );
- return jobList;
- }
- /*
- ========================
- idParallelJobManagerLocal::FreeJobList
- ========================
- */
- void idParallelJobManagerLocal::FreeJobList( idParallelJobList * jobList ) {
- if ( jobList == NULL ) {
- return;
- }
- // wait for all job threads to finish because job list deletion is not thread safe
- for ( unsigned int i = 0; i < maxThreads; i++ ) {
- threads[i].WaitForThread();
- }
- int index = jobLists.FindIndex( jobList );
- assert( index >= 0 && jobLists[index] == jobList );
- jobLists[index]->Wait();
- delete jobLists[index];
- jobLists.RemoveIndexFast( index );
- }
- /*
- ========================
- idParallelJobManagerLocal::GetNumJobLists
- ========================
- */
- int idParallelJobManagerLocal::GetNumJobLists() const {
- return jobLists.Num();
- }
- /*
- ========================
- idParallelJobManagerLocal::GetNumFreeJobLists
- ========================
- */
- int idParallelJobManagerLocal::GetNumFreeJobLists() const {
- return MAX_JOBLISTS - jobLists.Num();
- }
- /*
- ========================
- idParallelJobManagerLocal::GetJobList
- ========================
- */
- idParallelJobList * idParallelJobManagerLocal::GetJobList( int index ) {
- return jobLists[index];
- }
- /*
- ========================
- idParallelJobManagerLocal::GetNumProcessingUnits
- ========================
- */
- int idParallelJobManagerLocal::GetNumProcessingUnits() {
- return maxThreads;
- }
- /*
- ========================
- idParallelJobManagerLocal::WaitForAllJobLists
- ========================
- */
- void idParallelJobManagerLocal::WaitForAllJobLists() {
- // wait for all job lists to complete
- for ( int i = 0; i < jobLists.Num(); i++ ) {
- jobLists[i]->Wait();
- }
- }
- /*
- ========================
- idParallelJobManagerLocal::Submit
- ========================
- */
- void idParallelJobManagerLocal::Submit( idParallelJobList_Threads * jobList, int parallelism ) {
- if ( jobs_numThreads.IsModified() ) {
- maxThreads = idMath::ClampInt( 0, MAX_JOB_THREADS, jobs_numThreads.GetInteger() );
- jobs_numThreads.ClearModified();
- }
- // determine the number of threads to use
- int numThreads = maxThreads;
- if ( parallelism == JOBLIST_PARALLELISM_DEFAULT ) {
- numThreads = maxThreads;
- } else if ( parallelism == JOBLIST_PARALLELISM_MAX_CORES ) {
- numThreads = numLogicalCpuCores;
- } else if ( parallelism == JOBLIST_PARALLELISM_MAX_THREADS ) {
- numThreads = MAX_JOB_THREADS;
- } else if ( parallelism > MAX_JOB_THREADS ) {
- numThreads = MAX_JOB_THREADS;
- } else {
- numThreads = parallelism;
- }
- if ( numThreads <= 0 ) {
- threadJobListState_t state( jobList->GetVersion() );
- jobList->RunJobs( 0, state, false );
- return;
- }
- for ( int i = 0; i < numThreads; i++ ) {
- threads[i].AddJobList( jobList );
- threads[i].SignalWork();
- }
- }
|