123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- <?php
- /**
- * StatusNet, the distributed open-source microblogging tool
- *
- * I/O manager to wrap around socket-reading and polling queue & connection managers.
- *
- * PHP version 5
- *
- * LICENCE: This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * @category QueueManager
- * @package StatusNet
- * @author Brion Vibber <brion@status.net>
- * @copyright 2009 StatusNet, Inc.
- * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
- * @link http://status.net/
- */
- abstract class IoMaster
- {
- public $id;
- protected $multiSite = false;
- protected $managers = array();
- protected $singletons = array();
- protected $pollTimeouts = array();
- protected $lastPoll = array();
- public $shutdown = false; // Did we do a graceful shutdown?
- public $respawn = true; // Should we respawn after shutdown?
- /**
- * @param string $id process ID to use in logging/monitoring
- */
- public function __construct($id)
- {
- $this->id = $id;
- $this->monitor = new QueueMonitor();
- }
- public function init($multiSite=null)
- {
- if ($multiSite !== null) {
- $this->multiSite = $multiSite;
- }
- $this->initManagers();
- }
- /**
- * Initialize IoManagers which are appropriate to this instance;
- * pass class names or instances into $this->instantiate().
- *
- * If setup and configuration may vary between sites in multi-site
- * mode, it's the subclass's responsibility to set them up here.
- *
- * Switching site configurations is an acceptable side effect.
- */
- abstract function initManagers();
- /**
- * Instantiate an i/o manager class for the current site.
- * If a multi-site capable handler is already present,
- * we don't need to build a new one.
- *
- * @param mixed $manager class name (to run $class::get()) or object
- */
- protected function instantiate($manager)
- {
- if (is_string($manager)) {
- $manager = call_user_func(array($class, 'get'));
- }
- $caps = $manager->multiSite();
- if ($caps == IoManager::SINGLE_ONLY) {
- if ($this->multiSite) {
- throw new Exception("$class can't run with --all; aborting.");
- }
- } else if ($caps == IoManager::INSTANCE_PER_PROCESS) {
- $manager->addSite();
- }
- if (!in_array($manager, $this->managers, true)) {
- // Only need to save singletons once
- $this->managers[] = $manager;
- }
- }
- /**
- * Basic run loop...
- *
- * Initialize all io managers, then sit around waiting for input.
- * Between events or timeouts, pass control back to idle() method
- * to allow for any additional background processing.
- */
- function service()
- {
- $this->logState('init');
- $this->start();
- $this->checkMemory(false);
- while (!$this->shutdown) {
- $timeouts = array_values($this->pollTimeouts);
- $timeouts[] = 60; // default max timeout
- // Wait for something on one of our sockets
- $sockets = array();
- $managers = array();
- foreach ($this->managers as $manager) {
- foreach ($manager->getSockets() as $socket) {
- $sockets[] = $socket;
- $managers[] = $manager;
- }
- $timeouts[] = intval($manager->timeout());
- }
- $timeout = min($timeouts);
- if ($sockets) {
- $read = $sockets;
- $write = array();
- $except = array();
- $this->logState('listening');
- //common_debug("Waiting up to $timeout seconds for socket data...");
- $ready = stream_select($read, $write, $except, $timeout, 0);
- if ($ready === false) {
- common_log(LOG_ERR, "Error selecting on sockets");
- } else if ($ready > 0) {
- foreach ($read as $socket) {
- $index = array_search($socket, $sockets, true);
- if ($index !== false) {
- $this->logState('queue');
- $managers[$index]->handleInput($socket);
- } else {
- common_log(LOG_ERR, "Saw input on a socket we didn't listen to");
- }
- }
- }
- }
- if ($timeout > 0 && empty($sockets)) {
- // If we had no listeners, sleep until the pollers' next requested wakeup.
- common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
- $this->logState('sleep');
- sleep($timeout);
- }
- $this->logState('poll');
- $this->poll();
- $this->logState('idle');
- $this->idle();
- $this->checkMemory();
- }
- $this->logState('shutdown');
- $this->finish();
- }
- /**
- * Check runtime memory usage, possibly triggering a graceful shutdown
- * and thread respawn if we've crossed the soft limit.
- *
- * @param boolean $respawn if false we'll shut down instead of respawning
- */
- protected function checkMemory($respawn=true)
- {
- $memoryLimit = $this->softMemoryLimit();
- if ($memoryLimit > 0) {
- $usage = memory_get_usage();
- if ($usage > $memoryLimit) {
- common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
- if ($respawn) {
- $this->requestRestart();
- } else {
- $this->requestShutdown();
- }
- } else if (common_config('queue', 'debug_memory')) {
- $fmt = number_format($usage);
- common_log(LOG_DEBUG, "Memory usage $fmt");
- }
- }
- }
- /**
- * Return fully-parsed soft memory limit in bytes.
- * @return intval 0 or -1 if not set
- */
- function softMemoryLimit()
- {
- $softLimit = trim(common_config('queue', 'softlimit'));
- if (substr($softLimit, -1) == '%') {
- $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
- if ($limit > 0) {
- return intval(substr($softLimit, 0, -1) * $limit / 100);
- } else {
- return -1;
- }
- } else {
- return $this->parseMemoryLimit($softLimit);
- }
- return $softLimit;
- }
- /**
- * Interpret PHP shorthand for memory_limit and friends.
- * Why don't they just expose the actual numeric value? :P
- * @param string $mem
- * @return int
- */
- public function parseMemoryLimit($mem)
- {
- // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
- $mem = strtolower(trim($mem));
- $size = array('k' => 1024,
- 'm' => 1024*1024,
- 'g' => 1024*1024*1024);
- if (empty($mem)) {
- return 0;
- } else if (is_numeric($mem)) {
- return intval($mem);
- } else {
- $mult = substr($mem, -1);
- if (isset($size[$mult])) {
- return substr($mem, 0, -1) * $size[$mult];
- } else {
- return intval($mem);
- }
- }
- }
- function start()
- {
- foreach ($this->managers as $index => $manager) {
- $manager->start($this);
- // @fixme error check
- if ($manager->pollInterval()) {
- // We'll want to check for input on the first pass
- $this->pollTimeouts[$index] = 0;
- $this->lastPoll[$index] = 0;
- }
- }
- }
- function finish()
- {
- foreach ($this->managers as $manager) {
- $manager->finish();
- // @fixme error check
- }
- }
- /**
- * Called during the idle portion of the runloop to see which handlers
- */
- function poll()
- {
- foreach ($this->managers as $index => $manager) {
- $interval = $manager->pollInterval();
- if ($interval <= 0) {
- // Not a polling manager.
- continue;
- }
- if (isset($this->pollTimeouts[$index])) {
- $timeout = $this->pollTimeouts[$index];
- if (time() - $this->lastPoll[$index] < $timeout) {
- // Not time to poll yet.
- continue;
- }
- } else {
- $timeout = 0;
- }
- $hit = $manager->poll();
- $this->lastPoll[$index] = time();
- if ($hit) {
- // Do the next poll quickly, there may be more input!
- $this->pollTimeouts[$index] = 0;
- } else {
- // Empty queue. Exponential backoff up to the maximum poll interval.
- if ($timeout > 0) {
- $timeout = min($timeout * 2, $interval);
- } else {
- $timeout = 1;
- }
- $this->pollTimeouts[$index] = $timeout;
- }
- }
- }
- /**
- * Called after each handled item or empty polling cycle.
- * This is a good time to e.g. service your XMPP connection.
- */
- function idle()
- {
- foreach ($this->managers as $manager) {
- $manager->idle();
- }
- }
- /**
- * Send thread state update to the monitoring server, if configured.
- *
- * @param string $state ('init', 'queue', 'shutdown' etc)
- * @param string $substate (optional, eg queue name 'omb' 'sms' etc)
- */
- protected function logState($state, $substate='')
- {
- $this->monitor->logState($this->id, $state, $substate);
- }
- /**
- * Send thread stats.
- * Thread ID will be implicit; other owners can be listed as well
- * for per-queue and per-site records.
- *
- * @param string $key counter name
- * @param array $owners list of owner keys like 'queue:xmpp' or 'site:stat01'
- */
- public function stats($key, $owners=array())
- {
- $owners[] = "thread:" . $this->id;
- $this->monitor->stats($key, $owners);
- }
- /**
- * For IoManagers to request a graceful shutdown at end of event loop.
- */
- public function requestShutdown()
- {
- $this->shutdown = true;
- $this->respawn = false;
- }
- /**
- * For IoManagers to request a graceful restart at end of event loop.
- */
- public function requestRestart()
- {
- $this->shutdown = true;
- $this->respawn = true;
- }
- }
|