<?php
/**
 * Abstract base class for queue managers.
 *
 * Keeps the registry that maps transport (queue) names to handlers and to
 * named groups, offers a shared factory ({@see QueueManager::get()}), and
 * implements the frame encoding/decoding used when items travel through a
 * queue. Concrete subclasses must implement {@see QueueManager::enqueue()}.
 */
abstract class QueueManager extends IoManager
{
    /** @var QueueManager|null Instance cached by get(). */
    public static $qm = null;

    /** @var object|null When set, stats() is forwarded to it instead of a fresh QueueMonitor. */
    protected $master = null;

    /** @var array Transport name => handler class name (or handler object). */
    protected $handlers = array();

    /** @var array Group name => array(transport name => handler). */
    protected $groups = array();

    /**
     * Declared explicitly: it used to be created on the fly in initialize(),
     * which is a deprecated dynamic property on PHP 8.2+.
     *
     * @var array Transport name => group name.
     */
    protected $groupsByTransport = array();

    /** @var array List of the group names currently active for this manager. */
    protected $activeGroups = array();

    /** @var array Transport name => true for every transport marked as ignored. */
    protected $ignoredTransports = array();

    /**
     * Return the shared queue manager, creating it on first use.
     *
     * The 'StartNewQueueManager' event receives the static slot by reference,
     * so a handler may fill it in. When Event::handle() returns a truthy
     * value the manager is chosen from the 'queue' configuration instead.
     *
     * @return QueueManager|null Whatever ended up in the static slot.
     *
     * @throws ServerException When queues are enabled but the configured
     *                         subsystem is neither 'db' nor 'stomp'.
     */
    public static function get()
    {
        if (empty(self::$qm)) {
            if (Event::handle('StartNewQueueManager', array(&self::$qm))) {
                $enabled = common_config('queue', 'enabled');
                $type = common_config('queue', 'subsystem');

                if (!$enabled) {
                    // Queueing is switched off in the configuration.
                    self::$qm = new UnQueueManager();
                } else {
                    switch ($type) {
                        case 'db':
                            self::$qm = new DBQueueManager();
                            break;
                        case 'stomp':
                            self::$qm = new StompQueueManager();
                            break;
                        default:
                            throw new ServerException("No queue manager class for type '$type'");
                    }
                }
            }
        }

        return self::$qm;
    }

    /**
     * Report the instancing mode for this manager.
     *
     * @return mixed IoManager::INSTANCE_PER_PROCESS when the configured
     *               subsystem is 'stomp', IoManager::SINGLE_ONLY otherwise.
     */
    public static function multiSite()
    {
        if (common_config('queue', 'subsystem') == 'stomp') {
            return IoManager::INSTANCE_PER_PROCESS;
        }

        return IoManager::SINGLE_ONLY;
    }

    /**
     * Build the manager and populate its handler registry via initialize().
     */
    public function __construct()
    {
        $this->initialize();
    }

    /**
     * Send a control signal; this base implementation always refuses.
     *
     * @param mixed $event Signal identifier (unused here).
     * @param mixed $param Optional signal argument (unused here).
     *
     * @throws Exception Always; the message names the concrete class.
     */
    public function sendControlSignal($event, $param = '')
    {
        throw new Exception(get_class($this) . " does not support control signals.");
    }

    /**
     * Store an item in the given queue; implemented by concrete managers.
     *
     * @param mixed $object Item to queue.
     * @param mixed $queue  Queue (transport) name - presumably one registered
     *                      through connect(); confirm against implementations.
     */
    abstract public function enqueue($object, $queue);

    /**
     * Build a short, human-readable description of a value.
     *
     * Objects are described by class name (plus ->id when set), strings by
     * their first 32 characters and byte length, arrays by element count and
     * key list. Anything else is passed through strval().
     *
     * @param mixed $object Value to describe.
     *
     * @return string
     */
    public function logrep($object)
    {
        if (is_object($object)) {
            $class = get_class($object);
            if (isset($object->id)) {
                return "$class $object->id";
            }
            return $class;
        }

        if (is_string($object)) {
            // strlen() counts bytes on purpose; the preview is cut by characters.
            $len = strlen($object);
            $fragment = mb_substr($object, 0, 32);
            if (mb_strlen($object) > 32) {
                $fragment .= '...';
            }
            return "string '$fragment' ($len bytes)";
        }

        if (is_array($object)) {
            return 'array with ' . count($object) .
                   ' elements (keys:[' . implode(',', array_keys($object)) . '])';
        }

        return strval($object);
    }

    /**
     * Serialize an item into a frame.
     *
     * @param mixed $item
     *
     * @return string Result of serialize().
     */
    protected function encode($item)
    {
        return serialize($item);
    }

    /**
     * Turn a frame back into the queued item.
     *
     * The frame is unserialized first. If the result is a string that does
     * not start with '<' and is not numeric, it is treated as JSON of the
     * form {"type": <class>, "id"|"uri": <value>} and resolved to an object
     * through <class>::getKV(). Every other value is returned as-is.
     *
     * NOTE(review): unserialize() can instantiate arbitrary classes, so frames
     * must only come from a trusted queue backend - confirm, or consider
     * restricting it with the 'allowed_classes' option.
     *
     * @param string $frame Serialized frame as produced by encode().
     *
     * @return mixed The decoded item.
     *
     * @throws Exception When the JSON cannot be parsed, has no type, names a
     *                   class that is not a Managed_DataObject, or does not
     *                   resolve to a Managed_DataObject instance.
     */
    protected function decode($frame)
    {
        $object = unserialize($frame);

        $looksLikeJson = is_string($object)
            && substr($object, 0, 1) != '<'
            && !is_numeric($object);

        if ($looksLikeJson) {
            $json = json_decode($object);
            if ($json === null) {
                throw new Exception('Bad frame in queue item');
            }

            // The class to load is carried in the "type" member.
            if (empty($json->type)) {
                throw new Exception('Type not specified for queue item');
            }
            if (!is_a($json->type, 'Managed_DataObject', true)) {
                throw new Exception('Managed_DataObject class does not exist for queue item');
            }

            // Look the object up by id when present, otherwise by uri.
            if (isset($json->id) && !empty($json->id)) {
                $object = call_user_func(array($json->type, 'getKV'), 'id', $json->id);
            } elseif (isset($json->uri) && !empty($json->uri)) {
                $object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri);
            }

            // With neither key, $object is still the raw string and is rejected here too.
            if (!$object instanceof Managed_DataObject) {
                throw new Exception('Queue item frame referenced a non-existant object');
            }
        }

        return $object;
    }

    /**
     * Fetch the handler registered for a queue.
     *
     * A registered object is returned directly; a registered class name is
     * instantiated on every call.
     *
     * @param string $queue Transport name.
     *
     * @return object Handler instance.
     *
     * @throws NoQueueHandlerException When nothing is registered for $queue
     *                                 or the registered class does not exist
     *                                 (the latter is also logged at LOG_ERR).
     */
    public function getHandler($queue)
    {
        if (isset($this->handlers[$queue])) {
            $class = $this->handlers[$queue];
            if (is_object($class)) {
                return $class;
            } elseif (class_exists($class)) {
                return new $class();
            } else {
                $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
            }
        }

        throw new NoQueueHandlerException($queue);
    }

    /**
     * List the transports that belong to the currently active groups.
     *
     * @return array Transport names, without duplicates.
     */
    public function activeQueues()
    {
        $queues = array();
        foreach ($this->activeGroups as $group) {
            if (isset($this->groups[$group])) {
                // array_replace() keeps keys intact; array_merge() would
                // renumber integer-like transport names and lose them.
                $queues = array_replace($queues, $this->groups[$group]);
            }
        }

        return array_keys($queues);
    }

    /**
     * List the transports that were marked through ignoreTransport().
     *
     * @return array Transport names.
     */
    public function getIgnoredTransports()
    {
        return array_keys($this->ignoredTransports);
    }

    /**
     * Mark a transport as ignored.
     *
     * @param string $transport Transport name.
     */
    public function ignoreTransport($transport)
    {
        // Stored as a key so that repeated calls do not create duplicates.
        $this->ignoredTransports[$transport] = true;
    }

    /**
     * Reset the registries and register the built-in handlers.
     *
     * The built-ins are only connected when the
     * 'StartInitializeQueueManager' event returns a truthy value;
     * 'EndInitializeQueueManager' is fired in every case.
     */
    public function initialize()
    {
        $this->handlers = array();
        $this->groups = array();
        $this->groupsByTransport = array();

        if (Event::handle('StartInitializeQueueManager', array($this))) {
            $this->connect('distrib', 'DistribQueueHandler');
            $this->connect('ping', 'PingQueueHandler');
            if (common_config('sms', 'enabled')) {
                $this->connect('sms', 'SmsQueueHandler');
            }

            $this->connect('deluser', 'DelUserQueueHandler');
            $this->connect('feedimp', 'FeedImporter');
            $this->connect('actimp', 'ActivityImporter');
            $this->connect('acctmove', 'AccountMover');
            $this->connect('actmove', 'ActivityMover');

            $this->connect('plugin', 'PluginQueueHandler');
        }
        Event::handle('EndInitializeQueueManager', array($this));
    }

    /**
     * Register a handler for a transport inside a group.
     *
     * Connecting the same transport again overwrites its handler and its
     * transport-to-group mapping, but does not remove the transport from a
     * group it was connected under before.
     *
     * @param string        $transport Transport (queue) name.
     * @param string|object $class     Handler class name or handler object.
     * @param string        $group     Group name, 'main' by default.
     */
    public function connect($transport, $class, $group = 'main')
    {
        $this->handlers[$transport] = $class;
        $this->groups[$group][$transport] = $class;
        $this->groupsByTransport[$transport] = $group;
    }

    /**
     * Make a single group the only active one.
     *
     * @param string $group Group name.
     */
    public function setActiveGroup($group)
    {
        $this->activeGroups = array($group);
    }

    /**
     * Replace the list of active groups.
     *
     * @param array $groups Group names.
     */
    public function setActiveGroups($groups)
    {
        $this->activeGroups = $groups;
    }

    /**
     * Look up the group a transport was connected under.
     *
     * @param string $queue Transport name.
     *
     * @return string Group name.
     *
     * @throws Exception When the transport was never connected.
     */
    public function queueGroup($queue)
    {
        if (isset($this->groupsByTransport[$queue])) {
            return $this->groupsByTransport[$queue];
        }

        throw new Exception("Requested group for unregistered transport $queue");
    }

    /**
     * Record a statistics event.
     *
     * When $queue is truthy the event is attributed to both that queue and
     * the configured site server; otherwise the owner list is empty.
     *
     * @param mixed        $key   Statistic identifier, passed through unchanged.
     * @param string|false $queue Transport name, or false for no owners.
     */
    public function stats($key, $queue = false)
    {
        $owners = array();
        if ($queue) {
            $owners[] = "queue:$queue";
            $owners[] = "site:" . common_config('site', 'server');
        }

        if (isset($this->master)) {
            $this->master->stats($key, $owners);
        } else {
            $monitor = new QueueMonitor();
            $monitor->stats($key, $owners);
        }
    }

    /**
     * Write a log line prefixed with the concrete class name and, when any
     * are set, the active groups in parentheses.
     *
     * @param int    $level Log level handed to common_log().
     * @param string $msg   Message text.
     */
    protected function _log($level, $msg)
    {
        $class = get_class($this);
        if ($this->activeGroups) {
            $groups = ' (' . implode(',', $this->activeGroups) . ')';
        } else {
            $groups = '';
        }
        common_log($level, "$class$groups: $msg");
    }
}
|