queuedaemon.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. #!/usr/bin/env php
  2. <?php
  3. /*
  4. * StatusNet - the distributed open-source microblogging tool
  5. * Copyright (C) 2008, 2009, StatusNet, Inc.
  6. *
  7. * This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU Affero General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU Affero General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Affero General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. */
  // Installation root: the parent of the directory that contains this script.
  define('INSTALLDIR', dirname(__DIR__));

  // The "public" directory sitting directly under the installation root.
  define('PUBLICDIR', INSTALLDIR . DIRECTORY_SEPARATOR . 'public');

  // Command-line option specifications for -i/--id, -f/--foreground,
  // -a/--all and -t/--threads.
  // NOTE(review): the trailing ':' / '=' presumably marks options that take a
  // value, and these variables are presumably read by scripts/commandline.inc
  // once it is loaded -- confirm against that file.
  $longoptions = array(
      'id=',
      'foreground',
      'all',
      'threads=',
  );
  $shortoptions = 'fi:at:';
  /**
   * Attempts to count the processors available on the current system, so the
   * daemon can fan out multiple threads.
   *
   * Linux:  counts the "processor : <n>" lines found in /proc/cpuinfo.
   * Darwin: asks /usr/sbin/sysctl for hw.ncpu.
   * Any other platform, or any failure to detect a positive count, yields
   * the default of 1.
   *
   * @fixme move this to SpawningDaemon, but to get the default val for help
   *        text we seem to need it before loading infrastructure
   *
   * @return int number of processors detected; never less than 1
   */
  function getProcessorCount()
  {
      $cpus = 0;

      switch (PHP_OS) {
      case 'Linux':
          // Read the file a single time. file() returns false when the file
          // cannot be read, so check before iterating to avoid warnings on
          // systems where /proc/cpuinfo is unavailable.
          $lines = is_readable('/proc/cpuinfo') ? file('/proc/cpuinfo') : false;
          if ($lines !== false) {
              foreach ($lines as $line) {
                  if (preg_match('/^processor\s+:\s+(\d+)\s?$/', $line)) {
                      $cpus++;
                  }
              }
          }
          break;
      case 'Darwin':
          // stderr is discarded; an empty or missing result becomes 0 via intval()
          $cpus = intval(shell_exec("/usr/sbin/sysctl -n hw.ncpu 2>/dev/null"));
          break;
      }

      // Nothing detected (or an unsupported platform): fall back to one.
      return ($cpus > 0) ? $cpus : 1;
  }
  // Default thread count advertised in the help text: the detected number of
  // processors.
  $threads = getProcessorCount();

  // Usage text for this script; the default thread count computed above is
  // interpolated into the last line.
  $helptext = <<<END_OF_QUEUE_HELP
Daemon script for running queued items.
-i --id Identity (default none)
-f --foreground Stay in the foreground (default background)
-a --all Handle queues for all local sites
(requires Stomp queue handler, status_network setup)
-t --threads=<n> Spawn <n> processing threads (default {$threads})
END_OF_QUEUE_HELP;
  64. require_once INSTALLDIR.'/scripts/commandline.inc';
  65. require_once(INSTALLDIR.'/lib/daemon.php');
  66. require_once(INSTALLDIR.'/classes/Queue_item.php');
  67. require_once(INSTALLDIR.'/classes/Notice.php');
  /**
   * Queue handling daemon.
   *
   * By default the daemon launches in the background and then hands control
   * to the configured QueueManager class to poll for updates. Individual
   * items are then passed through the QueueHandler subclasses they belong to.
   */
  class QueueDaemon extends SpawningDaemon
  {
      /**
       * Flag handed to QueueMaster::init() by runThread().
       *
       * @var boolean
       */
      protected $allsites = false;

      /**
       * Builds the daemon.
       *
       * @param mixed   $id        forwarded unchanged to the parent constructor
       * @param boolean $daemonize forwarded unchanged to the parent constructor
       * @param int     $threads   forwarded unchanged to the parent constructor
       * @param boolean $allsites  stored and later passed to QueueMaster::init()
       */
      public function __construct($id=null, $daemonize=true, $threads=1, $allsites=false)
      {
          parent::__construct($id, $daemonize, $threads);
          $this->allsites = $allsites;
      }

      /**
       * Setup and start of the run loop for this queue handler as a daemon.
       *
       * Creates a QueueMaster, initializes it and calls its service() method.
       * Per the original notes, most of the heavy lifting happens in the
       * QueueManager's service() method, which passes control on to the
       * QueueHandler's handle() method for each item that comes in on the
       * queue.
       *
       * @return mixed self::EXIT_ERR when service() throws an Exception;
       *               otherwise self::EXIT_RESTART if the master's respawn
       *               property is truthy, else self::EXIT_SHUTDOWN
       */
      public function runThread()
      {
          $this->log(LOG_INFO, 'checking for queued notices');

          $master = new QueueMaster($this->get_id(), $this->processManager());
          // Note: init() runs outside the try block, so its exceptions propagate.
          $master->init($this->allsites);

          try {
              $master->service();
          } catch (Exception $e) {
              // Flatten the stack trace onto one line so it fits a single log entry.
              $flatTrace = str_replace("\n", " ", $e->getTraceAsString());
              common_log(LOG_ERR, "Unhandled exception: " . $e->getMessage() . ' ' . $flatTrace);
              return self::EXIT_ERR;
          }

          $this->log(LOG_INFO, 'finished servicing the queue');
          $this->log(LOG_INFO, 'terminating normally');

          if ($master->respawn) {
              return self::EXIT_RESTART;
          }
          return self::EXIT_SHUTDOWN;
      }
  }
  /**
   * IoMaster used by the queue daemon: registers the queue manager and the
   * daemon-supplied process manager as its IoManagers.
   */
  class QueueMaster extends IoMaster
  {
      /**
       * Manager object handed in by the daemon; registered in initManagers().
       */
      protected $processManager;

      /**
       * @param mixed $id             forwarded unchanged to the parent constructor
       * @param mixed $processManager manager to register next to the queue manager
       */
      public function __construct($id, $processManager)
      {
          parent::__construct($id);
          $this->processManager = $processManager;
      }

      /**
       * Initialize IoManagers which are appropriate to this instance.
       *
       * Fires 'StartQueueDaemonIoManagers' with the (initially empty) manager
       * list passed by reference. When that call returns a truthy value, the
       * queue manager (with its active group set to 'main') and the process
       * manager are appended. 'EndQueueDaemonIoManagers' is then fired with the
       * same list, and every entry left in it is passed to instantiate().
       */
      public function initManagers()
      {
          $managers = array();

          $addDefaults = Event::handle('StartQueueDaemonIoManagers', array(&$managers));
          if ($addDefaults) {
              $queueManager = QueueManager::get();
              $queueManager->setActiveGroup('main');

              $managers[] = $queueManager;
              $managers[] = $this->processManager;
          }

          Event::handle('EndQueueDaemonIoManagers', array(&$managers));

          foreach ($managers as $ioManager) {
              $this->instantiate($ioManager);
          }
      }
  }
  // Resolve the daemon identity: -i wins, then --id, then the first
  // positional argument; otherwise there is no identity.
  if (have_option('i')) {
      $id = get_option_value('i');
  } else if (have_option('--id')) {
      $id = get_option_value('--id');
  } else if (count($args) > 0) {
      $id = $args[0];
  } else {
      $id = null;
  }

  // Resolve the thread count: -t wins, then --threads, then the
  // 'queue' / 'threads' config option. Every source is coerced to an integer
  // so the validity check below treats them all alike.
  if (have_option('t')) {
      $threads = intval(get_option_value('t'));
  } else if (have_option('--threads')) {
      $threads = intval(get_option_value('--threads'));
  } else {
      // If there is no argument for number of threads,
      // try reading a config option for the number.
      $threads = intval(common_config('queue','threads'));
  }

  // A missing, zero, negative or non-numeric count is not usable; fall back
  // to one thread per detected processor. (A plain truthiness test would let
  // negative values such as "-t -2" through.)
  if ($threads < 1) {
      $threads = getProcessorCount();
  }

  common_log(LOG_INFO, sprintf('Launching QueueDaemon background process with %1$d threads.', $threads));

  // Stay in the foreground only when -f / --foreground was given.
  $daemonize = !(have_option('f') || have_option('--foreground'));

  // -a / --all sets the daemon's all-sites flag.
  $all = have_option('a') || have_option('--all');

  $daemon = new QueueDaemon($id, $daemonize, $threads, $all);
  $daemon->runOnce();