JobQueue.php 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728
  1. <?php
  2. /**
  3. * Job queue base code.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License along
  16. * with this program; if not, write to the Free Software Foundation, Inc.,
  17. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. * http://www.gnu.org/copyleft/gpl.html
  19. *
  20. * @file
  21. * @defgroup JobQueue JobQueue
  22. */
  23. use MediaWiki\MediaWikiServices;
  24. /**
  25. * Class to handle enqueueing and running of background jobs
  26. *
  27. * @ingroup JobQueue
  28. * @since 1.21
  29. */
  30. abstract class JobQueue {
  31. /** @var string Wiki ID */
  32. protected $wiki;
  33. /** @var string Job type */
  34. protected $type;
  35. /** @var string Job priority for pop() */
  36. protected $order;
  37. /** @var int Time to live in seconds */
  38. protected $claimTTL;
  39. /** @var int Maximum number of times to try a job */
  40. protected $maxTries;
  41. /** @var string|bool Read only rationale (or false if r/w) */
  42. protected $readOnlyReason;
  43. /** @var BagOStuff */
  44. protected $dupCache;
  45. /** @var JobQueueAggregator */
  46. protected $aggr;
  47. const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
  48. const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
  49. /**
  50. * @param array $params
  51. * @throws MWException
  52. */
  53. protected function __construct( array $params ) {
  54. $this->wiki = $params['wiki'];
  55. $this->type = $params['type'];
  56. $this->claimTTL = $params['claimTTL'] ?? 0;
  57. $this->maxTries = $params['maxTries'] ?? 3;
  58. if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
  59. $this->order = $params['order'];
  60. } else {
  61. $this->order = $this->optimalOrder();
  62. }
  63. if ( !in_array( $this->order, $this->supportedOrders() ) ) {
  64. throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
  65. }
  66. $this->dupCache = wfGetCache( CACHE_ANYTHING );
  67. $this->aggr = $params['aggregator'] ?? new JobQueueAggregatorNull( [] );
  68. $this->readOnlyReason = $params['readOnlyReason'] ?? false;
  69. }
  70. /**
  71. * Get a job queue object of the specified type.
  72. * $params includes:
  73. * - class : What job class to use (determines job type)
  74. * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
  75. * - type : The name of the job types this queue handles
  76. * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
  77. * If "fifo" is used, the queue will effectively be FIFO. Note that job
  78. * completion will not appear to be exactly FIFO if there are multiple
  79. * job runners since jobs can take different times to finish once popped.
  80. * If "timestamp" is used, the queue will at least be loosely ordered
  81. * by timestamp, allowing for some jobs to be popped off out of order.
  82. * If "random" is used, pop() will pick jobs in random order.
  83. * Note that it may only be weakly random (e.g. a lottery of the oldest X).
  84. * If "any" is choosen, the queue will use whatever order is the fastest.
  85. * This might be useful for improving concurrency for job acquisition.
  86. * - claimTTL : If supported, the queue will recycle jobs that have been popped
  87. * but not acknowledged as completed after this many seconds. Recycling
  88. * of jobs simply means re-inserting them into the queue. Jobs can be
  89. * attempted up to three times before being discarded.
  90. * - readOnlyReason : Set this to a string to make the queue read-only.
  91. *
  92. * Queue classes should throw an exception if they do not support the options given.
  93. *
  94. * @param array $params
  95. * @return JobQueue
  96. * @throws MWException
  97. */
  98. final public static function factory( array $params ) {
  99. $class = $params['class'];
  100. if ( !class_exists( $class ) ) {
  101. throw new MWException( "Invalid job queue class '$class'." );
  102. }
  103. $obj = new $class( $params );
  104. if ( !( $obj instanceof self ) ) {
  105. throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
  106. }
  107. return $obj;
  108. }
  109. /**
  110. * @return string Wiki ID
  111. */
  112. final public function getWiki() {
  113. return $this->wiki;
  114. }
  115. /**
  116. * @return string Job type that this queue handles
  117. */
  118. final public function getType() {
  119. return $this->type;
  120. }
  121. /**
  122. * @return string One of (random, timestamp, fifo, undefined)
  123. */
  124. final public function getOrder() {
  125. return $this->order;
  126. }
  127. /**
  128. * Get the allowed queue orders for configuration validation
  129. *
  130. * @return array Subset of (random, timestamp, fifo, undefined)
  131. */
  132. abstract protected function supportedOrders();
  133. /**
  134. * Get the default queue order to use if configuration does not specify one
  135. *
  136. * @return string One of (random, timestamp, fifo, undefined)
  137. */
  138. abstract protected function optimalOrder();
  139. /**
  140. * Find out if delayed jobs are supported for configuration validation
  141. *
  142. * @return bool Whether delayed jobs are supported
  143. */
  144. protected function supportsDelayedJobs() {
  145. return false; // not implemented
  146. }
  147. /**
  148. * @return bool Whether delayed jobs are enabled
  149. * @since 1.22
  150. */
  151. final public function delayedJobsEnabled() {
  152. return $this->supportsDelayedJobs();
  153. }
  154. /**
  155. * @return string|bool Read-only rational or false if r/w
  156. * @since 1.27
  157. */
  158. public function getReadOnlyReason() {
  159. return $this->readOnlyReason;
  160. }
  161. /**
  162. * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
  163. * Queue classes should use caching if they are any slower without memcached.
  164. *
  165. * If caching is used, this might return false when there are actually no jobs.
  166. * If pop() is called and returns false then it should correct the cache. Also,
  167. * calling flushCaches() first prevents this. However, this affect is typically
  168. * not distinguishable from the race condition between isEmpty() and pop().
  169. *
  170. * @return bool
  171. * @throws JobQueueError
  172. */
  173. final public function isEmpty() {
  174. $res = $this->doIsEmpty();
  175. return $res;
  176. }
  177. /**
  178. * @see JobQueue::isEmpty()
  179. * @return bool
  180. */
  181. abstract protected function doIsEmpty();
  182. /**
  183. * Get the number of available (unacquired, non-delayed) jobs in the queue.
  184. * Queue classes should use caching if they are any slower without memcached.
  185. *
  186. * If caching is used, this number might be out of date for a minute.
  187. *
  188. * @return int
  189. * @throws JobQueueError
  190. */
  191. final public function getSize() {
  192. $res = $this->doGetSize();
  193. return $res;
  194. }
  195. /**
  196. * @see JobQueue::getSize()
  197. * @return int
  198. */
  199. abstract protected function doGetSize();
  200. /**
  201. * Get the number of acquired jobs (these are temporarily out of the queue).
  202. * Queue classes should use caching if they are any slower without memcached.
  203. *
  204. * If caching is used, this number might be out of date for a minute.
  205. *
  206. * @return int
  207. * @throws JobQueueError
  208. */
  209. final public function getAcquiredCount() {
  210. $res = $this->doGetAcquiredCount();
  211. return $res;
  212. }
  213. /**
  214. * @see JobQueue::getAcquiredCount()
  215. * @return int
  216. */
  217. abstract protected function doGetAcquiredCount();
  218. /**
  219. * Get the number of delayed jobs (these are temporarily out of the queue).
  220. * Queue classes should use caching if they are any slower without memcached.
  221. *
  222. * If caching is used, this number might be out of date for a minute.
  223. *
  224. * @return int
  225. * @throws JobQueueError
  226. * @since 1.22
  227. */
  228. final public function getDelayedCount() {
  229. $res = $this->doGetDelayedCount();
  230. return $res;
  231. }
  232. /**
  233. * @see JobQueue::getDelayedCount()
  234. * @return int
  235. */
  236. protected function doGetDelayedCount() {
  237. return 0; // not implemented
  238. }
  239. /**
  240. * Get the number of acquired jobs that can no longer be attempted.
  241. * Queue classes should use caching if they are any slower without memcached.
  242. *
  243. * If caching is used, this number might be out of date for a minute.
  244. *
  245. * @return int
  246. * @throws JobQueueError
  247. */
  248. final public function getAbandonedCount() {
  249. $res = $this->doGetAbandonedCount();
  250. return $res;
  251. }
  252. /**
  253. * @see JobQueue::getAbandonedCount()
  254. * @return int
  255. */
  256. protected function doGetAbandonedCount() {
  257. return 0; // not implemented
  258. }
  259. /**
  260. * Push one or more jobs into the queue.
  261. * This does not require $wgJobClasses to be set for the given job type.
  262. * Outside callers should use JobQueueGroup::push() instead of this function.
  263. *
  264. * @param IJobSpecification|IJobSpecification[] $jobs
  265. * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
  266. * @return void
  267. * @throws JobQueueError
  268. */
  269. final public function push( $jobs, $flags = 0 ) {
  270. $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
  271. $this->batchPush( $jobs, $flags );
  272. }
  273. /**
  274. * Push a batch of jobs into the queue.
  275. * This does not require $wgJobClasses to be set for the given job type.
  276. * Outside callers should use JobQueueGroup::push() instead of this function.
  277. *
  278. * @param IJobSpecification[] $jobs
  279. * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
  280. * @return void
  281. * @throws MWException
  282. */
  283. final public function batchPush( array $jobs, $flags = 0 ) {
  284. $this->assertNotReadOnly();
  285. if ( !count( $jobs ) ) {
  286. return; // nothing to do
  287. }
  288. foreach ( $jobs as $job ) {
  289. if ( $job->getType() !== $this->type ) {
  290. throw new MWException(
  291. "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
  292. } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
  293. throw new MWException(
  294. "Got delayed '{$job->getType()}' job; delays are not supported." );
  295. }
  296. }
  297. $this->doBatchPush( $jobs, $flags );
  298. $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
  299. foreach ( $jobs as $job ) {
  300. if ( $job->isRootJob() ) {
  301. $this->deduplicateRootJob( $job );
  302. }
  303. }
  304. }
  305. /**
  306. * @see JobQueue::batchPush()
  307. * @param IJobSpecification[] $jobs
  308. * @param int $flags
  309. */
  310. abstract protected function doBatchPush( array $jobs, $flags );
  311. /**
  312. * Pop a job off of the queue.
  313. * This requires $wgJobClasses to be set for the given job type.
  314. * Outside callers should use JobQueueGroup::pop() instead of this function.
  315. *
  316. * @throws MWException
  317. * @return Job|bool Returns false if there are no jobs
  318. */
  319. final public function pop() {
  320. global $wgJobClasses;
  321. $this->assertNotReadOnly();
  322. if ( $this->wiki !== wfWikiID() ) {
  323. throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
  324. } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
  325. // Do not pop jobs if there is no class for the queue type
  326. throw new MWException( "Unrecognized job type '{$this->type}'." );
  327. }
  328. $job = $this->doPop();
  329. if ( !$job ) {
  330. $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
  331. }
  332. // Flag this job as an old duplicate based on its "root" job...
  333. try {
  334. if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
  335. self::incrStats( 'dupe_pops', $this->type );
  336. $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
  337. }
  338. } catch ( Exception $e ) {
  339. // don't lose jobs over this
  340. }
  341. return $job;
  342. }
  343. /**
  344. * @see JobQueue::pop()
  345. * @return Job|bool
  346. */
  347. abstract protected function doPop();
  348. /**
  349. * Acknowledge that a job was completed.
  350. *
  351. * This does nothing for certain queue classes or if "claimTTL" is not set.
  352. * Outside callers should use JobQueueGroup::ack() instead of this function.
  353. *
  354. * @param Job $job
  355. * @return void
  356. * @throws MWException
  357. */
  358. final public function ack( Job $job ) {
  359. $this->assertNotReadOnly();
  360. if ( $job->getType() !== $this->type ) {
  361. throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
  362. }
  363. $this->doAck( $job );
  364. }
  365. /**
  366. * @see JobQueue::ack()
  367. * @param Job $job
  368. */
  369. abstract protected function doAck( Job $job );
  370. /**
  371. * Register the "root job" of a given job into the queue for de-duplication.
  372. * This should only be called right *after* all the new jobs have been inserted.
  373. * This is used to turn older, duplicate, job entries into no-ops. The root job
  374. * information will remain in the registry until it simply falls out of cache.
  375. *
  376. * This requires that $job has two special fields in the "params" array:
  377. * - rootJobSignature : hash (e.g. SHA1) that identifies the task
  378. * - rootJobTimestamp : TS_MW timestamp of this instance of the task
  379. *
  380. * A "root job" is a conceptual job that consist of potentially many smaller jobs
  381. * that are actually inserted into the queue. For example, "refreshLinks" jobs are
  382. * spawned when a template is edited. One can think of the task as "update links
  383. * of pages that use template X" and an instance of that task as a "root job".
  384. * However, what actually goes into the queue are range and leaf job subtypes.
  385. * Since these jobs include things like page ID ranges and DB master positions,
  386. * and can morph into smaller jobs recursively, simple duplicate detection
  387. * for individual jobs being identical (like that of job_sha1) is not useful.
  388. *
  389. * In the case of "refreshLinks", if these jobs are still in the queue when the template
  390. * is edited again, we want all of these old refreshLinks jobs for that template to become
  391. * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing.
  392. * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
  393. * previous "root job" for the same task of "update links of pages that use template X".
  394. *
  395. * This does nothing for certain queue classes.
  396. *
  397. * @param IJobSpecification $job
  398. * @throws MWException
  399. * @return bool
  400. */
  401. final public function deduplicateRootJob( IJobSpecification $job ) {
  402. $this->assertNotReadOnly();
  403. if ( $job->getType() !== $this->type ) {
  404. throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
  405. }
  406. return $this->doDeduplicateRootJob( $job );
  407. }
  408. /**
  409. * @see JobQueue::deduplicateRootJob()
  410. * @param IJobSpecification $job
  411. * @throws MWException
  412. * @return bool
  413. */
  414. protected function doDeduplicateRootJob( IJobSpecification $job ) {
  415. if ( !$job->hasRootJobParams() ) {
  416. throw new MWException( "Cannot register root job; missing parameters." );
  417. }
  418. $params = $job->getRootJobParams();
  419. $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
  420. // Callers should call JobQueueGroup::push() before this method so that if the insert
  421. // fails, the de-duplication registration will be aborted. Since the insert is
  422. // deferred till "transaction idle", do the same here, so that the ordering is
  423. // maintained. Having only the de-duplication registration succeed would cause
  424. // jobs to become no-ops without any actual jobs that made them redundant.
  425. $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
  426. if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
  427. return true; // a newer version of this root job was enqueued
  428. }
  429. // Update the timestamp of the last root job started at the location...
  430. return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
  431. }
  432. /**
  433. * Check if the "root" job of a given job has been superseded by a newer one
  434. *
  435. * @param Job $job
  436. * @throws MWException
  437. * @return bool
  438. */
  439. final protected function isRootJobOldDuplicate( Job $job ) {
  440. if ( $job->getType() !== $this->type ) {
  441. throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
  442. }
  443. $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
  444. return $isDuplicate;
  445. }
  446. /**
  447. * @see JobQueue::isRootJobOldDuplicate()
  448. * @param Job $job
  449. * @return bool
  450. */
  451. protected function doIsRootJobOldDuplicate( Job $job ) {
  452. if ( !$job->hasRootJobParams() ) {
  453. return false; // job has no de-deplication info
  454. }
  455. $params = $job->getRootJobParams();
  456. $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
  457. // Get the last time this root job was enqueued
  458. $timestamp = $this->dupCache->get( $key );
  459. // Check if a new root job was started at the location after this one's...
  460. return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
  461. }
  462. /**
  463. * @param string $signature Hash identifier of the root job
  464. * @return string
  465. */
  466. protected function getRootJobCacheKey( $signature ) {
  467. list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
  468. return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
  469. }
  470. /**
  471. * Deleted all unclaimed and delayed jobs from the queue
  472. *
  473. * @throws JobQueueError
  474. * @since 1.22
  475. * @return void
  476. */
  477. final public function delete() {
  478. $this->assertNotReadOnly();
  479. $this->doDelete();
  480. }
  481. /**
  482. * @see JobQueue::delete()
  483. * @throws MWException
  484. */
  485. protected function doDelete() {
  486. throw new MWException( "This method is not implemented." );
  487. }
  488. /**
  489. * Wait for any replica DBs or backup servers to catch up.
  490. *
  491. * This does nothing for certain queue classes.
  492. *
  493. * @return void
  494. * @throws JobQueueError
  495. */
  496. final public function waitForBackups() {
  497. $this->doWaitForBackups();
  498. }
  499. /**
  500. * @see JobQueue::waitForBackups()
  501. * @return void
  502. */
  503. protected function doWaitForBackups() {
  504. }
  505. /**
  506. * Clear any process and persistent caches
  507. *
  508. * @return void
  509. */
  510. final public function flushCaches() {
  511. $this->doFlushCaches();
  512. }
  513. /**
  514. * @see JobQueue::flushCaches()
  515. * @return void
  516. */
  517. protected function doFlushCaches() {
  518. }
  519. /**
  520. * Get an iterator to traverse over all available jobs in this queue.
  521. * This does not include jobs that are currently acquired or delayed.
  522. * Note: results may be stale if the queue is concurrently modified.
  523. *
  524. * @return Iterator
  525. * @throws JobQueueError
  526. */
  527. abstract public function getAllQueuedJobs();
  528. /**
  529. * Get an iterator to traverse over all delayed jobs in this queue.
  530. * Note: results may be stale if the queue is concurrently modified.
  531. *
  532. * @return Iterator
  533. * @throws JobQueueError
  534. * @since 1.22
  535. */
  536. public function getAllDelayedJobs() {
  537. return new ArrayIterator( [] ); // not implemented
  538. }
  539. /**
  540. * Get an iterator to traverse over all claimed jobs in this queue
  541. *
  542. * Callers should be quick to iterator over it or few results
  543. * will be returned due to jobs being acknowledged and deleted
  544. *
  545. * @return Iterator
  546. * @throws JobQueueError
  547. * @since 1.26
  548. */
  549. public function getAllAcquiredJobs() {
  550. return new ArrayIterator( [] ); // not implemented
  551. }
  552. /**
  553. * Get an iterator to traverse over all abandoned jobs in this queue
  554. *
  555. * @return Iterator
  556. * @throws JobQueueError
  557. * @since 1.25
  558. */
  559. public function getAllAbandonedJobs() {
  560. return new ArrayIterator( [] ); // not implemented
  561. }
  562. /**
  563. * Do not use this function outside of JobQueue/JobQueueGroup
  564. *
  565. * @return string
  566. * @since 1.22
  567. */
  568. public function getCoalesceLocationInternal() {
  569. return null;
  570. }
  571. /**
  572. * Check whether each of the given queues are empty.
  573. * This is used for batching checks for queues stored at the same place.
  574. *
  575. * @param array $types List of queues types
  576. * @return array|null (list of non-empty queue types) or null if unsupported
  577. * @throws MWException
  578. * @since 1.22
  579. */
  580. final public function getSiblingQueuesWithJobs( array $types ) {
  581. return $this->doGetSiblingQueuesWithJobs( $types );
  582. }
  583. /**
  584. * @see JobQueue::getSiblingQueuesWithJobs()
  585. * @param array $types List of queues types
  586. * @return array|null (list of queue types) or null if unsupported
  587. */
  588. protected function doGetSiblingQueuesWithJobs( array $types ) {
  589. return null; // not supported
  590. }
  591. /**
  592. * Check the size of each of the given queues.
  593. * For queues not served by the same store as this one, 0 is returned.
  594. * This is used for batching checks for queues stored at the same place.
  595. *
  596. * @param array $types List of queues types
  597. * @return array|null (job type => whether queue is empty) or null if unsupported
  598. * @throws MWException
  599. * @since 1.22
  600. */
  601. final public function getSiblingQueueSizes( array $types ) {
  602. return $this->doGetSiblingQueueSizes( $types );
  603. }
  604. /**
  605. * @see JobQueue::getSiblingQueuesSize()
  606. * @param array $types List of queues types
  607. * @return array|null (list of queue types) or null if unsupported
  608. */
  609. protected function doGetSiblingQueueSizes( array $types ) {
  610. return null; // not supported
  611. }
  612. /**
  613. * @throws JobQueueReadOnlyError
  614. */
  615. protected function assertNotReadOnly() {
  616. if ( $this->readOnlyReason !== false ) {
  617. throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
  618. }
  619. }
  620. /**
  621. * Call wfIncrStats() for the queue overall and for the queue type
  622. *
  623. * @param string $key Event type
  624. * @param string $type Job type
  625. * @param int $delta
  626. * @since 1.22
  627. */
  628. public static function incrStats( $key, $type, $delta = 1 ) {
  629. static $stats;
  630. if ( !$stats ) {
  631. $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
  632. }
  633. $stats->updateCount( "jobqueue.{$key}.all", $delta );
  634. $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
  635. }
  636. }
  637. /**
  638. * @ingroup JobQueue
  639. * @since 1.22
  640. */
  641. class JobQueueError extends MWException {
  642. }
  643. class JobQueueConnectionError extends JobQueueError {
  644. }
  645. class JobQueueReadOnlyError extends JobQueueError {
  646. }