<?php
/**
 * Queue manager that works through queue items until it runs out of work,
 * out of time, or hits an optional item limit.
 *
 * Every limit can be supplied through the constructor's $args array; values
 * that are left unset get defaults derived from PHP's max_execution_time and
 * the site configuration.
 */
class OpportunisticQueueManager extends DBQueueManager
{
    /** @var mixed Key compared against common_config('opportunisticqm', 'qmkey') */
    protected $qmkey = false;

    /** @var int|null Maximum number of seconds this manager may keep working */
    protected $max_execution_time = null;

    /** @var int|null Seconds to keep in reserve below PHP's own max_execution_time */
    protected $max_execution_margin = null;

    /** @var int|null Maximum number of items to handle; null means no item limit */
    protected $max_queue_items = null;

    /** @var int|null Unix timestamp the time budget is measured from */
    protected $started_at = null;

    /** @var int Number of successful polls so far */
    protected $handled_items = 0;

    /** @var int|null Values above 1 enable the "did not have time to start" debug message */
    protected $verbosity = null;

    /** Fallback time budget (seconds) used when PHP's max_execution_time is 0 or unavailable */
    const MAXEXECTIME = 20;

    /**
     * Copies recognised settings from $args onto the instance, verifies the
     * queue-manager key and fills in defaults for any limits still unset.
     *
     * @param array $args Property name => value pairs; only keys matching a
     *                    property of this class are used, the rest are ignored.
     *
     * @throws RunQueueBadKeyException if the supplied qmkey does not match the configured one
     */
    public function __construct(array $args=array())
    {
        foreach (get_class_vars(get_class($this)) as $key=>$val) {
            if (array_key_exists($key, $args)) {
                $this->$key = $args[$key];
            }
        }
        $this->verifyKey();

        if ($this->started_at === null) {
            $this->started_at = time();
        }

        if ($this->max_execution_time === null) {
            // ini_get() returns a string (or false); cast so the property is
            // always an integer and so that "0" (unlimited) triggers the fallback.
            $this->max_execution_time = (int) ini_get('max_execution_time') ?: self::MAXEXECTIME;
        }

        if ($this->max_execution_margin === null) {
            // Keep one second more than the HTTP connect timeout in reserve.
            $this->max_execution_margin = common_config('http', 'connect_timeout') + 1;
        }

        parent::__construct();
    }

    /**
     * Checks the supplied key against the configured one.
     *
     * NOTE(review): this is a plain strict comparison; if qmkey is a secret
     * arriving from untrusted input, consider hash_equals() (PHP >= 5.6) for
     * the string/string case.
     *
     * @return void
     *
     * @throws RunQueueBadKeyException when the keys are not identical
     */
    protected function verifyKey()
    {
        if ($this->qmkey !== common_config('opportunisticqm', 'qmkey')) {
            throw new RunQueueBadKeyException($this->qmkey);
        }
    }

    /**
     * Tells whether another queue item may be polled within the configured limits.
     *
     * @return bool true if there is budget left, false otherwise
     */
    public function canContinue()
    {
        $time_passed = time() - $this->started_at;

        // No time has passed yet and the item limit allows nothing: stop.
        if ($time_passed <= 0 && (!is_null($this->max_queue_items) && $this->max_queue_items <= 0)) {
            return false;
        }

        // Our own time budget is used up.
        if ($time_passed >= $this->max_execution_time) {
            return false;
        }

        // Stay clear of PHP's hard limit by the configured margin. A limit of
        // 0 means "unlimited" (e.g. on the CLI); applying the margin to it
        // would give a negative threshold and stop the manager before it
        // handled a single item, so the check is skipped in that case.
        $php_limit = (int) ini_get('max_execution_time');
        if ($php_limit > 0 && $time_passed > $php_limit - $this->max_execution_margin) {
            return false;
        }

        // The item limit, if any, has been reached.
        if (!is_null($this->max_queue_items) && $this->handled_items >= $this->max_queue_items) {
            return false;
        }

        return true;
    }

    /**
     * Polls the parent queue manager once.
     *
     * @return bool always true when the parent poll succeeded
     *
     * @throws RunQueueOutOfWorkException when the parent poll returns a falsy value
     */
    public function poll()
    {
        if (!parent::poll()) {
            throw new RunQueueOutOfWorkException();
        }
        // Count only polls that succeeded, not the one that found no work.
        $this->handled_items++;
        return true;
    }

    /**
     * Releases the claim on an item that has no handler instead of treating
     * it as an error; delegates to _fail() in release-only mode.
     *
     * @param Queue_item $qi  The queue item without a handler
     * @param mixed      $rep Unused here
     *
     * @return void
     */
    protected function noHandlerFound(Queue_item $qi, $rep=null)
    {
        $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler");
        $this->_fail($qi, true);
    }

    /**
     * Runs the parent failure handling, then ignores the item's transport so
     * it is not picked again during this execution.
     *
     * @param Queue_item $qi          The queue item that failed
     * @param bool       $releaseOnly Passed through unchanged to the parent
     *
     * @return void
     */
    protected function _fail(Queue_item $qi, $releaseOnly=false)
    {
        parent::_fail($qi, $releaseOnly);
        $this->_log(LOG_DEBUG, "[{$qi->transport}:item {$qi->id}] Ignoring this transport for the rest of this execution");
        $this->ignoreTransport($qi->transport);
    }

    /**
     * Polls the queue repeatedly until it is out of work or a limit is reached.
     *
     * @return bool true if the queue ran out of work, false if a time or item
     *              limit stopped the run first
     */
    public function runQueue()
    {
        while ($this->canContinue()) {
            try {
                $this->poll();
            } catch (RunQueueOutOfWorkException $e) {
                return true;
            }
        }

        if ($this->handled_items > 0) {
            common_debug('Opportunistic queue manager passed execution time/item handling limit without being out of work.');
        } elseif ($this->verbosity > 1) {
            common_debug('Opportunistic queue manager did not have time to start on this action (max: '.$this->max_execution_time.' exceeded: '.abs(time()-$this->started_at).').');
        }

        return false;
    }
}
|