JobQueue.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. <?php
/**
 * @defgroup JobQueue JobQueue
 */
  5. if ( !defined( 'MEDIAWIKI' ) ) {
  6. die( "This file is part of MediaWiki, it is not a valid entry point\n" );
  7. }
  8. /**
  9. * Class to both describe a background job and handle jobs.
  10. *
  11. * @ingroup JobQueue
  12. */
  13. abstract class Job {
  14. var $command,
  15. $title,
  16. $params,
  17. $id,
  18. $removeDuplicates,
  19. $error;
  20. /*-------------------------------------------------------------------------
  21. * Abstract functions
  22. *------------------------------------------------------------------------*/
  23. /**
  24. * Run the job
  25. * @return boolean success
  26. */
  27. abstract function run();
  28. /*-------------------------------------------------------------------------
  29. * Static functions
  30. *------------------------------------------------------------------------*/
  31. /**
  32. * @deprecated use LinksUpdate::queueRecursiveJobs()
  33. */
  34. /**
  35. * static function queueLinksJobs( $titles ) {}
  36. */
  37. /**
  38. * Pop a job of a certain type. This tries less hard than pop() to
  39. * actually find a job; it may be adversely affected by concurrent job
  40. * runners.
  41. */
  42. static function pop_type($type) {
  43. wfProfilein( __METHOD__ );
  44. $dbw = wfGetDB( DB_MASTER );
  45. $row = $dbw->selectRow( 'job', '*', array( 'job_cmd' => $type ), __METHOD__,
  46. array( 'LIMIT' => 1 ));
  47. if ($row === false) {
  48. wfProfileOut( __METHOD__ );
  49. return false;
  50. }
  51. /* Ensure we "own" this row */
  52. $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
  53. $affected = $dbw->affectedRows();
  54. if ($affected == 0) {
  55. wfProfileOut( __METHOD__ );
  56. return false;
  57. }
  58. $namespace = $row->job_namespace;
  59. $dbkey = $row->job_title;
  60. $title = Title::makeTitleSafe( $namespace, $dbkey );
  61. $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
  62. $dbw->delete( 'job', $job->insertFields(), __METHOD__ );
  63. $dbw->immediateCommit();
  64. wfProfileOut( __METHOD__ );
  65. return $job;
  66. }
  67. /**
  68. * Pop a job off the front of the queue
  69. *
  70. * @param $offset Number of jobs to skip
  71. * @return Job or false if there's no jobs
  72. */
  73. static function pop($offset=0) {
  74. wfProfileIn( __METHOD__ );
  75. $dbr = wfGetDB( DB_SLAVE );
  76. /* Get a job from the slave, start with an offset,
  77. scan full set afterwards, avoid hitting purged rows
  78. NB: If random fetch previously was used, offset
  79. will always be ahead of few entries
  80. */
  81. $row = $dbr->selectRow( 'job', '*', "job_id >= ${offset}", __METHOD__,
  82. array( 'ORDER BY' => 'job_id', 'LIMIT' => 1 ));
  83. // Refetching without offset is needed as some of job IDs could have had delayed commits
  84. // and have lower IDs than jobs already executed, blame concurrency :)
  85. //
  86. if ( $row === false) {
  87. if ($offset!=0)
  88. $row = $dbr->selectRow( 'job', '*', '', __METHOD__,
  89. array( 'ORDER BY' => 'job_id', 'LIMIT' => 1 ));
  90. if ($row === false ) {
  91. wfProfileOut( __METHOD__ );
  92. return false;
  93. }
  94. }
  95. $offset = $row->job_id;
  96. // Try to delete it from the master
  97. $dbw = wfGetDB( DB_MASTER );
  98. $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
  99. $affected = $dbw->affectedRows();
  100. $dbw->immediateCommit();
  101. if ( !$affected ) {
  102. // Failed, someone else beat us to it
  103. // Try getting a random row
  104. $row = $dbw->selectRow( 'job', array( 'MIN(job_id) as minjob',
  105. 'MAX(job_id) as maxjob' ), '1=1', __METHOD__ );
  106. if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
  107. // No jobs to get
  108. wfProfileOut( __METHOD__ );
  109. return false;
  110. }
  111. // Get the random row
  112. $row = $dbw->selectRow( 'job', '*',
  113. 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
  114. if ( $row === false ) {
  115. // Random job gone before we got the chance to select it
  116. // Give up
  117. wfProfileOut( __METHOD__ );
  118. return false;
  119. }
  120. // Delete the random row
  121. $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
  122. $affected = $dbw->affectedRows();
  123. $dbw->immediateCommit();
  124. if ( !$affected ) {
  125. // Random job gone before we exclusively deleted it
  126. // Give up
  127. wfProfileOut( __METHOD__ );
  128. return false;
  129. }
  130. }
  131. // If execution got to here, there's a row in $row that has been deleted from the database
  132. // by this thread. Hence the concurrent pop was successful.
  133. $namespace = $row->job_namespace;
  134. $dbkey = $row->job_title;
  135. $title = Title::makeTitleSafe( $namespace, $dbkey );
  136. $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
  137. // Remove any duplicates it may have later in the queue
  138. // Deadlock prone section
  139. $dbw->begin();
  140. $dbw->delete( 'job', $job->insertFields(), __METHOD__ );
  141. $dbw->commit();
  142. wfProfileOut( __METHOD__ );
  143. return $job;
  144. }
  145. /**
  146. * Create the appropriate object to handle a specific job
  147. *
  148. * @param $command String: Job command
  149. * @param $title Title: Associated title
  150. * @param $params Array: Job parameters
  151. * @param $id Int: Job identifier
  152. * @return Job
  153. */
  154. static function factory( $command, $title, $params = false, $id = 0 ) {
  155. global $wgJobClasses;
  156. if( isset( $wgJobClasses[$command] ) ) {
  157. $class = $wgJobClasses[$command];
  158. return new $class( $title, $params, $id );
  159. }
  160. throw new MWException( "Invalid job command `{$command}`" );
  161. }
  162. static function makeBlob( $params ) {
  163. if ( $params !== false ) {
  164. return serialize( $params );
  165. } else {
  166. return '';
  167. }
  168. }
  169. static function extractBlob( $blob ) {
  170. if ( (string)$blob !== '' ) {
  171. return unserialize( $blob );
  172. } else {
  173. return false;
  174. }
  175. }
  176. /**
  177. * Batch-insert a group of jobs into the queue.
  178. * This will be wrapped in a transaction with a forced commit.
  179. *
  180. * This may add duplicate at insert time, but they will be
  181. * removed later on, when the first one is popped.
  182. *
  183. * @param $jobs array of Job objects
  184. */
  185. static function batchInsert( $jobs ) {
  186. if( !count( $jobs ) ) {
  187. return;
  188. }
  189. $dbw = wfGetDB( DB_MASTER );
  190. $rows = array();
  191. foreach( $jobs as $job ) {
  192. $rows[] = $job->insertFields();
  193. if ( count( $rows ) >= 50 ) {
  194. # Do a small transaction to avoid slave lag
  195. $dbw->begin();
  196. $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
  197. $dbw->commit();
  198. $rows = array();
  199. }
  200. }
  201. if ( $rows ) {
  202. $dbw->begin();
  203. $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
  204. $dbw->commit();
  205. }
  206. }
  207. /*-------------------------------------------------------------------------
  208. * Non-static functions
  209. *------------------------------------------------------------------------*/
  210. function __construct( $command, $title, $params = false, $id = 0 ) {
  211. $this->command = $command;
  212. $this->title = $title;
  213. $this->params = $params;
  214. $this->id = $id;
  215. // A bit of premature generalisation
  216. // Oh well, the whole class is premature generalisation really
  217. $this->removeDuplicates = true;
  218. }
  219. /**
  220. * Insert a single job into the queue.
  221. */
  222. function insert() {
  223. $fields = $this->insertFields();
  224. $dbw = wfGetDB( DB_MASTER );
  225. if ( $this->removeDuplicates ) {
  226. $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ );
  227. if ( $dbw->numRows( $res ) ) {
  228. return;
  229. }
  230. }
  231. $fields['job_id'] = $dbw->nextSequenceValue( 'job_job_id_seq' );
  232. $dbw->insert( 'job', $fields, __METHOD__ );
  233. }
  234. protected function insertFields() {
  235. return array(
  236. 'job_cmd' => $this->command,
  237. 'job_namespace' => $this->title->getNamespace(),
  238. 'job_title' => $this->title->getDBkey(),
  239. 'job_params' => Job::makeBlob( $this->params )
  240. );
  241. }
  242. function toString() {
  243. $paramString = '';
  244. if ( $this->params ) {
  245. foreach ( $this->params as $key => $value ) {
  246. if ( $paramString != '' ) {
  247. $paramString .= ' ';
  248. }
  249. $paramString .= "$key=$value";
  250. }
  251. }
  252. if ( is_object( $this->title ) ) {
  253. $s = "{$this->command} " . $this->title->getPrefixedDBkey();
  254. if ( $paramString !== '' ) {
  255. $s .= ' ' . $paramString;
  256. }
  257. return $s;
  258. } else {
  259. return "{$this->command} $paramString";
  260. }
  261. }
  262. protected function setLastError( $error ) {
  263. $this->error = $error;
  264. }
  265. function getLastError() {
  266. return $this->error;
  267. }
  268. }