LoadBalancer.php 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916
  1. <?php
  2. /**
  3. * @file
  4. * @ingroup Database
  5. */
  6. /**
  7. * Database load balancing object
  8. *
  9. * @todo document
  10. * @ingroup Database
  11. */
  12. class LoadBalancer {
  13. /* private */ var $mServers, $mConns, $mLoads, $mGroupLoads;
  14. /* private */ var $mFailFunction, $mErrorConnection;
  15. /* private */ var $mReadIndex, $mAllowLagged;
  16. /* private */ var $mWaitForPos, $mWaitTimeout;
  17. /* private */ var $mLaggedSlaveMode, $mLastError = 'Unknown error';
  18. /* private */ var $mParentInfo, $mLagTimes;
  19. /* private */ var $mLoadMonitorClass, $mLoadMonitor;
  20. /**
  21. * @param array $params Array with keys:
  22. * servers Required. Array of server info structures.
  23. * failFunction Deprecated, use exceptions instead.
  24. * masterWaitTimeout Replication lag wait timeout
  25. * loadMonitor Name of a class used to fetch server lag and load.
  26. */
  27. function __construct( $params )
  28. {
  29. if ( !isset( $params['servers'] ) ) {
  30. throw new MWException( __CLASS__.': missing servers parameter' );
  31. }
  32. $this->mServers = $params['servers'];
  33. if ( isset( $params['failFunction'] ) ) {
  34. $this->mFailFunction = $params['failFunction'];
  35. } else {
  36. $this->mFailFunction = false;
  37. }
  38. if ( isset( $params['waitTimeout'] ) ) {
  39. $this->mWaitTimeout = $params['waitTimeout'];
  40. } else {
  41. $this->mWaitTimeout = 10;
  42. }
  43. $this->mReadIndex = -1;
  44. $this->mWriteIndex = -1;
  45. $this->mConns = array(
  46. 'local' => array(),
  47. 'foreignUsed' => array(),
  48. 'foreignFree' => array() );
  49. $this->mLoads = array();
  50. $this->mWaitForPos = false;
  51. $this->mLaggedSlaveMode = false;
  52. $this->mErrorConnection = false;
  53. $this->mAllowLag = false;
  54. $this->mLoadMonitorClass = isset( $params['loadMonitor'] )
  55. ? $params['loadMonitor'] : 'LoadMonitor_MySQL';
  56. foreach( $params['servers'] as $i => $server ) {
  57. $this->mLoads[$i] = $server['load'];
  58. if ( isset( $server['groupLoads'] ) ) {
  59. foreach ( $server['groupLoads'] as $group => $ratio ) {
  60. if ( !isset( $this->mGroupLoads[$group] ) ) {
  61. $this->mGroupLoads[$group] = array();
  62. }
  63. $this->mGroupLoads[$group][$i] = $ratio;
  64. }
  65. }
  66. }
  67. }
  68. static function newFromParams( $servers, $failFunction = false, $waitTimeout = 10 )
  69. {
  70. return new LoadBalancer( $servers, $failFunction, $waitTimeout );
  71. }
  72. /**
  73. * Get a LoadMonitor instance
  74. */
  75. function getLoadMonitor() {
  76. if ( !isset( $this->mLoadMonitor ) ) {
  77. $class = $this->mLoadMonitorClass;
  78. $this->mLoadMonitor = new $class( $this );
  79. }
  80. return $this->mLoadMonitor;
  81. }
  82. /**
  83. * Get or set arbitrary data used by the parent object, usually an LBFactory
  84. */
  85. function parentInfo( $x = null ) {
  86. return wfSetVar( $this->mParentInfo, $x );
  87. }
  88. /**
  89. * Given an array of non-normalised probabilities, this function will select
  90. * an element and return the appropriate key
  91. */
  92. function pickRandom( $weights )
  93. {
  94. if ( !is_array( $weights ) || count( $weights ) == 0 ) {
  95. return false;
  96. }
  97. $sum = array_sum( $weights );
  98. if ( $sum == 0 ) {
  99. # No loads on any of them
  100. # In previous versions, this triggered an unweighted random selection,
  101. # but this feature has been removed as of April 2006 to allow for strict
  102. # separation of query groups.
  103. return false;
  104. }
  105. $max = mt_getrandmax();
  106. $rand = mt_rand(0, $max) / $max * $sum;
  107. $sum = 0;
  108. foreach ( $weights as $i => $w ) {
  109. $sum += $w;
  110. if ( $sum >= $rand ) {
  111. break;
  112. }
  113. }
  114. return $i;
  115. }
  116. function getRandomNonLagged( $loads, $wiki = false ) {
  117. # Unset excessively lagged servers
  118. $lags = $this->getLagTimes( $wiki );
  119. foreach ( $lags as $i => $lag ) {
  120. if ( $i != 0 && isset( $this->mServers[$i]['max lag'] ) ) {
  121. if ( $lag === false ) {
  122. wfDebug( "Server #$i is not replicating\n" );
  123. unset( $loads[$i] );
  124. } elseif ( $lag > $this->mServers[$i]['max lag'] ) {
  125. wfDebug( "Server #$i is excessively lagged ($lag seconds)\n" );
  126. unset( $loads[$i] );
  127. }
  128. }
  129. }
  130. # Find out if all the slaves with non-zero load are lagged
  131. $sum = 0;
  132. foreach ( $loads as $load ) {
  133. $sum += $load;
  134. }
  135. if ( $sum == 0 ) {
  136. # No appropriate DB servers except maybe the master and some slaves with zero load
  137. # Do NOT use the master
  138. # Instead, this function will return false, triggering read-only mode,
  139. # and a lagged slave will be used instead.
  140. return false;
  141. }
  142. if ( count( $loads ) == 0 ) {
  143. return false;
  144. }
  145. #wfDebugLog( 'connect', var_export( $loads, true ) );
  146. # Return a random representative of the remainder
  147. return $this->pickRandom( $loads );
  148. }
  149. /**
  150. * Get the index of the reader connection, which may be a slave
  151. * This takes into account load ratios and lag times. It should
  152. * always return a consistent index during a given invocation
  153. *
  154. * Side effect: opens connections to databases
  155. */
  156. function getReaderIndex( $group = false, $wiki = false ) {
  157. global $wgReadOnly, $wgDBClusterTimeout, $wgDBAvgStatusPoll, $wgDBtype;
  158. # FIXME: For now, only go through all this for mysql databases
  159. if ($wgDBtype != 'mysql') {
  160. return $this->getWriterIndex();
  161. }
  162. if ( count( $this->mServers ) == 1 ) {
  163. # Skip the load balancing if there's only one server
  164. return 0;
  165. } elseif ( $group === false and $this->mReadIndex >= 0 ) {
  166. # Shortcut if generic reader exists already
  167. return $this->mReadIndex;
  168. }
  169. wfProfileIn( __METHOD__ );
  170. $totalElapsed = 0;
  171. # convert from seconds to microseconds
  172. $timeout = $wgDBClusterTimeout * 1e6;
  173. # Find the relevant load array
  174. if ( $group !== false ) {
  175. if ( isset( $this->mGroupLoads[$group] ) ) {
  176. $nonErrorLoads = $this->mGroupLoads[$group];
  177. } else {
  178. # No loads for this group, return false and the caller can use some other group
  179. wfDebug( __METHOD__.": no loads for group $group\n" );
  180. wfProfileOut( __METHOD__ );
  181. return false;
  182. }
  183. } else {
  184. $nonErrorLoads = $this->mLoads;
  185. }
  186. if ( !$nonErrorLoads ) {
  187. throw new MWException( "Empty server array given to LoadBalancer" );
  188. }
  189. # Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
  190. $this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
  191. $i = false;
  192. $found = false;
  193. $laggedSlaveMode = false;
  194. # First try quickly looking through the available servers for a server that
  195. # meets our criteria
  196. do {
  197. $totalThreadsConnected = 0;
  198. $overloadedServers = 0;
  199. $currentLoads = $nonErrorLoads;
  200. while ( count( $currentLoads ) ) {
  201. if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) {
  202. $i = $this->pickRandom( $currentLoads );
  203. } else {
  204. $i = $this->getRandomNonLagged( $currentLoads, $wiki );
  205. if ( $i === false && count( $currentLoads ) != 0 ) {
  206. # All slaves lagged. Switch to read-only mode
  207. $wgReadOnly = wfMsgNoDBForContent( 'readonly_lag' );
  208. $i = $this->pickRandom( $currentLoads );
  209. $laggedSlaveMode = true;
  210. }
  211. }
  212. if ( $i === false ) {
  213. # pickRandom() returned false
  214. # This is permanent and means the configuration or the load monitor
  215. # wants us to return false.
  216. wfDebugLog( 'connect', __METHOD__.": pickRandom() returned false\n" );
  217. wfProfileOut( __METHOD__ );
  218. return false;
  219. }
  220. wfDebugLog( 'connect', __METHOD__.": Using reader #$i: {$this->mServers[$i]['host']}...\n" );
  221. $conn = $this->openConnection( $i, $wiki );
  222. if ( !$conn ) {
  223. wfDebugLog( 'connect', __METHOD__.": Failed connecting to $i/$wiki\n" );
  224. unset( $nonErrorLoads[$i] );
  225. unset( $currentLoads[$i] );
  226. continue;
  227. }
  228. // Perform post-connection backoff
  229. $threshold = isset( $this->mServers[$i]['max threads'] )
  230. ? $this->mServers[$i]['max threads'] : false;
  231. $backoff = $this->getLoadMonitor()->postConnectionBackoff( $conn, $threshold );
  232. // Decrement reference counter, we are finished with this connection.
  233. // It will be incremented for the caller later.
  234. if ( $wiki !== false ) {
  235. $this->reuseConnection( $conn );
  236. }
  237. if ( $backoff ) {
  238. # Post-connection overload, don't use this server for now
  239. $totalThreadsConnected += $backoff;
  240. $overloadedServers++;
  241. unset( $currentLoads[$i] );
  242. } else {
  243. # Return this server
  244. break 2;
  245. }
  246. }
  247. # No server found yet
  248. $i = false;
  249. # If all servers were down, quit now
  250. if ( !count( $nonErrorLoads ) ) {
  251. wfDebugLog( 'connect', "All servers down\n" );
  252. break;
  253. }
  254. # Some servers must have been overloaded
  255. if ( $overloadedServers == 0 ) {
  256. throw new MWException( __METHOD__.": unexpectedly found no overloaded servers" );
  257. }
  258. # Back off for a while
  259. # Scale the sleep time by the number of connected threads, to produce a
  260. # roughly constant global poll rate
  261. $avgThreads = $totalThreadsConnected / $overloadedServers;
  262. $totalElapsed += $this->sleep( $wgDBAvgStatusPoll * $avgThreads );
  263. } while ( $totalElapsed < $timeout );
  264. if ( $totalElapsed >= $timeout ) {
  265. wfDebugLog( 'connect', "All servers busy\n" );
  266. $this->mErrorConnection = false;
  267. $this->mLastError = 'All servers busy';
  268. }
  269. if ( $i !== false ) {
  270. # Slave connection successful
  271. # Wait for the session master pos for a short time
  272. if ( $this->mWaitForPos && $i > 0 ) {
  273. if ( !$this->doWait( $i ) ) {
  274. $this->mServers[$i]['slave pos'] = $conn->getSlavePos();
  275. }
  276. }
  277. if ( $this->mReadIndex <=0 && $this->mLoads[$i]>0 && $i !== false ) {
  278. $this->mReadIndex = $i;
  279. }
  280. }
  281. wfProfileOut( __METHOD__ );
  282. return $i;
  283. }
  284. /**
  285. * Wait for a specified number of microseconds, and return the period waited
  286. */
  287. function sleep( $t ) {
  288. wfProfileIn( __METHOD__ );
  289. wfDebug( __METHOD__.": waiting $t us\n" );
  290. usleep( $t );
  291. wfProfileOut( __METHOD__ );
  292. return $t;
  293. }
  294. /**
  295. * Get a random server to use in a query group
  296. * @deprecated use getReaderIndex
  297. */
  298. function getGroupIndex( $group ) {
  299. return $this->getReaderIndex( $group );
  300. }
  301. /**
  302. * Set the master wait position
  303. * If a DB_SLAVE connection has been opened already, waits
  304. * Otherwise sets a variable telling it to wait if such a connection is opened
  305. */
  306. public function waitFor( $pos ) {
  307. wfProfileIn( __METHOD__ );
  308. $this->mWaitForPos = $pos;
  309. $i = $this->mReadIndex;
  310. if ( $i > 0 ) {
  311. if ( !$this->doWait( $i ) ) {
  312. $this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos();
  313. $this->mLaggedSlaveMode = true;
  314. }
  315. }
  316. wfProfileOut( __METHOD__ );
  317. }
  318. /**
  319. * Get any open connection to a given server index, local or foreign
  320. * Returns false if there is no connection open
  321. */
  322. function getAnyOpenConnection( $i ) {
  323. foreach ( $this->mConns as $type => $conns ) {
  324. if ( !empty( $conns[$i] ) ) {
  325. return reset( $conns[$i] );
  326. }
  327. }
  328. return false;
  329. }
  330. /**
  331. * Wait for a given slave to catch up to the master pos stored in $this
  332. */
  333. function doWait( $index ) {
  334. # Find a connection to wait on
  335. $conn = $this->getAnyOpenConnection( $index );
  336. if ( !$conn ) {
  337. wfDebug( __METHOD__ . ": no connection open\n" );
  338. return false;
  339. }
  340. wfDebug( __METHOD__.": Waiting for slave #$index to catch up...\n" );
  341. $result = $conn->masterPosWait( $this->mWaitForPos, $this->mWaitTimeout );
  342. if ( $result == -1 || is_null( $result ) ) {
  343. # Timed out waiting for slave, use master instead
  344. wfDebug( __METHOD__.": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" );
  345. return false;
  346. } else {
  347. wfDebug( __METHOD__.": Done\n" );
  348. return true;
  349. }
  350. }
  351. /**
  352. * Get a connection by index
  353. * This is the main entry point for this class.
  354. * @param int $i Database
  355. * @param array $groups Query groups
  356. * @param string $wiki Wiki ID
  357. */
  358. public function &getConnection( $i, $groups = array(), $wiki = false ) {
  359. global $wgDBtype;
  360. wfProfileIn( __METHOD__ );
  361. if ( $i == DB_LAST ) {
  362. throw new MWException( 'Attempt to call ' . __METHOD__ . ' with deprecated server index DB_LAST' );
  363. } elseif ( $i === null || $i === false ) {
  364. throw new MWException( 'Attempt to call ' . __METHOD__ . ' with invalid server index' );
  365. }
  366. if ( $wiki === wfWikiID() ) {
  367. $wiki = false;
  368. }
  369. # Query groups
  370. if ( $i == DB_MASTER ) {
  371. $i = $this->getWriterIndex();
  372. } elseif ( !is_array( $groups ) ) {
  373. $groupIndex = $this->getReaderIndex( $groups, $wiki );
  374. if ( $groupIndex !== false ) {
  375. $serverName = $this->getServerName( $groupIndex );
  376. wfDebug( __METHOD__.": using server $serverName for group $groups\n" );
  377. $i = $groupIndex;
  378. }
  379. } else {
  380. foreach ( $groups as $group ) {
  381. $groupIndex = $this->getReaderIndex( $group, $wiki );
  382. if ( $groupIndex !== false ) {
  383. $serverName = $this->getServerName( $groupIndex );
  384. wfDebug( __METHOD__.": using server $serverName for group $group\n" );
  385. $i = $groupIndex;
  386. break;
  387. }
  388. }
  389. }
  390. # Operation-based index
  391. if ( $i == DB_SLAVE ) {
  392. $i = $this->getReaderIndex( false, $wiki );
  393. # Couldn't find a working server in getReaderIndex()?
  394. if ( $i === false ) {
  395. $this->mLastError = 'No working slave server: ' . $this->mLastError;
  396. $this->reportConnectionError( $this->mErrorConnection );
  397. return false;
  398. }
  399. }
  400. # Now we have an explicit index into the servers array
  401. $conn = $this->openConnection( $i, $wiki );
  402. if ( !$conn ) {
  403. $this->reportConnectionError( $this->mErrorConnection );
  404. }
  405. wfProfileOut( __METHOD__ );
  406. return $conn;
  407. }
  408. /**
  409. * Mark a foreign connection as being available for reuse under a different
  410. * DB name or prefix. This mechanism is reference-counted, and must be called
  411. * the same number of times as getConnection() to work.
  412. */
  413. public function reuseConnection( $conn ) {
  414. $serverIndex = $conn->getLBInfo('serverIndex');
  415. $refCount = $conn->getLBInfo('foreignPoolRefCount');
  416. $dbName = $conn->getDBname();
  417. $prefix = $conn->tablePrefix();
  418. if ( strval( $prefix ) !== '' ) {
  419. $wiki = "$dbName-$prefix";
  420. } else {
  421. $wiki = $dbName;
  422. }
  423. if ( $serverIndex === null || $refCount === null ) {
  424. wfDebug( __METHOD__.": this connection was not opened as a foreign connection\n" );
  425. /**
  426. * This can happen in code like:
  427. * foreach ( $dbs as $db ) {
  428. * $conn = $lb->getConnection( DB_SLAVE, array(), $db );
  429. * ...
  430. * $lb->reuseConnection( $conn );
  431. * }
  432. * When a connection to the local DB is opened in this way, reuseConnection()
  433. * should be ignored
  434. */
  435. return;
  436. }
  437. if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) {
  438. throw new MWException( __METHOD__.": connection not found, has the connection been freed already?" );
  439. }
  440. $conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
  441. if ( $refCount <= 0 ) {
  442. $this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
  443. unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
  444. wfDebug( __METHOD__.": freed connection $serverIndex/$wiki\n" );
  445. } else {
  446. wfDebug( __METHOD__.": reference count for $serverIndex/$wiki reduced to $refCount\n" );
  447. }
  448. }
  449. /**
  450. * Open a connection to the server given by the specified index
  451. * Index must be an actual index into the array.
  452. * If the server is already open, returns it.
  453. *
  454. * On error, returns false, and the connection which caused the
  455. * error will be available via $this->mErrorConnection.
  456. *
  457. * @param integer $i Server index
  458. * @param string $wiki Wiki ID to open
  459. * @return Database
  460. *
  461. * @access private
  462. */
  463. function openConnection( $i, $wiki = false ) {
  464. wfProfileIn( __METHOD__ );
  465. if ( $wiki !== false ) {
  466. $conn = $this->openForeignConnection( $i, $wiki );
  467. wfProfileOut( __METHOD__);
  468. return $conn;
  469. }
  470. if ( isset( $this->mConns['local'][$i][0] ) ) {
  471. $conn = $this->mConns['local'][$i][0];
  472. } else {
  473. $server = $this->mServers[$i];
  474. $server['serverIndex'] = $i;
  475. $conn = $this->reallyOpenConnection( $server, false );
  476. if ( $conn->isOpen() ) {
  477. $this->mConns['local'][$i][0] = $conn;
  478. } else {
  479. wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" );
  480. $this->mErrorConnection = $conn;
  481. $conn = false;
  482. }
  483. }
  484. wfProfileOut( __METHOD__ );
  485. return $conn;
  486. }
  487. /**
  488. * Open a connection to a foreign DB, or return one if it is already open.
  489. *
  490. * Increments a reference count on the returned connection which locks the
  491. * connection to the requested wiki. This reference count can be
  492. * decremented by calling reuseConnection().
  493. *
  494. * If a connection is open to the appropriate server already, but with the wrong
  495. * database, it will be switched to the right database and returned, as long as
  496. * it has been freed first with reuseConnection().
  497. *
  498. * On error, returns false, and the connection which caused the
  499. * error will be available via $this->mErrorConnection.
  500. *
  501. * @param integer $i Server index
  502. * @param string $wiki Wiki ID to open
  503. * @return Database
  504. */
  505. function openForeignConnection( $i, $wiki ) {
  506. wfProfileIn(__METHOD__);
  507. list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
  508. if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
  509. // Reuse an already-used connection
  510. $conn = $this->mConns['foreignUsed'][$i][$wiki];
  511. wfDebug( __METHOD__.": reusing connection $i/$wiki\n" );
  512. } elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
  513. // Reuse a free connection for the same wiki
  514. $conn = $this->mConns['foreignFree'][$i][$wiki];
  515. unset( $this->mConns['foreignFree'][$i][$wiki] );
  516. $this->mConns['foreignUsed'][$i][$wiki] = $conn;
  517. wfDebug( __METHOD__.": reusing free connection $i/$wiki\n" );
  518. } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
  519. // Reuse a connection from another wiki
  520. $conn = reset( $this->mConns['foreignFree'][$i] );
  521. $oldWiki = key( $this->mConns['foreignFree'][$i] );
  522. if ( !$conn->selectDB( $dbName ) ) {
  523. $this->mLastError = "Error selecting database $dbName on server " .
  524. $conn->getServer() . " from client host " . wfHostname() . "\n";
  525. $this->mErrorConnection = $conn;
  526. $conn = false;
  527. } else {
  528. $conn->tablePrefix( $prefix );
  529. unset( $this->mConns['foreignFree'][$i][$oldWiki] );
  530. $this->mConns['foreignUsed'][$i][$wiki] = $conn;
  531. wfDebug( __METHOD__.": reusing free connection from $oldWiki for $wiki\n" );
  532. }
  533. } else {
  534. // Open a new connection
  535. $server = $this->mServers[$i];
  536. $server['serverIndex'] = $i;
  537. $server['foreignPoolRefCount'] = 0;
  538. $conn = $this->reallyOpenConnection( $server, $dbName );
  539. if ( !$conn->isOpen() ) {
  540. wfDebug( __METHOD__.": error opening connection for $i/$wiki\n" );
  541. $this->mErrorConnection = $conn;
  542. $conn = false;
  543. } else {
  544. $conn->tablePrefix( $prefix );
  545. $this->mConns['foreignUsed'][$i][$wiki] = $conn;
  546. wfDebug( __METHOD__.": opened new connection for $i/$wiki\n" );
  547. }
  548. }
  549. // Increment reference count
  550. if ( $conn ) {
  551. $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
  552. $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
  553. }
  554. wfProfileOut(__METHOD__);
  555. return $conn;
  556. }
  557. /**
  558. * Test if the specified index represents an open connection
  559. * @access private
  560. */
  561. function isOpen( $index ) {
  562. if( !is_integer( $index ) ) {
  563. return false;
  564. }
  565. return (bool)$this->getAnyOpenConnection( $index );
  566. }
  567. /**
  568. * Really opens a connection. Uncached.
  569. * Returns a Database object whether or not the connection was successful.
  570. * @access private
  571. */
  572. function reallyOpenConnection( $server, $dbNameOverride = false ) {
  573. if( !is_array( $server ) ) {
  574. throw new MWException( 'You must update your load-balancing configuration. See DefaultSettings.php entry for $wgDBservers.' );
  575. }
  576. extract( $server );
  577. if ( $dbNameOverride !== false ) {
  578. $dbname = $dbNameOverride;
  579. }
  580. # Get class for this database type
  581. $class = 'Database' . ucfirst( $type );
  582. # Create object
  583. wfDebug( "Connecting to $host $dbname...\n" );
  584. $db = new $class( $host, $user, $password, $dbname, 1, $flags );
  585. if ( $db->isOpen() ) {
  586. wfDebug( "Connected\n" );
  587. } else {
  588. wfDebug( "Failed\n" );
  589. }
  590. $db->setLBInfo( $server );
  591. if ( isset( $server['fakeSlaveLag'] ) ) {
  592. $db->setFakeSlaveLag( $server['fakeSlaveLag'] );
  593. }
  594. if ( isset( $server['fakeMaster'] ) ) {
  595. $db->setFakeMaster( true );
  596. }
  597. return $db;
  598. }
  599. function reportConnectionError( &$conn ) {
  600. wfProfileIn( __METHOD__ );
  601. if ( !is_object( $conn ) ) {
  602. // No last connection, probably due to all servers being too busy
  603. wfLogDBError( "LB failure with no last connection\n" );
  604. $conn = new Database;
  605. if ( $this->mFailFunction ) {
  606. $conn->failFunction( $this->mFailFunction );
  607. $conn->reportConnectionError( $this->mLastError );
  608. } else {
  609. // If all servers were busy, mLastError will contain something sensible
  610. throw new DBConnectionError( $conn, $this->mLastError );
  611. }
  612. } else {
  613. if ( $this->mFailFunction ) {
  614. $conn->failFunction( $this->mFailFunction );
  615. } else {
  616. $conn->failFunction( false );
  617. }
  618. $server = $conn->getProperty( 'mServer' );
  619. wfLogDBError( "Connection error: {$this->mLastError} ({$server})\n" );
  620. $conn->reportConnectionError( "{$this->mLastError} ({$server})" );
  621. }
  622. wfProfileOut( __METHOD__ );
  623. }
  624. function getWriterIndex() {
  625. return 0;
  626. }
  627. /**
  628. * Returns true if the specified index is a valid server index
  629. */
  630. function haveIndex( $i ) {
  631. return array_key_exists( $i, $this->mServers );
  632. }
  633. /**
  634. * Returns true if the specified index is valid and has non-zero load
  635. */
  636. function isNonZeroLoad( $i ) {
  637. return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
  638. }
  639. /**
  640. * Get the number of defined servers (not the number of open connections)
  641. */
  642. function getServerCount() {
  643. return count( $this->mServers );
  644. }
  645. /**
  646. * Get the host name or IP address of the server with the specified index
  647. * Prefer a readable name if available.
  648. */
  649. function getServerName( $i ) {
  650. if ( isset( $this->mServers[$i]['hostName'] ) ) {
  651. return $this->mServers[$i]['hostName'];
  652. } elseif ( isset( $this->mServers[$i]['host'] ) ) {
  653. return $this->mServers[$i]['host'];
  654. } else {
  655. return '';
  656. }
  657. }
  658. /**
  659. * Return the server info structure for a given index, or false if the index is invalid.
  660. */
  661. function getServerInfo( $i ) {
  662. if ( isset( $this->mServers[$i] ) ) {
  663. return $this->mServers[$i];
  664. } else {
  665. return false;
  666. }
  667. }
  668. /**
  669. * Get the current master position for chronology control purposes
  670. * @return mixed
  671. */
  672. function getMasterPos() {
  673. # If this entire request was served from a slave without opening a connection to the
  674. # master (however unlikely that may be), then we can fetch the position from the slave.
  675. $masterConn = $this->getAnyOpenConnection( 0 );
  676. if ( !$masterConn ) {
  677. for ( $i = 1; $i < count( $this->mServers ); $i++ ) {
  678. $conn = $this->getAnyOpenConnection( $i );
  679. if ( $conn ) {
  680. wfDebug( "Master pos fetched from slave\n" );
  681. return $conn->getSlavePos();
  682. }
  683. }
  684. } else {
  685. wfDebug( "Master pos fetched from master\n" );
  686. return $masterConn->getMasterPos();
  687. }
  688. return false;
  689. }
  690. /**
  691. * Close all open connections
  692. */
  693. function closeAll() {
  694. foreach ( $this->mConns as $conns2 ) {
  695. foreach ( $conns2 as $conns3 ) {
  696. foreach ( $conns3 as $conn ) {
  697. $conn->close();
  698. }
  699. }
  700. }
  701. $this->mConns = array(
  702. 'local' => array(),
  703. 'foreignFree' => array(),
  704. 'foreignUsed' => array(),
  705. );
  706. }
  707. /**
  708. * Close a connection
  709. * Using this function makes sure the LoadBalancer knows the connection is closed.
  710. * If you use $conn->close() directly, the load balancer won't update its state.
  711. */
  712. function closeConnecton( $conn ) {
  713. $done = false;
  714. foreach ( $this->mConns as $i1 => $conns2 ) {
  715. foreach ( $conns2 as $i2 => $conns3 ) {
  716. foreach ( $conns3 as $i3 => $candidateConn ) {
  717. if ( $conn === $candidateConn ) {
  718. $conn->close();
  719. unset( $this->mConns[$i1][$i2][$i3] );
  720. $done = true;
  721. break;
  722. }
  723. }
  724. }
  725. }
  726. if ( !$done ) {
  727. $conn->close();
  728. }
  729. }
  730. /**
  731. * Commit transactions on all open connections
  732. */
  733. function commitAll() {
  734. foreach ( $this->mConns as $conns2 ) {
  735. foreach ( $conns2 as $conns3 ) {
  736. foreach ( $conns3 as $conn ) {
  737. $conn->immediateCommit();
  738. }
  739. }
  740. }
  741. }
  742. /* Issue COMMIT only on master, only if queries were done on connection */
  743. function commitMasterChanges() {
  744. // Always 0, but who knows.. :)
  745. $masterIndex = $this->getWriterIndex();
  746. foreach ( $this->mConns as $type => $conns2 ) {
  747. if ( empty( $conns2[$masterIndex] ) ) {
  748. continue;
  749. }
  750. foreach ( $conns2[$masterIndex] as $conn ) {
  751. if ( $conn->doneWrites() ) {
  752. $conn->commit();
  753. }
  754. }
  755. }
  756. }
  757. function waitTimeout( $value = NULL ) {
  758. return wfSetVar( $this->mWaitTimeout, $value );
  759. }
  760. function getLaggedSlaveMode() {
  761. return $this->mLaggedSlaveMode;
  762. }
  763. /* Disables/enables lag checks */
  764. function allowLagged($mode=null) {
  765. if ($mode===null)
  766. return $this->mAllowLagged;
  767. $this->mAllowLagged=$mode;
  768. }
  769. function pingAll() {
  770. $success = true;
  771. foreach ( $this->mConns as $conns2 ) {
  772. foreach ( $conns2 as $conns3 ) {
  773. foreach ( $conns3 as $conn ) {
  774. if ( !$conn->ping() ) {
  775. $success = false;
  776. }
  777. }
  778. }
  779. }
  780. return $success;
  781. }
  782. /**
  783. * Call a function with each open connection object
  784. */
  785. function forEachOpenConnection( $callback, $params = array() ) {
  786. foreach ( $this->mConns as $conns2 ) {
  787. foreach ( $conns2 as $conns3 ) {
  788. foreach ( $conns3 as $conn ) {
  789. $mergedParams = array_merge( array( $conn ), $params );
  790. call_user_func_array( $callback, $mergedParams );
  791. }
  792. }
  793. }
  794. }
  795. /**
  796. * Get the hostname and lag time of the most-lagged slave.
  797. * This is useful for maintenance scripts that need to throttle their updates.
  798. * May attempt to open connections to slaves on the default DB.
  799. */
  800. function getMaxLag() {
  801. $maxLag = -1;
  802. $host = '';
  803. foreach ( $this->mServers as $i => $conn ) {
  804. $conn = $this->getAnyOpenConnection( $i );
  805. if ( !$conn ) {
  806. $conn = $this->openConnection( $i );
  807. }
  808. if ( !$conn ) {
  809. continue;
  810. }
  811. $lag = $conn->getLag();
  812. if ( $lag > $maxLag ) {
  813. $maxLag = $lag;
  814. $host = $this->mServers[$i]['host'];
  815. }
  816. }
  817. return array( $host, $maxLag );
  818. }
  819. /**
  820. * Get lag time for each server
  821. * Results are cached for a short time in memcached, and indefinitely in the process cache
  822. */
  823. function getLagTimes( $wiki = false ) {
  824. # Try process cache
  825. if ( isset( $this->mLagTimes ) ) {
  826. return $this->mLagTimes;
  827. }
  828. # No, send the request to the load monitor
  829. $this->mLagTimes = $this->getLoadMonitor()->getLagTimes( array_keys( $this->mServers ), $wiki );
  830. return $this->mLagTimes;
  831. }
  832. }