queuedaemon.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. #!/usr/bin/env php
  2. <?php
  3. /*
  4. * StatusNet - the distributed open-source microblogging tool
  5. * Copyright (C) 2008, 2009, StatusNet, Inc.
  6. *
  7. * This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU Affero General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU Affero General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Affero General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. */
// Absolute path of the installation root: the parent of this script's directory.
define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
// Option specifications for this script: -f/--foreground, -i/--id, -a/--all
// and -t/--threads. The id and threads options carry a value (see the
// get_option_value() calls at the bottom of the file).
// NOTE(review): presumably consumed by scripts/commandline.inc, which is
// required further down - confirm there.
$shortoptions = 'fi:at:';
$longoptions = array('id=', 'foreground', 'all', 'threads=');
  23. /**
  24. * Attempts to get a count of the processors available on the current system
  25. * to fan out multiple threads.
  26. *
  27. * Recognizes Linux and Mac OS X; others will return default of 1.
  28. *
  29. * @fixme move this to SpawningDaemon, but to get the default val for help
  30. * text we seem to need it before loading infrastructure
  31. * @return intval
  32. */
  33. function getProcessorCount()
  34. {
  35. $cpus = 0;
  36. switch (PHP_OS) {
  37. case 'Linux':
  38. $cpuinfo = file('/proc/cpuinfo');
  39. foreach (file('/proc/cpuinfo') as $line) {
  40. if (preg_match('/^processor\s+:\s+(\d+)\s?$/', $line)) {
  41. $cpus++;
  42. }
  43. }
  44. break;
  45. case 'Darwin':
  46. $cpus = intval(shell_exec("/usr/sbin/sysctl -n hw.ncpu 2>/dev/null"));
  47. break;
  48. }
  49. if ($cpus) {
  50. return $cpus;
  51. }
  52. return 1;
  53. }
// Detected here, ahead of the require_once lines below, so that the help
// text can show the processor count as the default for -t/--threads.
$threads = getProcessorCount();
// Usage text for this script; $threads is interpolated into the -t line.
// NOTE(review): presumably printed by scripts/commandline.inc - confirm there.
$helptext = <<<END_OF_QUEUE_HELP
Daemon script for running queued items.
-i --id Identity (default none)
-f --foreground Stay in the foreground (default background)
-a --all Handle queues for all local sites
(requires Stomp queue handler, status_network setup)
-t --threads=<n> Spawn <n> processing threads (default $threads)
END_OF_QUEUE_HELP;
  63. require_once INSTALLDIR.'/scripts/commandline.inc';
  64. require_once(INSTALLDIR.'/lib/daemon.php');
  65. require_once(INSTALLDIR.'/classes/Queue_item.php');
  66. require_once(INSTALLDIR.'/classes/Notice.php');
  67. /**
  68. * Queue handling daemon...
  69. *
  70. * The queue daemon by default launches in the background, at which point
  71. * it'll pass control to the configured QueueManager class to poll for updates.
  72. *
  73. * We can then pass individual items through the QueueHandler subclasses
  74. * they belong to.
  75. */
  76. class QueueDaemon extends SpawningDaemon
  77. {
  78. protected $allsites = false;
  79. function __construct($id=null, $daemonize=true, $threads=1, $allsites=false)
  80. {
  81. parent::__construct($id, $daemonize, $threads);
  82. $this->allsites = $allsites;
  83. }
  84. /**
  85. * Setup and start of run loop for this queue handler as a daemon.
  86. * Most of the heavy lifting is passed on to the QueueManager's service()
  87. * method, which passes control on to the QueueHandler's handle()
  88. * method for each item that comes in on the queue.
  89. *
  90. * @return boolean true on success, false on failure
  91. */
  92. function runThread()
  93. {
  94. $this->log(LOG_INFO, 'checking for queued notices');
  95. $master = new QueueMaster($this->get_id(), $this->processManager());
  96. $master->init($this->allsites);
  97. try {
  98. $master->service();
  99. } catch (Exception $e) {
  100. common_log(LOG_ERR, "Unhandled exception: " . $e->getMessage() . ' ' .
  101. str_replace("\n", " ", $e->getTraceAsString()));
  102. return self::EXIT_ERR;
  103. }
  104. $this->log(LOG_INFO, 'finished servicing the queue');
  105. $this->log(LOG_INFO, 'terminating normally');
  106. return $master->respawn ? self::EXIT_RESTART : self::EXIT_SHUTDOWN;
  107. }
  108. }
  109. class QueueMaster extends IoMaster
  110. {
  111. protected $processManager;
  112. function __construct($id, $processManager)
  113. {
  114. parent::__construct($id);
  115. $this->processManager = $processManager;
  116. }
  117. /**
  118. * Initialize IoManagers which are appropriate to this instance.
  119. */
  120. function initManagers()
  121. {
  122. $managers = array();
  123. if (Event::handle('StartQueueDaemonIoManagers', array(&$managers))) {
  124. $qm = QueueManager::get();
  125. $qm->setActiveGroup('main');
  126. $managers[] = $qm;
  127. $managers[] = $this->processManager;
  128. }
  129. Event::handle('EndQueueDaemonIoManagers', array(&$managers));
  130. foreach ($managers as $manager) {
  131. $this->instantiate($manager);
  132. }
  133. }
  134. }
  135. if (have_option('i')) {
  136. $id = get_option_value('i');
  137. } else if (have_option('--id')) {
  138. $id = get_option_value('--id');
  139. } else if (count($args) > 0) {
  140. $id = $args[0];
  141. } else {
  142. $id = null;
  143. }
  144. if (have_option('t')) {
  145. $threads = intval(get_option_value('t'));
  146. } else if (have_option('--threads')) {
  147. $threads = intval(get_option_value('--threads'));
  148. } else {
  149. //If there is no argument for number of threads
  150. //Try reading a config option for the number
  151. $threads = common_config('queue','threads');
  152. }
  153. if (!$threads) {
  154. $threads = getProcessorCount();
  155. }
  156. common_log(LOG_INFO, sprintf('Launching QueueDaemon background process with %1$d threads.', $threads));
  157. $daemonize = !(have_option('f') || have_option('--foreground'));
  158. $all = have_option('a') || have_option('--all');
  159. $daemon = new QueueDaemon($id, $daemonize, $threads, $all);
  160. $daemon->runOnce();