iomaster.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. <?php
  2. /**
  3. * StatusNet, the distributed open-source microblogging tool
  4. *
  5. * I/O manager to wrap around socket-reading and polling queue & connection managers.
  6. *
  7. * PHP version 5
  8. *
  9. * LICENCE: This program is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU Affero General Public License as published by
  11. * the Free Software Foundation, either version 3 of the License, or
  12. * (at your option) any later version.
  13. *
  14. * This program is distributed in the hope that it will be useful,
  15. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. * GNU Affero General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU Affero General Public License
  20. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. *
  22. * @category QueueManager
  23. * @package StatusNet
  24. * @author Brion Vibber <brion@status.net>
  25. * @copyright 2009 StatusNet, Inc.
  26. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
  27. * @link http://status.net/
  28. */
  29. abstract class IoMaster
  30. {
  31. public $id;
  32. protected $multiSite = false;
  33. protected $managers = array();
  34. protected $singletons = array();
  35. protected $pollTimeouts = array();
  36. protected $lastPoll = array();
  37. public $shutdown = false; // Did we do a graceful shutdown?
  38. public $respawn = true; // Should we respawn after shutdown?
  39. /**
  40. * @param string $id process ID to use in logging/monitoring
  41. */
  42. public function __construct($id)
  43. {
  44. $this->id = $id;
  45. $this->monitor = new QueueMonitor();
  46. }
  47. public function init($multiSite=null)
  48. {
  49. if ($multiSite !== null) {
  50. $this->multiSite = $multiSite;
  51. }
  52. $this->initManagers();
  53. }
  54. /**
  55. * Initialize IoManagers which are appropriate to this instance;
  56. * pass class names or instances into $this->instantiate().
  57. *
  58. * If setup and configuration may vary between sites in multi-site
  59. * mode, it's the subclass's responsibility to set them up here.
  60. *
  61. * Switching site configurations is an acceptable side effect.
  62. */
  63. abstract function initManagers();
  64. /**
  65. * Instantiate an i/o manager class for the current site.
  66. * If a multi-site capable handler is already present,
  67. * we don't need to build a new one.
  68. *
  69. * @param mixed $manager class name (to run $class::get()) or object
  70. */
  71. protected function instantiate($manager)
  72. {
  73. if (is_string($manager)) {
  74. $manager = call_user_func(array($class, 'get'));
  75. }
  76. $caps = $manager->multiSite();
  77. if ($caps == IoManager::SINGLE_ONLY) {
  78. if ($this->multiSite) {
  79. throw new Exception("$class can't run with --all; aborting.");
  80. }
  81. } else if ($caps == IoManager::INSTANCE_PER_PROCESS) {
  82. $manager->addSite();
  83. }
  84. if (!in_array($manager, $this->managers, true)) {
  85. // Only need to save singletons once
  86. $this->managers[] = $manager;
  87. }
  88. }
  89. /**
  90. * Basic run loop...
  91. *
  92. * Initialize all io managers, then sit around waiting for input.
  93. * Between events or timeouts, pass control back to idle() method
  94. * to allow for any additional background processing.
  95. */
  96. function service()
  97. {
  98. $this->logState('init');
  99. $this->start();
  100. $this->checkMemory(false);
  101. while (!$this->shutdown) {
  102. $timeouts = array_values($this->pollTimeouts);
  103. $timeouts[] = 60; // default max timeout
  104. // Wait for something on one of our sockets
  105. $sockets = array();
  106. $managers = array();
  107. foreach ($this->managers as $manager) {
  108. foreach ($manager->getSockets() as $socket) {
  109. $sockets[] = $socket;
  110. $managers[] = $manager;
  111. }
  112. $timeouts[] = intval($manager->timeout());
  113. }
  114. $timeout = min($timeouts);
  115. if ($sockets) {
  116. $read = $sockets;
  117. $write = array();
  118. $except = array();
  119. $this->logState('listening');
  120. //common_debug("Waiting up to $timeout seconds for socket data...");
  121. $ready = stream_select($read, $write, $except, $timeout, 0);
  122. if ($ready === false) {
  123. common_log(LOG_ERR, "Error selecting on sockets");
  124. } else if ($ready > 0) {
  125. foreach ($read as $socket) {
  126. $index = array_search($socket, $sockets, true);
  127. if ($index !== false) {
  128. $this->logState('queue');
  129. $managers[$index]->handleInput($socket);
  130. } else {
  131. common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
  132. }
  133. }
  134. }
  135. }
  136. if ($timeout > 0 && empty($sockets)) {
  137. // If we had no listeners, sleep until the pollers' next requested wakeup.
  138. common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
  139. $this->logState('sleep');
  140. sleep($timeout);
  141. }
  142. $this->logState('poll');
  143. $this->poll();
  144. $this->logState('idle');
  145. $this->idle();
  146. $this->checkMemory();
  147. }
  148. $this->logState('shutdown');
  149. $this->finish();
  150. }
  151. /**
  152. * Check runtime memory usage, possibly triggering a graceful shutdown
  153. * and thread respawn if we've crossed the soft limit.
  154. *
  155. * @param boolean $respawn if false we'll shut down instead of respawning
  156. */
  157. protected function checkMemory($respawn=true)
  158. {
  159. $memoryLimit = $this->softMemoryLimit();
  160. if ($memoryLimit > 0) {
  161. $usage = memory_get_usage();
  162. if ($usage > $memoryLimit) {
  163. common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
  164. if ($respawn) {
  165. $this->requestRestart();
  166. } else {
  167. $this->requestShutdown();
  168. }
  169. } else if (common_config('queue', 'debug_memory')) {
  170. $fmt = number_format($usage);
  171. common_log(LOG_DEBUG, "Memory usage $fmt");
  172. }
  173. }
  174. }
  175. /**
  176. * Return fully-parsed soft memory limit in bytes.
  177. * @return intval 0 or -1 if not set
  178. */
  179. function softMemoryLimit()
  180. {
  181. $softLimit = trim(common_config('queue', 'softlimit'));
  182. if (substr($softLimit, -1) == '%') {
  183. $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
  184. if ($limit > 0) {
  185. return intval(substr($softLimit, 0, -1) * $limit / 100);
  186. } else {
  187. return -1;
  188. }
  189. } else {
  190. return $this->parseMemoryLimit($softLimit);
  191. }
  192. return $softLimit;
  193. }
  194. /**
  195. * Interpret PHP shorthand for memory_limit and friends.
  196. * Why don't they just expose the actual numeric value? :P
  197. * @param string $mem
  198. * @return int
  199. */
  200. public function parseMemoryLimit($mem)
  201. {
  202. // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
  203. $mem = strtolower(trim($mem));
  204. $size = array('k' => 1024,
  205. 'm' => 1024*1024,
  206. 'g' => 1024*1024*1024);
  207. if (empty($mem)) {
  208. return 0;
  209. } else if (is_numeric($mem)) {
  210. return intval($mem);
  211. } else {
  212. $mult = substr($mem, -1);
  213. if (isset($size[$mult])) {
  214. return substr($mem, 0, -1) * $size[$mult];
  215. } else {
  216. return intval($mem);
  217. }
  218. }
  219. }
  220. function start()
  221. {
  222. foreach ($this->managers as $index => $manager) {
  223. $manager->start($this);
  224. // @fixme error check
  225. if ($manager->pollInterval()) {
  226. // We'll want to check for input on the first pass
  227. $this->pollTimeouts[$index] = 0;
  228. $this->lastPoll[$index] = 0;
  229. }
  230. }
  231. }
  232. function finish()
  233. {
  234. foreach ($this->managers as $manager) {
  235. $manager->finish();
  236. // @fixme error check
  237. }
  238. }
  239. /**
  240. * Called during the idle portion of the runloop to see which handlers
  241. */
  242. function poll()
  243. {
  244. foreach ($this->managers as $index => $manager) {
  245. $interval = $manager->pollInterval();
  246. if ($interval <= 0) {
  247. // Not a polling manager.
  248. continue;
  249. }
  250. if (isset($this->pollTimeouts[$index])) {
  251. $timeout = $this->pollTimeouts[$index];
  252. if (time() - $this->lastPoll[$index] < $timeout) {
  253. // Not time to poll yet.
  254. continue;
  255. }
  256. } else {
  257. $timeout = 0;
  258. }
  259. $hit = $manager->poll();
  260. $this->lastPoll[$index] = time();
  261. if ($hit) {
  262. // Do the next poll quickly, there may be more input!
  263. $this->pollTimeouts[$index] = 0;
  264. } else {
  265. // Empty queue. Exponential backoff up to the maximum poll interval.
  266. if ($timeout > 0) {
  267. $timeout = min($timeout * 2, $interval);
  268. } else {
  269. $timeout = 1;
  270. }
  271. $this->pollTimeouts[$index] = $timeout;
  272. }
  273. }
  274. }
  275. /**
  276. * Called after each handled item or empty polling cycle.
  277. * This is a good time to e.g. service your XMPP connection.
  278. */
  279. function idle()
  280. {
  281. foreach ($this->managers as $manager) {
  282. $manager->idle();
  283. }
  284. }
  285. /**
  286. * Send thread state update to the monitoring server, if configured.
  287. *
  288. * @param string $state ('init', 'queue', 'shutdown' etc)
  289. * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
  290. */
  291. protected function logState($state, $substate='')
  292. {
  293. $this->monitor->logState($this->id, $state, $substate);
  294. }
  295. /**
  296. * Send thread stats.
  297. * Thread ID will be implicit; other owners can be listed as well
  298. * for per-queue and per-site records.
  299. *
  300. * @param string $key counter name
  301. * @param array $owners list of owner keys like 'queue:xmpp' or 'site:stat01'
  302. */
  303. public function stats($key, $owners=array())
  304. {
  305. $owners[] = "thread:" . $this->id;
  306. $this->monitor->stats($key, $owners);
  307. }
  308. /**
  309. * For IoManagers to request a graceful shutdown at end of event loop.
  310. */
  311. public function requestShutdown()
  312. {
  313. $this->shutdown = true;
  314. $this->respawn = false;
  315. }
  316. /**
  317. * For IoManagers to request a graceful restart at end of event loop.
  318. */
  319. public function requestRestart()
  320. {
  321. $this->shutdown = true;
  322. $this->respawn = true;
  323. }
  324. }