parallelizingdaemon.php 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. <?php
  2. /**
  3. * StatusNet, the distributed open-source microblogging tool
  4. *
  5. * Base class for making daemons that can do several tasks in parallel.
  6. *
  7. * PHP version 5
  8. *
  9. * LICENCE: This program is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU Affero General Public License as published by
  11. * the Free Software Foundation, either version 3 of the License, or
  12. * (at your option) any later version.
  13. *
  14. * This program is distributed in the hope that it will be useful,
  15. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. * GNU Affero General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU Affero General Public License
  20. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. *
  22. * @category Daemon
  23. * @package StatusNet
  24. * @author Zach Copley <zach@status.net>
  25. * @author Evan Prodromou <evan@status.net>
  26. * @copyright 2009 StatusNet, Inc.
  27. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
  28. * @link http://status.net/
  29. */
  30. if (!defined('STATUSNET') && !defined('LACONICA')) {
  31. exit(1);
  32. }
  33. declare(ticks = 1);
  34. /**
  35. * Daemon able to spawn multiple child processes to do work in parallel
  36. *
  37. * @category Daemon
  38. * @package StatusNet
  39. * @author Zach Copley <zach@status.net>
  40. * @author Evan Prodromou <evan@status.net>
  41. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
  42. * @link http://status.net/
  43. */
  44. class ParallelizingDaemon extends Daemon
  45. {
  46. private $_children = array();
  47. private $_interval = 0; // seconds
  48. private $_max_children = 0; // maximum number of children
  49. private $_debug = false;
  50. /**
  51. * Constructor
  52. *
  53. * @param string $id the name/id of this daemon
  54. * @param int $interval sleep this long before doing everything again
  55. * @param int $max_children maximum number of child processes at a time
  56. * @param boolean $debug debug output flag
  57. *
  58. * @return void
  59. *
  60. **/
  61. function __construct($id = null, $interval = 60, $max_children = 2,
  62. $debug = null)
  63. {
  64. parent::__construct(true); // daemonize
  65. $this->_interval = $interval;
  66. $this->_max_children = $max_children;
  67. $this->_debug = $debug;
  68. if (isset($id)) {
  69. $this->set_id($id);
  70. }
  71. }
  72. /**
  73. * Run the daemon
  74. *
  75. * @return void
  76. */
  77. function run()
  78. {
  79. if (isset($this->_debug)) {
  80. echo $this->name() . " - Debugging output enabled.\n";
  81. }
  82. do {
  83. $objects = $this->getObjects();
  84. foreach ($objects as $o) {
  85. // Fork a child for each object
  86. $pid = pcntl_fork();
  87. if ($pid == -1) {
  88. die ($this->name() . ' - Couldn\'t fork!');
  89. }
  90. if ($pid) {
  91. // Parent
  92. if (isset($this->_debug)) {
  93. echo $this->name() .
  94. " - Forked new child - pid $pid.\n";
  95. }
  96. $this->_children[] = $pid;
  97. } else {
  98. // Child
  99. // Do something with each object
  100. $this->childTask($o);
  101. exit();
  102. }
  103. // Remove child from ps list as it finishes
  104. while (($c = pcntl_wait($status, WNOHANG OR WUNTRACED)) > 0) {
  105. if (isset($this->_debug)) {
  106. echo $this->name() . " - Child $c finished.\n";
  107. }
  108. $this->removePs($this->_children, $c);
  109. }
  110. // Wait! We have too many damn kids.
  111. if (sizeof($this->_children) >= $this->_max_children) {
  112. if (isset($this->_debug)) {
  113. echo $this->name() . " - Too many children. Waiting...\n";
  114. }
  115. if (($c = pcntl_wait($status, WUNTRACED)) > 0) {
  116. if (isset($this->_debug)) {
  117. echo $this->name() .
  118. " - Finished waiting for child $c.\n";
  119. }
  120. $this->removePs($this->_children, $c);
  121. }
  122. }
  123. }
  124. // Remove all children from the process list before restarting
  125. while (($c = pcntl_wait($status, WUNTRACED)) > 0) {
  126. if (isset($this->_debug)) {
  127. echo $this->name() . " - Child $c finished.\n";
  128. }
  129. $this->removePs($this->_children, $c);
  130. }
  131. // Rest for a bit
  132. if (isset($this->_debug)) {
  133. echo $this->name() . ' - Waiting ' . $this->_interval .
  134. " secs before running again.\n";
  135. }
  136. if ($this->_interval > 0) {
  137. sleep($this->_interval);
  138. }
  139. } while (true);
  140. }
  141. /**
  142. * Remove a child process from the list of children
  143. *
  144. * @param array &$plist array of processes
  145. * @param int $ps process id
  146. *
  147. * @return void
  148. */
  149. function removePs(&$plist, $ps)
  150. {
  151. for ($i = 0; $i < sizeof($plist); $i++) {
  152. if ($plist[$i] == $ps) {
  153. unset($plist[$i]);
  154. $plist = array_values($plist);
  155. break;
  156. }
  157. }
  158. }
  159. /**
  160. * Get a list of objects to work on in parallel
  161. *
  162. * @return array An array of objects to work on
  163. */
  164. function getObjects()
  165. {
  166. die('Implement ParallelizingDaemon::getObjects().');
  167. }
  168. /**
  169. * Do something with each object in parallel
  170. *
  171. * @param mixed $object data to work on
  172. *
  173. * @return void
  174. */
  175. function childTask($object)
  176. {
  177. die("Implement ParallelizingDaemon::childTask($object).");
  178. }
  179. }