spawningdaemon.php 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. <?php
  2. // This file is part of GNU social - https://www.gnu.org/software/social
  3. //
  4. // GNU social is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // GNU social is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with GNU social. If not, see <http://www.gnu.org/licenses/>.
  16. defined('GNUSOCIAL') || die();
  17. /**
  18. * Base class for daemon that can launch one or more processing threads,
  19. * respawning them if they exit.
  20. *
  21. * This is mainly intended for indefinite workloads such as monitoring
  22. * a queue or maintaining an IM channel.
  23. *
  24. * Child classes should implement the
  25. *
  26. * We can then pass individual items through the QueueHandler subclasses
  27. * they belong to. We additionally can handle queues for multiple sites.
  28. *
  29. * @package QueueHandler
  30. * @author Brion Vibber <brion@status.net>
  31. */
  32. abstract class SpawningDaemon extends Daemon
  33. {
  34. protected $threads = 1;
  35. const EXIT_OK = 0;
  36. const EXIT_ERR = 1;
  37. const EXIT_SHUTDOWN = 100;
  38. const EXIT_RESTART = 101;
  39. public function __construct($id = null, $daemonize = true, $threads = 1)
  40. {
  41. parent::__construct($daemonize);
  42. if ($id) {
  43. $this->set_id($id);
  44. }
  45. $this->threads = $threads;
  46. }
  47. /**
  48. * Perform some actual work!
  49. *
  50. * @return int exit code; use self::EXIT_SHUTDOWN to request not to respawn.
  51. */
  52. abstract public function runThread();
  53. /**
  54. * Spawn one or more background processes and let them start running.
  55. * Each individual process will execute whatever's in the runThread()
  56. * method, which should be overridden.
  57. *
  58. * Child processes will be automatically respawned when they exit.
  59. *
  60. * @todo possibly allow for not respawning on "normal" exits...
  61. * though ParallelizingDaemon is probably better for workloads
  62. * that have forseeable endpoints.
  63. */
  64. public function run()
  65. {
  66. $this->initPipes();
  67. $children = [];
  68. for ($i = 1; $i <= $this->threads; $i++) {
  69. $pid = pcntl_fork();
  70. if ($pid < 0) {
  71. $this->log(LOG_ERR, "Couldn't fork for thread $i; aborting\n");
  72. exit(1);
  73. } elseif ($pid === 0) {
  74. $this->initAndRunChild($i);
  75. } else {
  76. $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
  77. $children[$i] = $pid;
  78. }
  79. sleep(common_config('queue', 'spawndelay'));
  80. }
  81. $this->log(LOG_INFO, "Waiting for children to complete.");
  82. while (count($children) > 0) {
  83. $status = null;
  84. $pid = pcntl_wait($status);
  85. if ($pid > 0) {
  86. $i = array_search($pid, $children);
  87. if ($i === false) {
  88. $this->log(LOG_ERR, "Ignoring exit of unrecognized child pid $pid");
  89. continue;
  90. }
  91. if (pcntl_wifexited($status)) {
  92. $exitCode = pcntl_wexitstatus($status);
  93. $info = "status $exitCode";
  94. } elseif (pcntl_wifsignaled($status)) {
  95. $exitCode = self::EXIT_ERR;
  96. $signal = pcntl_wtermsig($status);
  97. $info = "signal $signal";
  98. }
  99. unset($children[$i]);
  100. if ($this->shouldRespawn($exitCode)) {
  101. $this->log(LOG_INFO, "Thread $i pid $pid exited with $info; respawing.");
  102. $pid = pcntl_fork();
  103. if ($pid < 0) {
  104. $this->log(LOG_ERR, "Couldn't fork to respawn thread $i; aborting thread.\n");
  105. } elseif ($pid === 0) {
  106. $this->initAndRunChild($i);
  107. } else {
  108. $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
  109. $children[$i] = $pid;
  110. }
  111. sleep(common_config('queue', 'spawndelay'));
  112. } else {
  113. $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; closing out thread.");
  114. }
  115. }
  116. }
  117. $this->log(LOG_INFO, "All child processes complete.");
  118. return true;
  119. }
  120. /**
  121. * Create an IPC socket pair which child processes can use to detect
  122. * if the parent process has been killed.
  123. */
  124. public function initPipes()
  125. {
  126. $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
  127. if ($sockets) {
  128. $this->parentWriter = $sockets[0];
  129. $this->parentReader = $sockets[1];
  130. } else {
  131. $this->log(LOG_ERR, "Couldn't create inter-process sockets");
  132. exit(1);
  133. }
  134. }
  135. /**
  136. * Build an IOManager that simply ensures that we have a connection
  137. * to the parent process open. If it breaks, the child process will
  138. * die.
  139. *
  140. * @return ProcessManager
  141. */
  142. public function processManager()
  143. {
  144. return new ProcessManager($this->parentReader);
  145. }
  146. /**
  147. * Determine whether to respawn an exited subprocess based on its exit code.
  148. * Otherwise we'll respawn all exits by default.
  149. *
  150. * @param int $exitCode
  151. * @return boolean true to respawn
  152. */
  153. protected function shouldRespawn($exitCode)
  154. {
  155. if ($exitCode == self::EXIT_SHUTDOWN) {
  156. // Thread requested a clean shutdown.
  157. return false;
  158. } else {
  159. // Otherwise we should always respawn!
  160. return true;
  161. }
  162. }
  163. /**
  164. * Initialize things for a fresh thread, call runThread(), and
  165. * exit at completion with appropriate return value.
  166. */
  167. protected function initAndRunChild($thread)
  168. {
  169. // Close the writer end of our parent<->children pipe.
  170. fclose($this->parentWriter);
  171. $this->set_id($this->get_id() . "." . $thread);
  172. $this->resetDb();
  173. $exitCode = $this->runThread();
  174. exit($exitCode);
  175. }
  176. /**
  177. * Reconnect to the database for each child process,
  178. * or they'll get very confused trying to use the
  179. * same socket.
  180. */
  181. protected function resetDb()
  182. {
  183. global $_DB_DATAOBJECT;
  184. // Can't be called statically
  185. $user = new User();
  186. $conn = $user->getDatabaseConnection();
  187. $conn->disconnect();
  188. // Remove the disconnected connection from the list
  189. foreach ($_DB_DATAOBJECT['CONNECTIONS'] as $k => $v) {
  190. if ($v === $conn) {
  191. unset($_DB_DATAOBJECT['CONNECTIONS'][$k]);
  192. }
  193. }
  194. // Reconnect main memcached, or threads will stomp on
  195. // each other and corrupt their requests.
  196. $cache = Cache::instance();
  197. if ($cache) {
  198. $cache->reconnect();
  199. }
  200. // Also reconnect memcached for status_network table.
  201. if (!empty(Status_network::$cache)) {
  202. Status_network::$cache->close();
  203. Status_network::$cache = null;
  204. }
  205. }
  206. public function log($level, $msg)
  207. {
  208. common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
  209. }
  210. public function name()
  211. {
  212. return strtolower(get_class($this).'.'.$this->get_id());
  213. }
  214. }