spawningdaemon.php 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. <?php
  2. /*
  3. * StatusNet - the distributed open-source microblogging tool
  4. * Copyright (C) 2010, StatusNet, Inc.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. */
  19. /**
  20. * Base class for daemon that can launch one or more processing threads,
  21. * respawning them if they exit.
  22. *
  23. * This is mainly intended for indefinite workloads such as monitoring
  24. * a queue or maintaining an IM channel.
  25. *
  26. * Child classes should implement the runThread() method, which does the actual work of each child process.
  27. *
  28. * We can then pass individual items through the QueueHandler subclasses
  29. * they belong to. We additionally can handle queues for multiple sites.
  30. *
  31. * @package QueueHandler
  32. * @author Brion Vibber <brion@status.net>
  33. */
  34. abstract class SpawningDaemon extends Daemon
  35. {
  36. protected $threads=1;
  37. const EXIT_OK = 0;
  38. const EXIT_ERR = 1;
  39. const EXIT_SHUTDOWN = 100;
  40. const EXIT_RESTART = 101;
  41. function __construct($id=null, $daemonize=true, $threads=1)
  42. {
  43. parent::__construct($daemonize);
  44. if ($id) {
  45. $this->set_id($id);
  46. }
  47. $this->threads = $threads;
  48. }
  49. /**
  50. * Perform some actual work!
  51. *
  52. * @return int exit code; use self::EXIT_SHUTDOWN to request not to respawn.
  53. */
  54. public abstract function runThread();
  55. /**
  56. * Spawn one or more background processes and let them start running.
  57. * Each individual process will execute whatever's in the runThread()
  58. * method, which should be overridden.
  59. *
  60. * Child processes will be automatically respawned when they exit.
  61. *
  62. * @todo possibly allow for not respawning on "normal" exits...
  63. * though ParallelizingDaemon is probably better for workloads
  64. * that have forseeable endpoints.
  65. */
  66. function run()
  67. {
  68. $this->initPipes();
  69. $children = array();
  70. for ($i = 1; $i <= $this->threads; $i++) {
  71. $pid = pcntl_fork();
  72. if ($pid < 0) {
  73. $this->log(LOG_ERR, "Couldn't fork for thread $i; aborting\n");
  74. exit(1);
  75. } else if ($pid == 0) {
  76. $this->initAndRunChild($i);
  77. } else {
  78. $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
  79. $children[$i] = $pid;
  80. }
  81. sleep(common_config('queue', 'spawndelay'));
  82. }
  83. $this->log(LOG_INFO, "Waiting for children to complete.");
  84. while (count($children) > 0) {
  85. $status = null;
  86. $pid = pcntl_wait($status);
  87. if ($pid > 0) {
  88. $i = array_search($pid, $children);
  89. if ($i === false) {
  90. $this->log(LOG_ERR, "Ignoring exit of unrecognized child pid $pid");
  91. continue;
  92. }
  93. if (pcntl_wifexited($status)) {
  94. $exitCode = pcntl_wexitstatus($status);
  95. $info = "status $exitCode";
  96. } else if (pcntl_wifsignaled($status)) {
  97. $exitCode = self::EXIT_ERR;
  98. $signal = pcntl_wtermsig($status);
  99. $info = "signal $signal";
  100. }
  101. unset($children[$i]);
  102. if ($this->shouldRespawn($exitCode)) {
  103. $this->log(LOG_INFO, "Thread $i pid $pid exited with $info; respawing.");
  104. $pid = pcntl_fork();
  105. if ($pid < 0) {
  106. $this->log(LOG_ERR, "Couldn't fork to respawn thread $i; aborting thread.\n");
  107. } else if ($pid == 0) {
  108. $this->initAndRunChild($i);
  109. } else {
  110. $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
  111. $children[$i] = $pid;
  112. }
  113. sleep(common_config('queue', 'spawndelay'));
  114. } else {
  115. $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; closing out thread.");
  116. }
  117. }
  118. }
  119. $this->log(LOG_INFO, "All child processes complete.");
  120. return true;
  121. }
  122. /**
  123. * Create an IPC socket pair which child processes can use to detect
  124. * if the parent process has been killed.
  125. */
  126. function initPipes()
  127. {
  128. $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
  129. if ($sockets) {
  130. $this->parentWriter = $sockets[0];
  131. $this->parentReader = $sockets[1];
  132. } else {
  133. $this->log(LOG_ERR, "Couldn't create inter-process sockets");
  134. exit(1);
  135. }
  136. }
  137. /**
  138. * Build an IOManager that simply ensures that we have a connection
  139. * to the parent process open. If it breaks, the child process will
  140. * die.
  141. *
  142. * @return ProcessManager
  143. */
  144. public function processManager()
  145. {
  146. return new ProcessManager($this->parentReader);
  147. }
  148. /**
  149. * Determine whether to respawn an exited subprocess based on its exit code.
  150. * Otherwise we'll respawn all exits by default.
  151. *
  152. * @param int $exitCode
  153. * @return boolean true to respawn
  154. */
  155. protected function shouldRespawn($exitCode)
  156. {
  157. if ($exitCode == self::EXIT_SHUTDOWN) {
  158. // Thread requested a clean shutdown.
  159. return false;
  160. } else {
  161. // Otherwise we should always respawn!
  162. return true;
  163. }
  164. }
  165. /**
  166. * Initialize things for a fresh thread, call runThread(), and
  167. * exit at completion with appropriate return value.
  168. */
  169. protected function initAndRunChild($thread)
  170. {
  171. // Close the writer end of our parent<->children pipe.
  172. fclose($this->parentWriter);
  173. $this->set_id($this->get_id() . "." . $thread);
  174. $this->resetDb();
  175. $exitCode = $this->runThread();
  176. exit($exitCode);
  177. }
  178. /**
  179. * Reconnect to the database for each child process,
  180. * or they'll get very confused trying to use the
  181. * same socket.
  182. */
  183. protected function resetDb()
  184. {
  185. // @fixme do we need to explicitly open the db too
  186. // or is this implied?
  187. global $_DB_DATAOBJECT;
  188. unset($_DB_DATAOBJECT['CONNECTIONS']);
  189. // Reconnect main memcached, or threads will stomp on
  190. // each other and corrupt their requests.
  191. $cache = Cache::instance();
  192. if ($cache) {
  193. $cache->reconnect();
  194. }
  195. // Also reconnect memcached for status_network table.
  196. if (!empty(Status_network::$cache)) {
  197. Status_network::$cache->close();
  198. Status_network::$cache = null;
  199. }
  200. }
  201. function log($level, $msg)
  202. {
  203. common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
  204. }
  205. function name()
  206. {
  207. return strtolower(get_class($this).'.'.$this->get_id());
  208. }
  209. }