JobQueueDB.php 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852
  1. <?php
  2. /**
  3. * Database-backed job queue code.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License along
  16. * with this program; if not, write to the Free Software Foundation, Inc.,
  17. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. * http://www.gnu.org/copyleft/gpl.html
  19. *
  20. * @file
  21. */
  22. use Wikimedia\Rdbms\IDatabase;
  23. use Wikimedia\Rdbms\DBConnRef;
  24. use Wikimedia\Rdbms\DBConnectionError;
  25. use Wikimedia\Rdbms\DBError;
  26. use MediaWiki\MediaWikiServices;
  27. use Wikimedia\ScopedCallback;
  28. /**
  29. * Class to handle job queues stored in the DB
  30. *
  31. * @ingroup JobQueue
  32. * @since 1.21
  33. */
  34. class JobQueueDB extends JobQueue {
  35. const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
  36. const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
  37. const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
  38. const MAX_OFFSET = 255; // integer; maximum number of rows to skip
  39. /** @var WANObjectCache */
  40. protected $cache;
  41. /** @var bool|string Name of an external DB cluster. False if not set */
  42. protected $cluster = false;
  /**
   * Set up the queue from its configuration array.
   *
   * Additional parameters include:
   *   - cluster : The name of an external cluster registered via LBFactory.
   *               If not specified, the primary DB cluster for the wiki will be used.
   *               This can be overridden with a custom cluster so that DB handles will
   *               be retrieved via LBFactory::getExternalLB() and getConnection().
   * @param array $params
   */
  protected function __construct( array $params ) {
      parent::__construct( $params );

      // No 'cluster' entry means "use the wiki's main cluster" (stored as false)
      if ( isset( $params['cluster'] ) ) {
          $this->cluster = $params['cluster'];
      } else {
          $this->cluster = false;
      }
      $this->cache = ObjectCache::getMainWANInstance();
  }
  /**
   * List the job orderings this queue backend accepts.
   *
   * @return string[]
   */
  protected function supportedOrders() {
      $orders = [ 'random', 'timestamp', 'fifo' ];

      return $orders;
  }
  /**
   * Name the ordering this backend prefers.
   *
   * @return string One of the values listed by supportedOrders()
   */
  protected function optimalOrder() {
      $order = 'random';

      return $order;
  }
  /**
   * Check whether there is no unclaimed job of this type on the replica DB.
   *
   * @see JobQueue::doIsEmpty()
   * @return bool
   */
  protected function doIsEmpty() {
      // An empty job_token marks a job that no runner has claimed yet
      $unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];

      $dbr = $this->getReplicaDB();
      try {
          $hasUnclaimed = $dbr->selectField( 'job', '1', $unclaimed, __METHOD__ );
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }

      return !$hasUnclaimed;
  }
  /**
   * Count the unclaimed jobs of this type.
   *
   * The count is served from the cache when an integer is stored there;
   * otherwise it is queried from the replica DB and cached for
   * CACHE_TTL_SHORT seconds.
   *
   * @see JobQueue::doGetSize()
   * @return int
   */
  protected function doGetSize() {
      $cacheKey = $this->getCacheKey( 'size' );

      $cached = $this->cache->get( $cacheKey );
      if ( is_int( $cached ) ) {
          return $cached;
      }

      try {
          $dbr = $this->getReplicaDB();
          $unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];
          $size = (int)$dbr->selectField( 'job', 'COUNT(*)', $unclaimed, __METHOD__ );
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }

      $this->cache->set( $cacheKey, $size, self::CACHE_TTL_SHORT );

      return $size;
  }
  /**
   * Count the jobs of this type that currently carry a claim token.
   *
   * Always 0 when claimTTL is not positive. Otherwise the count comes from
   * the cache when an integer is stored there, else from the replica DB
   * (and is then cached for CACHE_TTL_SHORT seconds).
   *
   * @see JobQueue::doGetAcquiredCount()
   * @return int
   */
  protected function doGetAcquiredCount() {
      if ( $this->claimTTL <= 0 ) {
          return 0; // no acknowledgements
      }

      $cacheKey = $this->getCacheKey( 'acquiredcount' );
      $cached = $this->cache->get( $cacheKey );
      if ( is_int( $cached ) ) {
          return $cached;
      }

      $dbr = $this->getReplicaDB();
      try {
          $claimed = [
              'job_cmd' => $this->type,
              'job_token != ' . $dbr->addQuotes( '' )
          ];
          $count = (int)$dbr->selectField( 'job', 'COUNT(*)', $claimed, __METHOD__ );
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }

      $this->cache->set( $cacheKey, $count, self::CACHE_TTL_SHORT );

      return $count;
  }
  /**
   * Count the claimed jobs of this type whose attempts reached maxTries.
   *
   * Always 0 when claimTTL is not positive. Otherwise the count comes from
   * the cache when an integer is stored there, else from the replica DB
   * (and is then cached for CACHE_TTL_SHORT seconds).
   *
   * @see JobQueue::doGetAbandonedCount()
   * @return int
   * @throws MWException
   */
  protected function doGetAbandonedCount() {
      if ( $this->claimTTL <= 0 ) {
          return 0; // no acknowledgements
      }

      $cacheKey = $this->getCacheKey( 'abandonedcount' );
      $cached = $this->cache->get( $cacheKey );
      if ( is_int( $cached ) ) {
          return $cached;
      }

      $dbr = $this->getReplicaDB();
      try {
          $abandoned = [
              'job_cmd' => $this->type,
              'job_token != ' . $dbr->addQuotes( '' ),
              'job_attempts >= ' . $dbr->addQuotes( $this->maxTries )
          ];
          $count = (int)$dbr->selectField( 'job', 'COUNT(*)', $abandoned, __METHOD__ );
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }

      $this->cache->set( $cacheKey, $count, self::CACHE_TTL_SHORT );

      return $count;
  }
  /**
   * Schedule the insertion of a batch of jobs on the master connection.
   *
   * The actual row building and INSERTs happen in doBatchPushInternal(),
   * which is registered as a pre-commit-or-idle callback on the connection.
   *
   * @see JobQueue::doBatchPush()
   * @param IJobSpecification[] $jobs
   * @param int $flags
   * @throws DBError|Exception
   * @return void
   */
  protected function doBatchPush( array $jobs, $flags ) {
      $dbw = $this->getMasterDB();
      $method = __METHOD__;

      // In general, there will be two cases here:
      // a) sqlite; DB connection is probably a regular round-aware handle.
      //    If the connection is busy with a transaction, then defer the job writes
      //    until right before the main round commit step. Any errors that bubble
      //    up will rollback the main commit round.
      // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTO handle.
      //    No transaction is active nor will be started by writes, so enqueue the jobs
      //    now so that any errors will show up immediately as the interface expects. Any
      //    errors that bubble up will rollback the main commit round.
      $pushJobs = function () use ( $dbw, $jobs, $flags, $method ) {
          $this->doBatchPushInternal( $dbw, $jobs, $flags, $method );
      };

      $dbw->onTransactionPreCommitOrIdle( $pushJobs, $method );
  }
  /**
   * Insert a batch of jobs into the job table, skipping de-duplicated jobs
   * whose hash already exists as an unclaimed row.
   *
   * This function should *not* be called outside of JobQueueDB
   *
   * @param IDatabase $dbw
   * @param IJobSpecification[] $jobs
   * @param int $flags Bitfield; QOS_ATOMIC wraps all the inserts in one atomic section
   * @param string $method Caller name used for query attribution
   * @throws DBError
   * @return void
   */
  public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
      if ( !count( $jobs ) ) {
          return;
      }

      $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
      $rowList = []; // list of jobs for jobs that are not de-duplicated
      foreach ( $jobs as $job ) {
          $row = $this->insertFields( $job );
          if ( $job->ignoreDuplicates() ) {
              // Keyed by hash, so identical jobs within this batch collapse to one row
              $rowSet[$row['job_sha1']] = $row;
          } else {
              $rowList[] = $row;
          }
      }

      if ( $flags & self::QOS_ATOMIC ) {
          $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
      }
      try {
          // Strip out any duplicate jobs that are already in the queue...
          if ( count( $rowSet ) ) {
              $res = $dbw->select( 'job', 'job_sha1',
                  [
                      // No job_type condition since it's part of the job_sha1 hash
                      'job_sha1' => array_keys( $rowSet ),
                      'job_token' => '' // unclaimed
                  ],
                  $method
              );
              foreach ( $res as $row ) {
                  wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
                  unset( $rowSet[$row->job_sha1] ); // already enqueued
              }
          }
          // Build the full list of job rows to insert
          $rows = array_merge( $rowList, array_values( $rowSet ) );
          // Insert the job rows in chunks to avoid replica DB lag...
          foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
              $dbw->insert( 'job', $rowBatch, $method );
          }
          JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
          // Jobs that were requested but not inserted: duplicates within this batch
          // plus those already enqueued. The previous expression compared $rows with
          // the already-pruned $rowSet/$rowList and therefore always reported zero.
          JobQueue::incrStats( 'dupe_inserts', $this->type,
              count( $jobs ) - count( $rows )
          );
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }
      if ( $flags & self::QOS_ATOMIC ) {
          $dbw->endAtomic( $method );
      }
  }
  /**
   * Claim one job of this type and return it as a Job object.
   *
   * DBO_TRX is cleared on the master connection for the duration of the call
   * and restored by a scoped callback when the method's scope ends. When no
   * job was claimed, and otherwise on roughly one call in ten, stale jobs
   * are recycled/deleted as well.
   *
   * @see JobQueue::doPop()
   * @return Job|bool False when no row could be claimed
   */
  protected function doPop() {
      $dbw = $this->getMasterDB();
      try {
          $hadAutoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
          $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
          // Held only for its destructor, which restores the old setting
          $scopedReset = new ScopedCallback( function () use ( $dbw, $hadAutoTrx ) {
              $dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 );
          } );

          $uuid = wfRandomString( 32 ); // pop attempt
          $job = false; // job popped off

          // Try to reserve a row in the DB...
          if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
              $row = $this->claimOldest( $uuid );
          } else { // random first
              $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
              $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
              $row = $this->claimRandom( $uuid, $rand, $gte );
          }

          if ( $row ) {
              JobQueue::incrStats( 'pops', $this->type );
              // Get the job object from the row...
              $title = Title::makeTitle( $row->job_namespace, $row->job_title );
              $params = self::extractBlob( $row->job_params );
              $job = Job::factory( $row->job_cmd, $title, $params, $row->job_id );
              $job->metadata['id'] = $row->job_id;
              $job->metadata['timestamp'] = $row->job_timestamp;
          }

          if ( !$job || mt_rand( 0, 9 ) == 0 ) {
              // Handled jobs that need to be recycled/deleted;
              // any recycled jobs will be picked up next attempt
              $this->recycleAndDeleteStaleJobs();
          }
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }

      return $job;
  }
  /**
   * Pick an unclaimed row pseudo-randomly and claim it with an UPDATE by job_id.
   *
   * A candidate row is SELECTed first and then claimed with an UPDATE that
   * still requires an empty job_token; if another runner claimed the row in
   * between (no affected rows), the selection is retried.
   *
   * @param string $uuid 32 char hex string
   * @param int $rand Random unsigned integer (31 bits)
   * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
   * @return stdClass|bool Row|false
   */
  protected function claimRandom( $uuid, $rand, $gte ) {
      $dbw = $this->getMasterDB();
      // Check cache to see if the queue has <= OFFSET items
      $smallKey = $this->getCacheKey( 'small' );
      $tinyQueue = $this->cache->get( $smallKey );

      $row = false; // the row acquired
      $triedOtherDirection = false; // whether one job_random direction was already scanned

      // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
      // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
      // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
      // be used here with MySQL.
      do {
          if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
              // For small queues, using OFFSET will overshoot and return no rows more often.
              // Instead, this uses job_random to pick a row (possibly checking both directions).
              if ( $gte ) {
                  $ineq = '>=';
                  $dir = 'ASC';
              } else {
                  $ineq = '<=';
                  $dir = 'DESC';
              }
              $conds = [
                  'job_cmd' => $this->type,
                  'job_token' => '', // unclaimed
                  "job_random {$ineq} {$dbw->addQuotes( $rand )}"
              ];
              $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
                  $conds,
                  __METHOD__,
                  [ 'ORDER BY' => "job_random {$dir}" ]
              );
              if ( !$row && !$triedOtherDirection ) {
                  $gte = !$gte;
                  $triedOtherDirection = true;
                  continue; // try the other direction
              }
          } else { // table *may* have >= MAX_OFFSET rows
              // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
              // in MySQL if there are many rows for some reason. This uses a small OFFSET
              // instead of job_random for reducing excess claim retries.
              $conds = [
                  'job_cmd' => $this->type,
                  'job_token' => '', // unclaimed
              ];
              $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
                  $conds,
                  __METHOD__,
                  [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
              );
              if ( !$row ) {
                  $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
                  $this->cache->set( $smallKey, 1, 30 );
                  continue; // use job_random
              }
          }

          if ( !$row ) {
              break; // nothing to do
          }

          // Claim the job; the job_token condition makes the UPDATE a no-op if the
          // row was claimed by someone else after the SELECT above.
          $dbw->update( 'job', // update by PK
              [
                  'job_token' => $uuid,
                  'job_token_timestamp' => $dbw->timestamp(),
                  'job_attempts = job_attempts+1'
              ],
              [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
              __METHOD__
          );
          // This might get raced out by another runner when claiming the previously
          // selected row. The use of job_random should minimize this problem, however.
          if ( !$dbw->affectedRows() ) {
              $row = false; // raced out
          }
      } while ( !$row );

      return $row;
  }
  /**
   * Claim the unclaimed row with the lowest job_id using a single UPDATE,
   * then read back the row that now carries our token.
   *
   * @param string $uuid 32 char hex string
   * @return stdClass|bool Row|false
   */
  protected function claimOldest( $uuid ) {
      $dbw = $this->getMasterDB();

      $row = false; // the row acquired
      do {
          if ( $dbw->getType() === 'mysql' ) {
              // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
              // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
              // Oracle and Postgre have no such limitation. However, MySQL offers an
              // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
              $sqlParts = [
                  "UPDATE {$dbw->tableName( 'job' )}",
                  "SET",
                  "job_token = {$dbw->addQuotes( $uuid )},",
                  "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )},",
                  "job_attempts = job_attempts+1",
                  "WHERE (",
                  "job_cmd = {$dbw->addQuotes( $this->type )}",
                  "AND job_token = {$dbw->addQuotes( '' )}",
                  ") ORDER BY job_id ASC LIMIT 1"
              ];
              $dbw->query( implode( ' ', $sqlParts ), __METHOD__ );
          } else {
              // Use a subquery to find the job, within an UPDATE to claim it.
              // This uses as much of the DB wrapper functions as possible.
              $oldestIdSql = $dbw->selectSQLText( 'job', 'job_id',
                  [ 'job_cmd' => $this->type, 'job_token' => '' ],
                  __METHOD__,
                  [ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ]
              );
              $dbw->update( 'job',
                  [
                      'job_token' => $uuid,
                      'job_token_timestamp' => $dbw->timestamp(),
                      'job_attempts = job_attempts+1'
                  ],
                  [ 'job_id = (' . $oldestIdSql . ')' ],
                  __METHOD__
              );
          }

          if ( !$dbw->affectedRows() ) {
              break; // nothing to do
          }

          // Fetch any row that we just reserved...
          $row = $dbw->selectRow( 'job', self::selectFields(),
              [ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
          );
          if ( !$row ) { // raced out by duplicate job removal
              wfDebug( "Row deleted as duplicate by another process.\n" );
          }
      } while ( !$row );

      return $row;
  }
  /**
   * Acknowledge a finished job by deleting its row.
   *
   * DBO_TRX is cleared on the master connection for the duration of the call
   * and restored by a scoped callback when the method's scope ends.
   *
   * @see JobQueue::doAck()
   * @param Job $job
   * @throws MWException When the job carries no 'id' metadata
   */
  protected function doAck( Job $job ) {
      if ( !isset( $job->metadata['id'] ) ) {
          throw new MWException( "Job of type '{$job->getType()}' has no ID." );
      }
      $rowConds = [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ];

      $dbw = $this->getMasterDB();
      try {
          $hadAutoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
          $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
          // Held only for its destructor, which restores the old setting
          $scopedReset = new ScopedCallback( function () use ( $dbw, $hadAutoTrx ) {
              $dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 );
          } );

          // Delete a row with a single DELETE without holding row locks over RTTs...
          $dbw->delete( 'job', $rowConds, __METHOD__ );

          JobQueue::incrStats( 'acks', $this->type );
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }
  }
  /**
   * Register the root job timestamp of $job in the de-duplication cache.
   *
   * The cache write is deferred to an onTransactionIdle() callback on the
   * master connection. The stored timestamp is only replaced when the cache
   * holds no timestamp or an older one.
   *
   * @see JobQueue::doDeduplicateRootJob()
   * @param IJobSpecification $job
   * @throws MWException When 'rootJobSignature' or 'rootJobTimestamp' is missing
   * @return bool Always true; the cache write itself happens later
   */
  protected function doDeduplicateRootJob( IJobSpecification $job ) {
      $params = $job->getParams();
      if ( !isset( $params['rootJobSignature'] ) ) {
          throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
      } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
          throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
      }
      $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
      // Callers should push the jobs (see doBatchPush()) and then call this function so
      // that if the insert fails, the de-duplication registration will be aborted. Since
      // the insert is deferred via a connection callback, do the same here, so that the
      // ordering is maintained. Having only the de-duplication registration succeed would
      // cause jobs to become no-ops without any actual jobs that made them redundant.
      $dbw = $this->getMasterDB();
      $cache = $this->dupCache;
      $dbw->onTransactionIdle(
          // The connection is not needed inside the callback, so it is not captured
          function () use ( $cache, $params, $key ) {
              $timestamp = $cache->get( $key ); // current last timestamp of this job
              if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                  return true; // a newer version of this root job was enqueued
              }
              // Update the timestamp of the last root job started at the location...
              return $cache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
          },
          __METHOD__
      );

      return true;
  }
  /**
   * Delete every row of this job type, claimed or not.
   *
   * @see JobQueue::doDelete()
   * @return bool Always true; DB failures surface as JobQueueError
   */
  protected function doDelete() {
      $dbw = $this->getMasterDB();
      try {
          // Pass the caller name like every other write in this class, so the
          // query is attributed to this method rather than to the DB wrapper
          $dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }

      return true;
  }
  /**
   * Wait for replication on the wiki/cluster this queue is stored on.
   *
   * @see JobQueue::doWaitForBackups()
   * @return void
   */
  protected function doWaitForBackups() {
      $waitOpts = [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ];

      MediaWikiServices::getInstance()
          ->getDBLoadBalancerFactory()
          ->waitForReplication( $waitOpts );
  }
  /**
   * Drop every cache entry this class maintains for the queue.
   *
   * Besides the 'size' and 'acquiredcount' counters this also clears the
   * 'abandonedcount' counter written by doGetAbandonedCount() and the 'small'
   * hint written by claimRandom(), which were previously left behind.
   *
   * @return void
   */
  protected function doFlushCaches() {
      foreach ( [ 'size', 'acquiredcount', 'abandonedcount', 'small' ] as $type ) {
          $this->cache->delete( $this->getCacheKey( $type ) );
      }
  }
  /**
   * Iterate over the jobs of this type that have an empty claim token.
   *
   * @see JobQueue::getAllQueuedJobs()
   * @return Iterator
   */
  public function getAllQueuedJobs() {
      $unclaimed = [ 'job_cmd' => $this->getType(), 'job_token' => '' ];

      return $this->getJobIterator( $unclaimed );
  }
  /**
   * Iterate over the jobs of this type that carry a non-empty claim token.
   *
   * @see JobQueue::getAllAcquiredJobs()
   * @return Iterator
   */
  public function getAllAcquiredJobs() {
      $claimed = [ 'job_cmd' => $this->getType(), "job_token > ''" ];

      return $this->getJobIterator( $claimed );
  }
  /**
   * Select the job rows matching $conds on the replica DB and expose them
   * as an iterator that maps each row to a Job object.
   *
   * @param array $conds Query conditions
   * @return Iterator
   */
  protected function getJobIterator( array $conds ) {
      $dbr = $this->getReplicaDB();
      try {
          return new MappedIterator(
              // Pass the caller name so the query is attributed to this method
              $dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
              function ( $row ) {
                  $job = Job::factory(
                      $row->job_cmd,
                      Title::makeTitle( $row->job_namespace, $row->job_title ),
                      // An empty blob means the job was stored without parameters
                      strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
                  );
                  $job->metadata['id'] = $row->job_id;
                  $job->metadata['timestamp'] = $row->job_timestamp;

                  return $job;
              }
          );
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }
  }
  /**
   * Build a string naming where this queue is stored: the external cluster
   * plus wiki when a cluster is set, otherwise the wiki on the main LBFactory.
   *
   * @return string
   */
  public function getCoalesceLocationInternal() {
      if ( $this->cluster ) {
          return "DBCluster:{$this->cluster}:{$this->wiki}";
      }

      return "LBFactory:{$this->wiki}";
  }
  /**
   * Find which of the given job types have at least one row in the job table.
   *
   * @param string[] $types Job types to look for
   * @return string[] The subset of $types that has rows
   */
  protected function doGetSiblingQueuesWithJobs( array $types ) {
      $dbr = $this->getReplicaDB();
      // @note: this does not check whether the jobs are claimed or not.
      // This is useful so JobQueueGroup::pop() also sees queues that only
      // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
      // failed jobs so that they can be popped again for that edge case.
      $res = $dbr->select(
          'job',
          'DISTINCT job_cmd',
          [ 'job_cmd' => $types ],
          __METHOD__
      );

      $typesWithJobs = [];
      foreach ( $res as $row ) {
          $typesWithJobs[] = $row->job_cmd;
      }

      return $typesWithJobs;
  }
  /**
   * Count the rows in the job table for each of the given job types.
   *
   * @param string[] $types Job types to count
   * @return int[] Map of (job type => row count) for types that have rows
   */
  protected function doGetSiblingQueueSizes( array $types ) {
      $dbr = $this->getReplicaDB();
      $res = $dbr->select(
          'job',
          [ 'job_cmd', 'COUNT(*) AS count' ],
          [ 'job_cmd' => $types ],
          __METHOD__,
          [ 'GROUP BY' => 'job_cmd' ]
      );

      $sizeByType = [];
      foreach ( $res as $row ) {
          $sizeByType[$row->job_cmd] = (int)$row->count;
      }

      return $sizeByType;
  }
  /**
   * Recycle or destroy any jobs that have been claimed for too long
   *
   * Work is serialized per job type through a named DB lock; when the lock
   * cannot be obtained the method returns 0 without doing anything. The lock
   * is released in a finally block so that a failing query does not leave it
   * held on the connection.
   *
   * @return int Number of jobs recycled/deleted
   */
  public function recycleAndDeleteStaleJobs() {
      $now = time();
      $count = 0; // affected rows
      $dbw = $this->getMasterDB();
      $lockKey = "jobqueue-recycle-{$this->type}";

      try {
          if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
              return $count; // already in progress
          }

          try {
              // Remove claims on jobs acquired for too long if enabled...
              if ( $this->claimTTL > 0 ) {
                  $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
                  // Get the IDs of jobs that have been claimed but not finished after too
                  // long. These jobs can be recycled into the queue by expiring the claim.
                  // Selecting the IDs first means that the UPDATE can be done by primary
                  // key (less deadlocks).
                  $res = $dbw->select( 'job', 'job_id',
                      [
                          'job_cmd' => $this->type,
                          "job_token != {$dbw->addQuotes( '' )}", // was acquired
                          "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
                          "job_attempts < {$dbw->addQuotes( $this->maxTries )}" // retries left
                      ],
                      __METHOD__
                  );
                  $ids = array_map(
                      function ( $o ) {
                          return $o->job_id;
                      }, iterator_to_array( $res )
                  );
                  if ( count( $ids ) ) {
                      // Reset job_token for these jobs so that other runners will pick them
                      // up. Set the timestamp to the current time, as it is useful to know
                      // that the job was already tried before (the timestamp becomes the
                      // "released" time).
                      $dbw->update( 'job',
                          [
                              'job_token' => '',
                              'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
                          ],
                          [ 'job_id' => $ids ],
                          __METHOD__
                      );
                      $affected = $dbw->affectedRows();
                      $count += $affected;
                      JobQueue::incrStats( 'recycles', $this->type, $affected );
                      $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
                  }
              }

              // Just destroy any stale jobs...
              $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
              $conds = [
                  'job_cmd' => $this->type,
                  "job_token != {$dbw->addQuotes( '' )}", // was acquired
                  "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
              ];
              if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
                  $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
              }
              // Get the IDs of jobs that are considered stale and should be removed.
              // Selecting the IDs first means that the DELETE can be done by primary
              // key (less deadlocks).
              $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
              $ids = array_map(
                  function ( $o ) {
                      return $o->job_id;
                  }, iterator_to_array( $res )
              );
              if ( count( $ids ) ) {
                  $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
                  $affected = $dbw->affectedRows();
                  $count += $affected;
                  JobQueue::incrStats( 'abandons', $this->type, $affected );
              }
          } finally {
              // Always give the lock back, including when a query above threw
              $dbw->unlock( $lockKey, __METHOD__ );
          }
      } catch ( DBError $e ) {
          $this->throwDBException( $e );
      }

      return $count;
  }
  /**
   * Build the job table row for a job specification.
   *
   * @param IJobSpecification $job
   * @return array Map of (column name => value)
   */
  protected function insertFields( IJobSpecification $job ) {
      $dbw = $this->getMasterDB();

      // Fields that describe the nature of the job
      $row = [
          'job_cmd' => $job->getType(),
          'job_namespace' => $job->getTitle()->getNamespace(),
          'job_title' => $job->getTitle()->getDBkey(),
          'job_params' => self::makeBlob( $job->getParams() ),
      ];

      // Additional job metadata
      $row['job_timestamp'] = $dbw->timestamp();
      // SHA-1 of the serialized de-duplication info, converted from base 16 to base 36
      $dedupHash = sha1( serialize( $job->getDeduplicationInfo() ) );
      $row['job_sha1'] = Wikimedia\base_convert( $dedupHash, 16, 36, 31 );
      $row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

      return $row;
  }
  /**
   * Get a replica DB handle, converting connection failures into
   * JobQueueConnectionError.
   *
   * @throws JobQueueConnectionError
   * @return DBConnRef
   */
  protected function getReplicaDB() {
      try {
          $conn = $this->getDB( DB_REPLICA );
      } catch ( DBConnectionError $e ) {
          throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
      }

      return $conn;
  }
  /**
   * Get a master DB handle, converting connection failures into
   * JobQueueConnectionError.
   *
   * @throws JobQueueConnectionError
   * @return DBConnRef
   */
  protected function getMasterDB() {
      try {
          $conn = $this->getDB( DB_MASTER );
      } catch ( DBConnectionError $e ) {
          throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
      }

      return $conn;
  }
  /**
   * Get a connection handle from the load balancer that stores this queue
   * (the external cluster when one is configured, else the wiki's main LB).
   *
   * @param int $index (DB_REPLICA/DB_MASTER)
   * @return DBConnRef
   */
  protected function getDB( $index ) {
      $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
      if ( $this->cluster !== false ) {
          $lb = $lbFactory->getExternalLB( $this->cluster );
      } else {
          $lb = $lbFactory->getMainLB( $this->wiki );
      }

      if ( $lb->getServerType( $lb->getWriterIndex() ) === 'sqlite' ) {
          // SQLite has DB-level locking, so the regular connection is used.
          // Jobs insertion will be deferred until the PRESEND stage to reduce contention.
          return $lb->getConnectionRef( $index, [], $this->wiki );
      }

      // Keep a separate connection to avoid contention and deadlocks
      return $lb->getConnectionRef( $index, [], $this->wiki, $lb::CONN_TRX_AUTO );
  }
  /**
   * Build the cache key for one cached property of this queue.
   *
   * The key combines the wiki's DB name and prefix, the cluster name
   * ('main' when no cluster is set), the job type and the property.
   *
   * @param string $property
   * @return string
   */
  private function getCacheKey( $property ) {
      list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

      $cluster = 'main';
      if ( is_string( $this->cluster ) ) {
          $cluster = $this->cluster;
      }

      return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
  }
  /**
   * Serialize job parameters for storage; false becomes the empty string.
   *
   * @param array|bool $params
   * @return string
   */
  protected static function makeBlob( $params ) {
      return ( $params !== false ) ? serialize( $params ) : '';
  }
  /**
   * Unserialize a stored parameter blob; an empty blob yields false.
   *
   * @param string $blob
   * @return bool|mixed
   */
  protected static function extractBlob( $blob ) {
      if ( (string)$blob === '' ) {
          return false;
      }

      return unserialize( $blob );
  }
  /**
   * Rethrow a DB error as a JobQueueError whose message carries the
   * original exception class and message.
   *
   * @param DBError $e
   * @throws JobQueueError
   */
  protected function throwDBException( DBError $e ) {
      $message = get_class( $e ) . ": " . $e->getMessage();

      throw new JobQueueError( $message );
  }
  /**
   * Return the list of job fields that should be selected.
   *
   * @since 1.23
   * @return array
   */
  public static function selectFields() {
      $fields = [
          'job_id',
          'job_cmd',
          'job_namespace',
          'job_title',
          'job_timestamp',
          'job_params',
          'job_random',
          'job_attempts',
          'job_token',
          'job_token_timestamp',
          'job_sha1',
      ];

      return $fields;
  }
  771. }