123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- <?php
- /**
- * StatusNet, the distributed open-source microblogging tool
- *
- * Base class for making daemons that can do several tasks in parallel.
- *
- * PHP version 5
- *
- * LICENCE: This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * @category Daemon
- * @package StatusNet
- * @author Zach Copley <zach@status.net>
- * @author Evan Prodromou <evan@status.net>
- * @copyright 2009 StatusNet, Inc.
- * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
- * @link http://status.net/
- */
- if (!defined('STATUSNET') && !defined('LACONICA')) {
- exit(1);
- }
- declare(ticks = 1);
- /**
- * Daemon able to spawn multiple child processes to do work in parallel
- *
- * @category Daemon
- * @package StatusNet
- * @author Zach Copley <zach@status.net>
- * @author Evan Prodromou <evan@status.net>
- * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
- * @link http://status.net/
- */
- class ParallelizingDaemon extends Daemon
- {
- private $_children = array();
- private $_interval = 0; // seconds
- private $_max_children = 0; // maximum number of children
- private $_debug = false;
- /**
- * Constructor
- *
- * @param string $id the name/id of this daemon
- * @param int $interval sleep this long before doing everything again
- * @param int $max_children maximum number of child processes at a time
- * @param boolean $debug debug output flag
- *
- * @return void
- *
- **/
- function __construct($id = null, $interval = 60, $max_children = 2,
- $debug = null)
- {
- parent::__construct(true); // daemonize
- $this->_interval = $interval;
- $this->_max_children = $max_children;
- $this->_debug = $debug;
- if (isset($id)) {
- $this->set_id($id);
- }
- }
- /**
- * Run the daemon
- *
- * @return void
- */
- function run()
- {
- if (isset($this->_debug)) {
- echo $this->name() . " - Debugging output enabled.\n";
- }
- do {
- $objects = $this->getObjects();
- foreach ($objects as $o) {
- // Fork a child for each object
- $pid = pcntl_fork();
- if ($pid == -1) {
- die ($this->name() . ' - Couldn\'t fork!');
- }
- if ($pid) {
- // Parent
- if (isset($this->_debug)) {
- echo $this->name() .
- " - Forked new child - pid $pid.\n";
- }
- $this->_children[] = $pid;
- } else {
- // Child
- // Do something with each object
- $this->childTask($o);
- exit();
- }
- // Remove child from ps list as it finishes
- while (($c = pcntl_wait($status, WNOHANG OR WUNTRACED)) > 0) {
- if (isset($this->_debug)) {
- echo $this->name() . " - Child $c finished.\n";
- }
- $this->removePs($this->_children, $c);
- }
- // Wait! We have too many damn kids.
- if (sizeof($this->_children) >= $this->_max_children) {
- if (isset($this->_debug)) {
- echo $this->name() . " - Too many children. Waiting...\n";
- }
- if (($c = pcntl_wait($status, WUNTRACED)) > 0) {
- if (isset($this->_debug)) {
- echo $this->name() .
- " - Finished waiting for child $c.\n";
- }
- $this->removePs($this->_children, $c);
- }
- }
- }
- // Remove all children from the process list before restarting
- while (($c = pcntl_wait($status, WUNTRACED)) > 0) {
- if (isset($this->_debug)) {
- echo $this->name() . " - Child $c finished.\n";
- }
- $this->removePs($this->_children, $c);
- }
- // Rest for a bit
- if (isset($this->_debug)) {
- echo $this->name() . ' - Waiting ' . $this->_interval .
- " secs before running again.\n";
- }
- if ($this->_interval > 0) {
- sleep($this->_interval);
- }
- } while (true);
- }
- /**
- * Remove a child process from the list of children
- *
- * @param array &$plist array of processes
- * @param int $ps process id
- *
- * @return void
- */
- function removePs(&$plist, $ps)
- {
- for ($i = 0; $i < sizeof($plist); $i++) {
- if ($plist[$i] == $ps) {
- unset($plist[$i]);
- $plist = array_values($plist);
- break;
- }
- }
- }
- /**
- * Get a list of objects to work on in parallel
- *
- * @return array An array of objects to work on
- */
- function getObjects()
- {
- die('Implement ParallelizingDaemon::getObjects().');
- }
- /**
- * Do something with each object in parallel
- *
- * @param mixed $object data to work on
- *
- * @return void
- */
- function childTask($object)
- {
- die("Implement ParallelizingDaemon::childTask($object).");
- }
- }
|