iomaster.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. <?php
  2. // This file is part of GNU social - https://www.gnu.org/software/social
  3. //
  4. // GNU social is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // GNU social is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with GNU social. If not, see <http://www.gnu.org/licenses/>.
  16. /**
  17. * I/O manager to wrap around socket-reading and polling queue & connection managers.
  18. *
  19. * @category QueueManager
  20. * @package GNUsocial
  21. * @author Brion Vibber <brion@status.net>
  22. * @copyright 2009 StatusNet, Inc.
  23. * @license https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
  24. */
  25. defined('GNUSOCIAL') || die();
  26. abstract class IoMaster
  27. {
  28. public $id;
  29. protected $multiSite = false;
  30. protected $managers = array();
  31. protected $singletons = array();
  32. protected $pollTimeouts = array();
  33. protected $lastPoll = array();
  34. public $shutdown = false; // Did we do a graceful shutdown?
  35. public $respawn = true; // Should we respawn after shutdown?
  36. /**
  37. * @param string $id process ID to use in logging/monitoring
  38. */
  39. public function __construct($id)
  40. {
  41. $this->id = $id;
  42. $this->monitor = new QueueMonitor();
  43. }
  44. public function init($multiSite=null)
  45. {
  46. if ($multiSite !== null) {
  47. $this->multiSite = $multiSite;
  48. }
  49. $this->initManagers();
  50. }
  51. /**
  52. * Initialize IoManagers which are appropriate to this instance;
  53. * pass class names or instances into $this->instantiate().
  54. *
  55. * If setup and configuration may vary between sites in multi-site
  56. * mode, it's the subclass's responsibility to set them up here.
  57. *
  58. * Switching site configurations is an acceptable side effect.
  59. */
  60. abstract public function initManagers();
  61. /**
  62. * Instantiate an i/o manager class for the current site.
  63. * If a multi-site capable handler is already present,
  64. * we don't need to build a new one.
  65. *
  66. * @param mixed $manager class name (to run $class::get()) or object
  67. */
  68. protected function instantiate($manager)
  69. {
  70. if (is_string($manager)) {
  71. $manager = call_user_func(array($class, 'get'));
  72. }
  73. $caps = $manager->multiSite();
  74. if ($caps == IoManager::SINGLE_ONLY) {
  75. if ($this->multiSite) {
  76. throw new Exception("$class can't run with --all; aborting.");
  77. }
  78. } elseif ($caps == IoManager::INSTANCE_PER_PROCESS) {
  79. $manager->addSite();
  80. }
  81. if (!in_array($manager, $this->managers, true)) {
  82. // Only need to save singletons once
  83. $this->managers[] = $manager;
  84. }
  85. }
  86. /**
  87. * Basic run loop...
  88. *
  89. * Initialize all io managers, then sit around waiting for input.
  90. * Between events or timeouts, pass control back to idle() method
  91. * to allow for any additional background processing.
  92. */
  93. public function service()
  94. {
  95. $this->logState('init');
  96. $this->start();
  97. $this->checkMemory(false);
  98. while (!$this->shutdown) {
  99. $timeouts = array_values($this->pollTimeouts);
  100. $timeouts[] = 60; // default max timeout
  101. // Wait for something on one of our sockets
  102. $sockets = array();
  103. $managers = array();
  104. foreach ($this->managers as $manager) {
  105. foreach ($manager->getSockets() as $socket) {
  106. $sockets[] = $socket;
  107. $managers[] = $manager;
  108. }
  109. $timeouts[] = intval($manager->timeout());
  110. }
  111. $timeout = min($timeouts);
  112. if ($sockets) {
  113. $read = $sockets;
  114. $write = array();
  115. $except = array();
  116. $this->logState('listening');
  117. //common_debug("Waiting up to $timeout seconds for socket data...");
  118. $ready = stream_select($read, $write, $except, $timeout, 0);
  119. if ($ready === false) {
  120. common_log(LOG_ERR, "Error selecting on sockets");
  121. } elseif ($ready > 0) {
  122. foreach ($read as $socket) {
  123. $index = array_search($socket, $sockets, true);
  124. if ($index !== false) {
  125. $this->logState('queue');
  126. $managers[$index]->handleInput($socket);
  127. } else {
  128. common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
  129. }
  130. }
  131. }
  132. }
  133. if ($timeout > 0 && empty($sockets)) {
  134. // If we had no listeners, sleep until the pollers' next requested wakeup.
  135. common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
  136. $this->logState('sleep');
  137. sleep($timeout);
  138. }
  139. $this->logState('poll');
  140. $this->poll();
  141. $this->logState('idle');
  142. $this->idle();
  143. $this->checkMemory();
  144. }
  145. $this->logState('shutdown');
  146. $this->finish();
  147. }
  148. /**
  149. * Check runtime memory usage, possibly triggering a graceful shutdown
  150. * and thread respawn if we've crossed the soft limit.
  151. *
  152. * @param boolean $respawn if false we'll shut down instead of respawning
  153. */
  154. protected function checkMemory($respawn=true)
  155. {
  156. $memoryLimit = $this->softMemoryLimit();
  157. if ($memoryLimit > 0) {
  158. $usage = memory_get_usage();
  159. if ($usage > $memoryLimit) {
  160. common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
  161. if ($respawn) {
  162. $this->requestRestart();
  163. } else {
  164. $this->requestShutdown();
  165. }
  166. } elseif (common_config('queue', 'debug_memory')) {
  167. $fmt = number_format($usage);
  168. common_log(LOG_DEBUG, "Memory usage $fmt");
  169. }
  170. }
  171. }
  172. /**
  173. * Return fully-parsed soft memory limit in bytes.
  174. * @return intval 0 or -1 if not set
  175. */
  176. public function softMemoryLimit()
  177. {
  178. $softLimit = trim(common_config('queue', 'softlimit'));
  179. if (substr($softLimit, -1) == '%') {
  180. $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
  181. if ($limit > 0) {
  182. return intval(substr($softLimit, 0, -1) * $limit / 100);
  183. } else {
  184. return -1;
  185. }
  186. } else {
  187. return $this->parseMemoryLimit($softLimit);
  188. }
  189. return $softLimit;
  190. }
  191. /**
  192. * Interpret PHP shorthand for memory_limit and friends.
  193. * Why don't they just expose the actual numeric value? :P
  194. * @param string $mem
  195. * @return int
  196. */
  197. public function parseMemoryLimit($mem)
  198. {
  199. // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
  200. $mem = strtolower(trim($mem));
  201. $size = array('k' => 1024,
  202. 'm' => 1024*1024,
  203. 'g' => 1024*1024*1024);
  204. if (empty($mem)) {
  205. return 0;
  206. } elseif (is_numeric($mem)) {
  207. return intval($mem);
  208. } else {
  209. $mult = substr($mem, -1);
  210. if (isset($size[$mult])) {
  211. return substr($mem, 0, -1) * $size[$mult];
  212. } else {
  213. return intval($mem);
  214. }
  215. }
  216. }
  217. public function start()
  218. {
  219. foreach ($this->managers as $index => $manager) {
  220. $manager->start($this);
  221. // @fixme error check
  222. if ($manager->pollInterval()) {
  223. // We'll want to check for input on the first pass
  224. $this->pollTimeouts[$index] = 0;
  225. $this->lastPoll[$index] = 0;
  226. }
  227. }
  228. }
  229. public function finish()
  230. {
  231. foreach ($this->managers as $manager) {
  232. $manager->finish();
  233. // @fixme error check
  234. }
  235. }
  236. /**
  237. * Called during the idle portion of the runloop to see which handlers
  238. */
  239. public function poll()
  240. {
  241. foreach ($this->managers as $index => $manager) {
  242. $interval = $manager->pollInterval();
  243. if ($interval <= 0) {
  244. // Not a polling manager.
  245. continue;
  246. }
  247. if (isset($this->pollTimeouts[$index])) {
  248. $timeout = $this->pollTimeouts[$index];
  249. if (hrtime(true) - $this->lastPoll[$index] < $timeout * 1000000000) {
  250. // Not time to poll yet.
  251. continue;
  252. }
  253. } else {
  254. $timeout = 0;
  255. }
  256. $hit = $manager->poll();
  257. $this->lastPoll[$index] = hrtime(true);
  258. if ($hit) {
  259. // Do the next poll quickly, there may be more input!
  260. $this->pollTimeouts[$index] = 0;
  261. } else {
  262. // Empty queue. Exponential backoff up to the maximum poll interval.
  263. if ($timeout > 0) {
  264. $timeout = min($timeout * 2, $interval);
  265. } else {
  266. $timeout = 1;
  267. }
  268. $this->pollTimeouts[$index] = $timeout;
  269. }
  270. }
  271. }
  272. /**
  273. * Called after each handled item or empty polling cycle.
  274. * This is a good time to e.g. service your XMPP connection.
  275. */
  276. public function idle()
  277. {
  278. foreach ($this->managers as $manager) {
  279. $manager->idle();
  280. }
  281. }
  282. /**
  283. * Send thread state update to the monitoring server, if configured.
  284. *
  285. * @param string $state ('init', 'queue', 'shutdown' etc)
  286. * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
  287. */
  288. protected function logState($state, $substate='')
  289. {
  290. $this->monitor->logState($this->id, $state, $substate);
  291. }
  292. /**
  293. * Send thread stats.
  294. * Thread ID will be implicit; other owners can be listed as well
  295. * for per-queue and per-site records.
  296. *
  297. * @param string $key counter name
  298. * @param array $owners list of owner keys like 'queue:xmpp' or 'site:stat01'
  299. */
  300. public function stats($key, $owners=array())
  301. {
  302. $owners[] = "thread:" . $this->id;
  303. $this->monitor->stats($key, $owners);
  304. }
  305. /**
  306. * For IoManagers to request a graceful shutdown at end of event loop.
  307. */
  308. public function requestShutdown()
  309. {
  310. $this->shutdown = true;
  311. $this->respawn = false;
  312. }
  313. /**
  314. * For IoManagers to request a graceful restart at end of event loop.
  315. */
  316. public function requestRestart()
  317. {
  318. $this->shutdown = true;
  319. $this->respawn = true;
  320. }
  321. }