#73 New Queue System

Обединени
diogo обедини 6 ревизии от biodantas/nightly във diogo/nightly преди 5 години

+ 0 - 176
lib/queue/liberalstomp.php

@@ -1,176 +0,0 @@
-<?php
-
-/**
- * Based on code from Stomp PHP library, working around bugs in the base class.
- *
- * Original code is copyright 2005-2006 The Apache Software Foundation
- * Modifications copyright 2009 StatusNet Inc by Brion Vibber <brion@status.net>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-class LiberalStomp extends Stomp
-{
-    /**
-     * We need to be able to get the socket so advanced daemons can
-     * do a select() waiting for input both from the queue and from
-     * other sources such as an XMPP connection.
-     *
-     * @return resource
-     */
-    function getSocket()
-    {
-        return $this->_socket;
-    }
-
-    /**
-     * Return the host we're currently connected to.
-     *
-     * @return string
-     */
-    function getServer()
-    {
-        $idx = $this->_currentHost;
-        if ($idx >= 0) {
-            $host = $this->_hosts[$idx];
-            return "$host[0]:$host[1]";
-        } else {
-            return '[unconnected]';
-        }
-    }
-
-    /**
-     * Make socket connection to the server
-     * We also set the stream to non-blocking mode, since we'll be
-     * select'ing to wait for updates. In blocking mode it seems
-     * to get confused sometimes.
-     *
-     * @throws StompException
-     */
-    protected function _makeConnection ()
-    {
-        parent::_makeConnection();
-        stream_set_blocking($this->_socket, 0);
-    }
-
-    /**
-     * Version 1.0.0 of the Stomp library gets confused if messages
-     * come in too fast over the connection. This version will read
-     * out as many frames as are ready to be read from the socket.
-     *
-     * Modified from Stomp::readFrame()
-     *
-     * @return StompFrame False when no frame to read
-     */
-    public function readFrames ()
-    {
-        if (!$this->hasFrameToRead()) {
-            return false;
-        }
-        
-        $rb = 1024;
-        $data = '';
-        $end = false;
-        $frames = array();
-
-        do {
-            // @fixme this sometimes hangs in blocking mode...
-            // shouldn't we have been idle until we found there's more data?
-            $read = fread($this->_socket, $rb);
-            if ($read === false || ($read === '' && feof($this->_socket))) {
-                // @fixme possibly attempt an auto reconnect as old code?
-                throw new StompException("Error reading");
-                //$this->_reconnect();
-                // @fixme this will lose prior items
-                //return $this->readFrames();
-            }
-            $data .= $read;
-            if (strpos($data, "\x00") !== false) {
-                // Frames are null-delimited, but some servers
-                // may append an extra \n according to old bug reports.
-                $data = str_replace("\x00\n", "\x00", $data);
-                $chunks = explode("\x00", $data);
-
-                $data = array_pop($chunks);
-                $frames = array_merge($frames, $chunks);
-                if ($data == '') {
-                    // We're at the end of a frame; stop reading.
-                    break;
-                } else {
-                    // In the middle of a frame; keep going.
-                }
-            }
-            // @fixme find out why this len < 2 check was there
-            //$len = strlen($data);
-        } while (true);//$len < 2 || $end == false);
-
-        return array_map(array($this, 'parseFrame'), $frames);
-    }
-    
-    /**
-     * Parse a raw Stomp frame into an object.
-     * Extracted from Stomp::readFrame()
-     *
-     * @param string $data
-     * @return StompFrame
-     */
-    function parseFrame($data)
-    {
-        list ($header, $body) = explode("\n\n", $data, 2);
-        $header = explode("\n", $header);
-        $headers = array();
-        $command = null;
-        foreach ($header as $v) {
-            if (isset($command)) {
-                list ($name, $value) = explode(':', $v, 2);
-                $headers[$name] = $value;
-            } else {
-                $command = $v;
-            }
-        }
-        $frame = new StompFrame($command, $headers, trim($body));
-        if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
-            require_once 'Stomp/Message/Map.php';
-            return new StompMessageMap($frame);
-        } else {
-            return $frame;
-        }
-        return $frame;
-    }
-
-    /**
-     * Write frame to server
-     *
-     * @param StompFrame $stompFrame
-     */
-    protected function _writeFrame (StompFrame $stompFrame)
-    {
-        if (!is_resource($this->_socket)) {
-            require_once 'Stomp/Exception.php';
-            throw new StompException('Socket connection hasn\'t been established');
-        }
-
-        $data = $stompFrame->__toString();
-
-        // Make sure the socket's in a writable state; if not, wait a bit.
-        stream_set_blocking($this->_socket, 1);
-
-        $r = fwrite($this->_socket, $data, strlen($data));
-        stream_set_blocking($this->_socket, 0);
-        if ($r === false || $r == 0) {
-            $this->_reconnect();
-            $this->_writeFrame($stompFrame);
-        }
-    }
- }
-

+ 5 - 21
lib/queue/queuemanager.php

@@ -59,25 +59,9 @@ abstract class QueueManager extends IoManager
         if (empty(self::$qm)) {
 
             if (Event::handle('StartNewQueueManager', array(&self::$qm))) {
-
-                $enabled = common_config('queue', 'enabled');
-                $type = common_config('queue', 'subsystem');
-
-                if (!$enabled) {
-                    // does everything immediately
-                    self::$qm = new UnQueueManager();
-                } else {
-                    switch ($type) {
-                     case 'db':
-                        self::$qm = new DBQueueManager();
-                        break;
-                     case 'stomp':
-                        self::$qm = new StompQueueManager();
-                        break;
-                     default:
-                        throw new ServerException("No queue manager class for type '$type'");
-                    }
-                }
+                common_log(LOG_ERR, 'Some form of queue manager must be active' .
+                           '(UnQueue does everything immediately and is the default)');
+                throw new ServerException('Some form of queue manager must be active');
             }
         }
 
@@ -164,7 +148,7 @@ abstract class QueueManager extends IoManager
      * @param mixed $item
      * @return string
      */
-    protected function encode($item)
+    protected function encode($item): string
     {
         return serialize($item);
     }
@@ -176,7 +160,7 @@ abstract class QueueManager extends IoManager
      * @param string
      * @return mixed
      */
-    protected function decode($frame)
+    protected function decode(string $frame)
     {
         $object = unserialize($frame);
 

+ 0 - 762
lib/queue/stompqueuemanager.php

@@ -1,762 +0,0 @@
-<?php
-/**
- * StatusNet, the distributed open-source microblogging tool
- *
- * Abstract class for queue managers
- *
- * PHP version 5
- *
- * LICENCE: This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- * @category  QueueManager
- * @package   StatusNet
- * @author    Evan Prodromou <evan@status.net>
- * @author    Sarven Capadisli <csarven@status.net>
- * @copyright 2009 StatusNet, Inc.
- * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
- * @link      http://status.net/
- */
-
-require_once 'Stomp.php';
-require_once 'Stomp/Exception.php';
-
-class StompQueueManager extends QueueManager
-{
-    protected $servers;
-    protected $username;
-    protected $password;
-    protected $base;
-    protected $control;
-
-    protected $useTransactions;
-    protected $useAcks;
-
-    protected $sites = array();
-    protected $subscriptions = array();
-
-    protected $cons = array(); // all open connections
-    protected $disconnect = array();
-    protected $transaction = array();
-    protected $transactionCount = array();
-    protected $defaultIdx = 0;
-
-    function __construct()
-    {
-        parent::__construct();
-        $server = common_config('queue', 'stomp_server');
-        if (is_array($server)) {
-            $this->servers = $server;
-        } else {
-            $this->servers = array($server);
-        }
-        $this->username        = common_config('queue', 'stomp_username');
-        $this->password        = common_config('queue', 'stomp_password');
-        $this->base            = common_config('queue', 'queue_basename');
-        $this->control         = common_config('queue', 'control_channel');
-        $this->breakout        = common_config('queue', 'breakout');
-        $this->useTransactions = common_config('queue', 'stomp_transactions');
-        $this->useAcks         = common_config('queue', 'stomp_acks');
-    }
-
-    /**
-     * Tell the i/o master we only need a single instance to cover
-     * all sites running in this process.
-     */
-    public static function multiSite()
-    {
-        return IoManager::INSTANCE_PER_PROCESS;
-    }
-
-    /**
-     * Optional; ping any running queue handler daemons with a notification
-     * such as announcing a new site to handle or requesting clean shutdown.
-     * This avoids having to restart all the daemons manually to update configs
-     * and such.
-     *
-     * Currently only relevant for multi-site queue managers such as Stomp.
-     *
-     * @param string $event event key
-     * @param string $param optional parameter to append to key
-     * @return boolean success
-     */
-    public function sendControlSignal($event, $param='')
-    {
-        $message = $event;
-        if ($param != '') {
-            $message .= ':' . $param;
-        }
-        $this->_connect();
-        $con = $this->cons[$this->defaultIdx];
-        $result = $con->send($this->control,
-                             $message,
-                             array ('created' => common_sql_now()));
-        if ($result) {
-            $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message");
-            return true;
-        } else {
-            $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message");
-            return false;
-        }
-    }
-
-    /**
-     * Saves an object into the queue item table.
-     *
-     * @param mixed $object
-     * @param string $queue
-     * @param string $siteNickname optional override to drop into another site's queue
-     *
-     * @return boolean true on success
-     * @throws StompException on connection or send error
-     */
-    public function enqueue($object, $queue, $siteNickname=null)
-    {
-        $this->_connect();
-        if (common_config('queue', 'stomp_enqueue_on')) {
-            // We're trying to force all writes to a single server.
-            // WARNING: this might do odd things if that server connection dies.
-            $idx = array_search(common_config('queue', 'stomp_enqueue_on'),
-                                $this->servers);
-            if ($idx === false) {
-                common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.');
-                $idx = $this->defaultIdx;
-            }
-        } else {
-            $idx = $this->defaultIdx;
-        }
-        return $this->_doEnqueue($object, $queue, $idx, $siteNickname);
-    }
-
-    /**
-     * Saves a notice object reference into the queue item table
-     * on the given connection.
-     *
-     * @return boolean true on success
-     * @throws StompException on connection or send error
-     */
-    protected function _doEnqueue($object, $queue, $idx, $siteNickname=null)
-    {
-        $rep = $this->logrep($object);
-        $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'),
-                          'handler' => $queue,
-                          'payload' => $this->encode($object));
-        $msg = base64_encode(serialize($envelope));
-
-        $props = array('created' => common_sql_now());
-        if ($this->isPersistent($queue)) {
-            $props['persistent'] = 'true';
-        }
-
-        $con = $this->cons[$idx];
-        $host = $con->getServer();
-        $target = $this->queueName($queue);
-        $result = $con->send($target, $msg, $props);
-
-        if (!$result) {
-            $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target");
-            return false;
-        }
-
-        $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target");
-        $this->stats('enqueued', $queue);
-        return true;
-    }
-
-    /**
-     * Determine whether messages to this queue should be marked as persistent.
-     * Actual persistent storage depends on the queue server's configuration.
-     * @param string $queue
-     * @return bool
-     */
-    protected function isPersistent($queue)
-    {
-        $mode = common_config('queue', 'stomp_persistent');
-        if (is_array($mode)) {
-            return in_array($queue, $mode);
-        } else {
-            return (bool)$mode;
-        }
-    }
-
-    /**
-     * Send any sockets we're listening on to the IO manager
-     * to wait for input.
-     *
-     * @return array of resources
-     */
-    public function getSockets()
-    {
-        $sockets = array();
-        foreach ($this->cons as $con) {
-            if ($con) {
-                $sockets[] = $con->getSocket();
-            }
-        }
-        return $sockets;
-    }
-
-    /**
-     * Get the Stomp connection object associated with the given socket.
-     * @param resource $socket
-     * @return int index into connections list
-     * @throws Exception
-     */
-    protected function connectionFromSocket($socket)
-    {
-        foreach ($this->cons as $i => $con) {
-            if ($con && $con->getSocket() === $socket) {
-                return $i;
-            }
-        }
-        throw new Exception(__CLASS__ . " asked to read from unrecognized socket");
-    }
-
-    /**
-     * We've got input to handle on our socket!
-     * Read any waiting Stomp frame(s) and process them.
-     *
-     * @param resource $socket
-     * @return boolean ok on success
-     */
-    public function handleInput($socket)
-    {
-        $idx = $this->connectionFromSocket($socket);
-        $con = $this->cons[$idx];
-        $host = $con->getServer();
-        $this->defaultIdx = $idx;
-
-        $ok = true;
-        try {
-            $frames = $con->readFrames();
-        } catch (StompException $e) {
-            $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage());
-            fclose($socket); // ???
-            $this->cons[$idx] = null;
-            $this->transaction[$idx] = null;
-            $this->disconnect[$idx] = time();
-            return false;
-        }
-        foreach ($frames as $frame) {
-            $dest = $frame->headers['destination'];
-            if ($dest == $this->control) {
-                if (!$this->handleControlSignal($frame)) {
-                    // We got a control event that requests a shutdown;
-                    // close out and stop handling anything else!
-                    break;
-                }
-            } else {
-                $ok = $this->handleItem($frame) && $ok;
-            }
-            $this->ack($idx, $frame);
-            $this->commit($idx);
-            $this->begin($idx);
-        }
-        return $ok;
-    }
-
-    /**
-     * Attempt to reconnect in background if we lost a connection.
-     */
-    function idle()
-    {
-        $now = time();
-        foreach ($this->cons as $idx => $con) {
-            if (empty($con)) {
-                $age = $now - $this->disconnect[$idx];
-                if ($age >= 60) {
-                    $this->_reconnect($idx);
-                }
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Initialize our connection and subscribe to all the queues
-     * we're going to need to handle... If multiple queue servers
-     * are configured for failover, we'll listen to all of them.
-     *
-     * Side effects: in multi-site mode, may reset site configuration.
-     *
-     * @param IoMaster $master process/event controller
-     * @return bool return false on failure
-     */
-    public function start($master)
-    {
-        parent::start($master);
-        $this->_connectAll();
-
-        foreach ($this->cons as $i => $con) {
-            if ($con) {
-                $this->doSubscribe($con);
-                $this->begin($i);
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Close out any active connections.
-     *
-     * @return bool return false on failure
-     */
-    public function finish()
-    {
-        // If there are any outstanding delivered messages we haven't processed,
-        // free them for another thread to take.
-        foreach ($this->cons as $i => $con) {
-            if ($con) {
-                $this->rollback($i);
-                $con->disconnect();
-                $this->cons[$i] = null;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Lazy open a single connection to Stomp queue server.
-     * If multiple servers are configured, we let the Stomp client library
-     * worry about finding a working connection among them.
-     */
-    protected function _connect()
-    {
-        if (empty($this->cons)) {
-            $list = $this->servers;
-            if (count($list) > 1) {
-                shuffle($list); // Randomize to spread load
-                $url = 'failover://(' . implode(',', $list) . ')';
-            } else {
-                $url = $list[0];
-            }
-            $con = $this->_doConnect($url);
-            $this->cons = array($con);
-            $this->transactionCount = array(0);
-            $this->transaction = array(null);
-            $this->disconnect = array(null);
-        }
-    }
-
-    /**
-     * Lazy open connections to all Stomp servers, if in manual failover
-     * mode. This means the queue servers don't speak to each other, so
-     * we have to listen to all of them to make sure we get all events.
-     */
-    protected function _connectAll()
-    {
-        if (!common_config('queue', 'stomp_manual_failover')) {
-            return $this->_connect();
-        }
-        if (empty($this->cons)) {
-            $this->cons = array();
-            $this->transactionCount = array();
-            $this->transaction = array();
-            foreach ($this->servers as $idx => $server) {
-                try {
-                    $this->cons[] = $this->_doConnect($server);
-                    $this->disconnect[] = null;
-                } catch (Exception $e) {
-                    // s'okay, we'll live
-                    $this->cons[] = null;
-                    $this->disconnect[] = time();
-                }
-                $this->transactionCount[] = 0;
-                $this->transaction[] = null;
-            }
-            if (empty($this->cons)) {
-                throw new ServerException("No queue servers reachable...");
-                return false;
-            }
-        }
-    }
-
-    /**
-     * Attempt to manually reconnect to the Stomp server for the given
-     * slot. If successful, set up our subscriptions on it.
-     */
-    protected function _reconnect($idx)
-    {
-        try {
-            $con = $this->_doConnect($this->servers[$idx]);
-        } catch (Exception $e) {
-            $this->_log(LOG_ERR, $e->getMessage());
-            $con = null;
-        }
-        if ($con) {
-            $this->cons[$idx] = $con;
-            $this->disconnect[$idx] = null;
-
-            $this->doSubscribe($con);
-            $this->begin($idx);
-        } else {
-            // Try again later...
-            $this->disconnect[$idx] = time();
-        }
-    }
-
-    protected function _doConnect($server)
-    {
-        $this->_log(LOG_INFO, "Connecting to '$server' as '$this->username'...");
-        $con = new LiberalStomp($server);
-
-        if ($con->connect($this->username, $this->password)) {
-            $this->_log(LOG_INFO, "Connected.");
-        } else {
-            $this->_log(LOG_ERR, 'Failed to connect to queue server');
-            throw new ServerException('Failed to connect to queue server');
-        }
-
-        return $con;
-    }
-
-    /**
-     * Set up all our raw queue subscriptions on the given connection
-     * @param LiberalStomp $con
-     */
-    protected function doSubscribe(LiberalStomp $con)
-    {
-        $host = $con->getServer();
-        foreach ($this->subscriptions() as $sub) {
-            $this->_log(LOG_INFO, "Subscribing to $sub on $host");
-            $con->subscribe($sub);
-        }
-    }
-    
-    /**
-     * Grab a full list of stomp-side queue subscriptions.
-     * Will include:
-     *  - control broadcast channel
-     *  - shared group queues for active groups
-     *  - per-handler and per-site breakouts from $config['queue']['breakout']
-     *    that are rooted in the active groups.
-     *
-     * @return array of strings
-     */
-    protected function subscriptions()
-    {
-        $subs = array();
-        $subs[] = $this->control;
-
-        foreach ($this->activeGroups as $group) {
-            $subs[] = $this->base . $group;
-        }
-
-        foreach ($this->breakout as $spec) {
-            $parts = explode('/', $spec);
-            if (count($parts) < 2 || count($parts) > 3) {
-                common_log(LOG_ERR, "Bad queue breakout specifier $spec");
-            }
-            if (in_array($parts[0], $this->activeGroups)) {
-                $subs[] = $this->base . $spec;
-            }
-        }
-        return array_unique($subs);
-    }
-
-    /**
-     * Handle and acknowledge an event that's come in through a queue.
-     *
-     * If the queue handler reports failure, the message is requeued for later.
-     * Missing notices or handler classes will drop the message.
-     *
-     * Side effects: in multi-site mode, may reset site configuration to
-     * match the site that queued the event.
-     *
-     * @param StompFrame $frame
-     * @return bool success
-     */
-    protected function handleItem($frame)
-    {
-        $host = $this->cons[$this->defaultIdx]->getServer();
-        $message = unserialize(base64_decode($frame->body));
-
-        if ($message === false) {
-            $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}");
-            $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body));
-            return false;
-        }
-
-        $site = $message['site'];
-        $queue = $message['handler'];
-
-        if ($this->isDeadLetter($frame, $message)) {
-            $this->stats('deadletter', $queue);
-	        return false;
-        }
-
-        // @fixme detect failing site switches
-        $this->switchSite($site);
-
-        try {
-            $item = $this->decode($message['payload']);
-        } catch (Exception $e) {
-            $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
-            $this->stats('baditem', $queue);
-            return false;
-        }
-        $info = $this->logrep($item) . " posted at " .
-                $frame->headers['created'] . " in queue $queue from $host";
-        $this->_log(LOG_DEBUG, "Dequeued $info");
-
-        try {
-            $handler = $this->getHandler($queue);
-            $ok = $handler->handle($item);
-        } catch (NoQueueHandlerException $e) {
-            $this->_log(LOG_ERR, "Missing handler class; skipping $info");
-            $this->stats('badhandler', $queue);
-            return false;
-        } catch (Exception $e) {
-            $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage());
-            $ok = false;
-        }
-
-        if ($ok) {
-            $this->_log(LOG_INFO, "Successfully handled $info");
-            $this->stats('handled', $queue);
-        } else {
-            $this->_log(LOG_WARNING, "Failed handling $info");
-            // Requeing moves the item to the end of the line for its next try.
-            // @fixme add a manual retry count
-            $this->enqueue($item, $queue);
-            $this->stats('requeued', $queue);
-        }
-
-        return $ok;
-    }
-
-    /**
-     * Check if a redelivered message has been run through enough
-     * that we're going to give up on it.
-     *
-     * @param StompFrame $frame
-     * @param array $message unserialized message body
-     * @return boolean true if we should discard
-     */
-    protected function isDeadLetter($frame, $message)
-    {
-        if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') {
-	        // Message was redelivered, possibly indicating a previous failure.
-            $msgId = $frame->headers['message-id'];
-            $site = $message['site'];
-            $queue = $message['handler'];
-	        $msgInfo = "message $msgId for $site in queue $queue";
-
-	        $deliveries = $this->incDeliveryCount($msgId);
-	        if ($deliveries > common_config('queue', 'max_retries')) {
-		        $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo";
-
-		        $outdir = common_config('queue', 'dead_letter_dir');
-		        if ($outdir) {
-    		        $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId);
-    		        $info .= ": dumping to $filename";
-    		        file_put_contents($filename, $message['payload']);
-		        }
-
-		        common_log(LOG_ERR, $info);
-		        return true;
-	        } else {
-	            common_log(LOG_INFO, "retry $deliveries on $msgInfo");
-	        }
-        }
-        return false;
-    }
-
-    /**
-     * Update count of times we've re-encountered this message recently,
-     * triggered when we get a message marked as 'redelivered'.
-     *
-     * Requires a CLI-friendly cache configuration.
-     *
-     * @param string $msgId message-id header from message
-     * @return int number of retries recorded
-     */
-    function incDeliveryCount($msgId)
-    {
-	    $count = 0;
-	    $cache = Cache::instance();
-	    if ($cache) {
-		    $key = 'statusnet:stomp:message-retries:' . $msgId;
-		    $count = $cache->increment($key);
-		    if (!$count) {
-			    $count = 1;
-			    $cache->set($key, $count, null, 3600);
-			    $got = $cache->get($key);
-		    }
-	    }
-	    return $count;
-    }
-
-    /**
-     * Process a control signal broadcast.
-     *
-     * @param int $idx connection index
-     * @param array $frame Stomp frame
-     * @return bool true to continue; false to stop further processing.
-     */
-    protected function handleControlSignal($idx, $frame)
-    {
-        $message = trim($frame->body);
-        if (strpos($message, ':') !== false) {
-            list($event, $param) = explode(':', $message, 2);
-        } else {
-            $event = $message;
-            $param = '';
-        }
-
-        $shutdown = false;
-
-        if ($event == 'shutdown') {
-            $this->master->requestShutdown();
-            $shutdown = true;
-        } else if ($event == 'restart') {
-            $this->master->requestRestart();
-            $shutdown = true;
-        } else if ($event == 'update') {
-            $this->updateSiteConfig($param);
-        } else {
-            $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
-        }
-        return $shutdown;
-    }
-
-    /**
-     * Switch site, if necessary, and reset current handler assignments
-     * @param string $site
-     */
-    function switchSite($site)
-    {
-        if ($site != GNUsocial::currentSite()) {
-            $this->stats('switch');
-            GNUsocial::switchSite($site);
-            $this->initialize();
-        }
-    }
-
-    /**
-     * (Re)load runtime configuration for a given site by nickname,
-     * triggered by a broadcast to the 'statusnet-control' topic.
-     *
-     * Configuration changes in database should update, but config
-     * files might not.
-     *
-     * @param array $frame Stomp frame
-     * @return bool true to continue; false to stop further processing.
-     */
-    protected function updateSiteConfig($nickname)
-    {
-        $sn = Status_network::getKV('nickname', $nickname);
-        if ($sn) {
-            $this->switchSite($nickname);
-            if (!in_array($nickname, $this->sites)) {
-                $this->addSite();
-            }
-            $this->stats('siteupdate');
-        } else {
-            $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
-        }
-    }
-
-    /**
-     * Combines the queue_basename from configuration with the
-     * group name for this queue to give eg:
-     *
-     * /queue/statusnet/main
-     * /queue/statusnet/main/distrib
-     * /queue/statusnet/xmpp/xmppout/site01
-     *
-     * @param string $queue
-     * @return string
-     */
-    protected function queueName($queue)
-    {
-        $group = $this->queueGroup($queue);
-        $site = GNUsocial::currentSite();
-
-        $specs = array("$group/$queue/$site",
-                       "$group/$queue");
-        foreach ($specs as $spec) {
-            if (in_array($spec, $this->breakout)) {
-                return $this->base . $spec;
-            }
-        }
-        return $this->base . $group;
-    }
-
-    /**
-     * Get the breakout mode for the given queue on the current site.
-     *
-     * @param string $queue
-     * @return string one of 'shared', 'handler', 'site'
-     */
-    protected function breakoutMode($queue)
-    {
-        $breakout = common_config('queue', 'breakout');
-        if (isset($breakout[$queue])) {
-            return $breakout[$queue];
-        } else if (isset($breakout['*'])) {
-            return $breakout['*'];
-        } else {
-            return 'shared';
-        }
-    }
-
-    protected function begin($idx)
-    {
-        if ($this->useTransactions) {
-            if (!empty($this->transaction[$idx])) {
-                throw new Exception("Tried to start transaction in the middle of a transaction");
-            }
-            $this->transactionCount[$idx]++;
-            $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time();
-            $this->cons[$idx]->begin($this->transaction[$idx]);
-        }
-    }
-
-    protected function ack($idx, $frame)
-    {
-        if ($this->useAcks) {
-            if ($this->useTransactions) {
-                if (empty($this->transaction[$idx])) {
-                    throw new Exception("Tried to ack but not in a transaction");
-                }
-                $this->cons[$idx]->ack($frame, $this->transaction[$idx]);
-            } else {
-                $this->cons[$idx]->ack($frame);
-            }
-        }
-    }
-
-    protected function commit($idx)
-    {
-        if ($this->useTransactions) {
-            if (empty($this->transaction[$idx])) {
-                throw new Exception("Tried to commit but not in a transaction");
-            }
-            $this->cons[$idx]->commit($this->transaction[$idx]);
-            $this->transaction[$idx] = null;
-        }
-    }
-
-    protected function rollback($idx)
-    {
-        if ($this->useTransactions) {
-            if (empty($this->transaction[$idx])) {
-                throw new Exception("Tried to rollback but not in a transaction");
-            }
-            $this->cons[$idx]->commit($this->transaction[$idx]);
-            $this->transaction[$idx] = null;
-        }
-    }
-}
-

+ 1 - 0
lib/util/default.php

@@ -361,6 +361,7 @@ $default =
                 'Poll' => [],
                 'SimpleCaptcha' => [],
                 'TagSub' => [],
+                'UnQueue' => [],
                 'WebFinger' => [],
             ],
             'locale_path' => false, // Set to a path to use *instead of* each plugin's own locale subdirectories

+ 49 - 0
plugins/DBQueue/DBQueuePlugin.php

@@ -0,0 +1,49 @@
+<?php
+// This file is part of GNU social - https://www.gnu.org/software/social
+//
+// GNU social is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// GNU social is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with GNU social.  If not, see <http://www.gnu.org/licenses/>.
+
+/**
+ * DB interface for GNU social queues
+ *
+ * @package   GNUsocial
+ * @author    Miguel Dantas <biodantasgs@gmail.com>
+ * @copyright 2019 Free Software Foundation, Inc http://www.fsf.org
+ * @license   https://www.gnu.org/licenses/agpl.html GNU AGPL v3 or later
+ */
+
+defined('GNUSOCIAL') || die();
+
+class DBQueuePlugin extends Plugin
+{
+    const PLUGIN_VERSION = '0.0.1';
+
+    public function onStartNewQueueManager(?QueueManager &$qm)
+    {
+        common_debug("Starting DB queue manager.");
+        $qm = new DBQueueManager();
+        return false;
+    }
+
+    public function onPluginVersion(array &$versions): bool
+    {
+        $versions[] = array('name' => 'DBQueue',
+                            'version' => self::PLUGIN_VERSION,
+                            'author' => 'Miguel Dantas',
+                            'description' =>
+                            // TRANS: Plugin description.
+                            _m('Plugin using the database as a backend for GNU social queues'));
+        return true;
+    }
+};

+ 18 - 0
plugins/DBQueue/README

@@ -0,0 +1,18 @@
+DBQueuePlugin wraps the DBQueueManager class which is a queue manager
+that uses the database as it's backing storage.
+
+Installation
+============
+
+This plugin is replaces other queue manager plugins, such as UnQueue,
+which enabled by default and which should, but is not required to be
+disabled.
+
+addPlugin('DBQueue');
+
+Example
+=======
+
+In config.php
+
+addPlugin('DBQueue');

+ 10 - 7
lib/queue/dbqueuemanager.php

@@ -50,13 +50,13 @@ class DBQueueManager extends QueueManager
         }
 
         $this->stats('enqueued', $queue);
-
         return true;
     }
 
     /**
      * Poll every 10 seconds for new events during idle periods.
      * We'll look in more often when there's data available.
+     * Must be greater than 0 for the poll method to be called
      *
      * @return int seconds
      */
@@ -69,9 +69,9 @@ class DBQueueManager extends QueueManager
      * Run a polling cycle during idle processing in the input loop.
      * @return boolean true if we should poll again for more data immediately
      */
-    public function poll()
+    public function poll(): bool
     {
-        //$this->_log(LOG_DEBUG, 'Checking for notices...');
+        $this->_log(LOG_DEBUG, 'Checking for notices...');
         $qi = Queue_item::top($this->activeQueues(), $this->getIgnoredTransports());
         if (!$qi instanceof Queue_item) {
             //$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
@@ -88,7 +88,7 @@ class DBQueueManager extends QueueManager
 
         $rep = $this->logrep($item);
         $this->_log(LOG_DEBUG, 'Got '._ve($rep).' for transport '._ve($qi->transport));
-        
+
         try {
             $handler = $this->getHandler($qi->transport);
             $result = $handler->handle($item);
@@ -96,13 +96,16 @@ class DBQueueManager extends QueueManager
             $this->noHandlerFound($qi, $rep);
             return true;
         } catch (NoResultException $e) {
-            $this->_log(LOG_ERR, "[{$qi->transport}:$rep] ".get_class($e).' thrown ('._ve($e->getMessage()).'), ignoring queue_item '._ve($qi->getID()));
+            $this->_log(LOG_ERR, "[{$qi->transport}:$rep] ".get_class($e).' thrown ('.
+                        _ve($e->getMessage()).'), ignoring queue_item '._ve($qi->getID()));
             $result = true;
         } catch (AlreadyFulfilledException $e) {
-            $this->_log(LOG_ERR, "[{$qi->transport}:$rep] ".get_class($e).' thrown ('._ve($e->getMessage()).'), ignoring queue_item '._ve($qi->getID()));
+            $this->_log(LOG_ERR, "[{$qi->transport}:$rep] ".get_class($e).' thrown ('.
+                        _ve($e->getMessage()).'), ignoring queue_item '._ve($qi->getID()));
             $result = true;
         } catch (Exception $e) {
-            $this->_log(LOG_ERR, "[{$qi->transport}:$rep] Exception (".get_class($e).') thrown: '._ve($e->getMessage()));
+            $this->_log(LOG_ERR, "[{$qi->transport}:$rep] Exception (".
+                        get_class($e).') thrown: '._ve($e->getMessage()));
             $result = false;
         }
 

+ 12 - 15
plugins/OpportunisticQM/OpportunisticQMPlugin.php

@@ -1,17 +1,16 @@
 <?php
 
 class OpportunisticQMPlugin extends Plugin {
-    const PLUGIN_VERSION = '2.0.0';
+    const PLUGIN_VERSION = '3.0.0';
 
     public $qmkey = false;
-    public $secs_per_action = 1; // total seconds to run script per action
+    public $secs_per_action = 1;     // total seconds to run script per action
     public $rel_to_pageload = true;  // relative to pageload or queue start
     public $verbosity = 1;
 
     public function onRouterInitialized($m)
     {
-        $m->connect('main/runqueue',
-                    ['action' => 'runqueue']);
+        $m->connect('main/runqueue', ['action' => 'runqueue']);
     }
 
     /**
@@ -26,23 +25,21 @@ class OpportunisticQMPlugin extends Plugin {
 
         global $_startTime;
 
-        $args = array(
-                    'qmkey' => common_config('opportunisticqm', 'qmkey'),
-                    'max_execution_time' => $this->secs_per_action,
-                    'started_at'      => $this->rel_to_pageload ? $_startTime : null,
-                    'verbosity'          => $this->verbosity,
-                );
-        $qm = new OpportunisticQueueManager($args); 
+        $args = ['qmkey'              => common_config('opportunisticqm', 'qmkey'),
+                 'max_execution_time' => $this->secs_per_action,
+                 'started_at'         => $this->rel_to_pageload ? $_startTime : null,
+                 'verbosity'          => $this->verbosity];
+        $qm = new OpportunisticQueueManager($args);
         $qm->runQueue();
         return true;
     }
 
     public function onPluginVersion(array &$versions): bool
     {
-        $versions[] = array('name' => 'OpportunisticQM',
-                            'version' => self::PLUGIN_VERSION,
-                            'author' => 'Mikael Nordfeldth',
-                            'homepage' => 'http://www.gnu.org/software/social/',
+        $versions[] = array('name'        => 'OpportunisticQM',
+                            'version'     => self::PLUGIN_VERSION,
+                            'author'      => 'Mikael Nordfeldth',
+                            'homepage'    => 'http://www.gnu.org/software/social/',
                             'description' =>
                             // TRANS: Plugin description.
                             _m('Opportunistic queue manager plugin for background processing.'));

+ 4 - 6
plugins/OpportunisticQM/actions/runqueue.php

@@ -23,16 +23,14 @@ class RunqueueAction extends Action
 {
     protected $qm = null;
 
-    protected function prepare(array $args=array())
+    protected function prepare(array $args = [])
     {
         parent::prepare($args);
 
-        $args = array();
+        $args = [];
 
-        foreach (array('qmkey') as $key) {
-            if ($this->arg($key) !== null) {
-                $args[$key] = $this->arg($key);
-            }
+        if ($this->arg('qmkey') !== null) {
+            $args['qmkey'] = $this->arg('qmkey');
         }
 
         try {

+ 0 - 0
plugins/OpportunisticQM/lib/opportunisticqueuemanager.php


Някои файлове не бяха показани, защото твърде много файлове са промени