RedisQueueManager.php 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. <?php
  2. use Predis\Client;
  /**
   * Queue manager that keeps every queued item in a single Redis list.
   *
   * Items are appended with RPUSH and consumed with LPOP, so the list is
   * drained in the order it was filled. All logical queues share one Redis
   * key; the logical queue name travels inside each encoded payload.
   *
   * encode(), decode(), getHandler() and _log() are not defined in this class;
   * they are presumably inherited from QueueManager (verify there).
   */
  class RedisQueueManager extends QueueManager
  {
      /** @var string Redis key of the list that holds the encoded items */
      protected $queue;

      /** @var Client|null Lazily created Predis client, null until first use */
      protected $client = null;

      /** @var string Connection parameters handed to the Predis client */
      protected $server;

      /**
       * @param string $server Connection parameters passed verbatim to Predis\Client
       */
      public function __construct(string $server)
      {
          parent::__construct();
          $this->server = $server;
          // One list per site, keyed by the configured site name
          $this->queue = 'gnusocial:' . common_config('site', 'name');
      }

      /**
       * Create the Predis client on first use; later calls are no-ops.
       */
      private function _ensureConn()
      {
          if ($this->client === null) {
              $this->client = new Client($this->server);
          }
      }

      /**
       * @return int Polling interval (presumably in seconds; confirm against QueueManager)
       */
      public function pollInterval()
      {
          return 10;
      }

      /**
       * Append an item to the tail of the Redis list.
       *
       * The logical queue name is stored alongside the object so that poll()
       * can pick the matching handler later.
       *
       * @param mixed  $object Item to be queued
       * @param string $queue  Logical queue name
       */
      public function enqueue($object, $queue)
      {
          $this->_ensureConn();
          // RPUSH replies with the list length after the push, so an empty reply means failure
          $ret = $this->client->rpush($this->queue, $this->encode([$queue, $object]));
          if (empty($ret)) {
              common_log(LOG_ERR, "Unable to insert object into Redis queue {$queue}");
          } else {
              common_debug("The Redis queue for {$queue} has length {$ret}");
          }
      }

      /**
       * Pop one item from the head of the Redis list and run its handler.
       *
       * @return bool true when an item was handled; false when the list was
       *              empty, the item could not be fetched or decoded, or the
       *              handler threw an Exception
       */
      public function poll()
      {
          try {
              $this->_ensureConn();
              $payload = $this->client->lpop($this->queue);
              if (empty($payload)) {
                  // Nothing queued right now
                  return false;
              }
              $item = $this->decode($payload);
          } catch (Exception $e) {
              // The logical queue name is not known at this point (the failure happened
              // while connecting, popping or decoding), so report the Redis key instead
              $this->_log(LOG_INFO, "[Queue {$this->queue}] Discarding: " . _ve($e->getMessage()));
              return false;
          }

          // enqueue() always stores a [queue name, object] pair; refuse anything else
          // rather than passing null on to the handler lookup
          if (!is_array($item) || !array_key_exists(0, $item) || !array_key_exists(1, $item)) {
              $this->_log(LOG_ERR, "[Queue {$this->queue}] Discarding: malformed item");
              return false;
          }
          list($queue, $object) = $item;

          try {
              $handler = $this->getHandler($queue);
              $handler->handle($object);
              common_debug("Redis Queue handled item from {$queue} queue");
              return true;
          } catch (Exception $e) {
              $this->_log(LOG_ERR, "[Queue: {$queue}] `" . get_class($e) . '` thrown: ' . _ve($e->getMessage()));
              return false;
          }
      }
  }