StompConnectionFactory.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. <?php
  2. declare(strict_types=1);
  3. namespace Enqueue\Stomp;
  4. use Enqueue\Dsn\Dsn;
  5. use Interop\Queue\ConnectionFactory;
  6. use Interop\Queue\Context;
  7. use Stomp\Network\Connection;
  /**
   * Connection factory for the STOMP transport.
   *
   * The factory is configured either with an options array or with a DSN
   * string using the "stomp" scheme protocol, optionally extended with
   * "activemq" or "rabbitmq" (for example "stomp+activemq:"). The underlying
   * client is created at most once per factory instance and is shared by every
   * context the factory creates.
   */
  class StompConnectionFactory implements ConnectionFactory
  {
      const SCHEME_EXT_ACTIVEMQ = 'activemq';
      const SCHEME_EXT_RABBITMQ = 'rabbitmq';

      /**
       * Effective configuration: defaultConfig() overlaid with the user options.
       *
       * @var array
       */
      private $config;

      /**
       * Shared client; stays null until establishConnection() runs for the first time.
       *
       * @var BufferedStompClient|null
       */
      private $stomp;

      /**
       * Accepted options and the defaults applied when an option is omitted:
       *
       * $config = [
       *     'target' => 'rabbitmq',
       *     'host' => 'localhost',
       *     'port' => 61613,
       *     'login' => 'guest',
       *     'password' => 'guest',
       *     'vhost' => '/',
       *     'buffer_size' => 1000,
       *     'connection_timeout' => 1,
       *     'sync' => false,
       *     'lazy' => true,
       *     'ssl_on' => false,
       * ].
       *
       * or a DSN string:
       *
       * stomp:
       * stomp:?buffer_size=100
       *
       * An options array may also carry a 'dsn' key. That DSN is parsed and the
       * values it yields take precedence over same-named keys of the array; the
       * 'dsn' key itself is removed afterwards.
       *
       * @param array|string|null $config
       *
       * @throws \LogicException when $config is neither an array, a string nor an empty value,
       *                         or when a DSN is rejected by parseDsn()
       */
      public function __construct($config = 'stomp:')
      {
          if (empty($config) || 'stomp:' === $config) {
              // Nothing to override: run purely on the defaults.
              $config = [];
          } elseif (is_string($config)) {
              $config = $this->parseDsn($config);
          } elseif (is_array($config)) {
              if (array_key_exists('dsn', $config)) {
                  // Values parsed from the DSN win over explicit array options.
                  $config = array_replace($config, $this->parseDsn($config['dsn']));
                  unset($config['dsn']);
              }
          } else {
              throw new \LogicException('The config must be either an array of options, a DSN string or null');
          }

          $this->config = array_replace($this->defaultConfig(), $config);
      }

      /**
       * Creates a context bound to this factory's STOMP client.
       *
       * With the 'lazy' option enabled the context receives a closure that
       * establishes the connection when invoked; otherwise the connection is
       * established here, before the context is returned.
       *
       * @return StompContext
       */
      public function createContext(): Context
      {
          // The second StompContext argument is enabled only for the RabbitMQ target.
          $useExchangePrefix = self::SCHEME_EXT_RABBITMQ === $this->config['target'];

          if ($this->config['lazy']) {
              return new StompContext(function () {
                  return $this->establishConnection();
              }, $useExchangePrefix);
          }

          return new StompContext($this->establishConnection(), $useExchangePrefix);
      }

      /**
       * Returns the shared client, creating and connecting it on first use.
       */
      private function establishConnection(): BufferedStompClient
      {
          if (null === $this->stomp) {
              $config = $this->config;

              // Only the boolean true selects SSL; any other value falls back to plain TCP.
              $scheme = (true === $config['ssl_on']) ? 'ssl' : 'tcp';
              $uri = $scheme.'://'.$config['host'].':'.$config['port'];
              $connection = new Connection($uri, $config['connection_timeout']);

              $this->stomp = new BufferedStompClient($connection, $config['buffer_size']);
              $this->stomp->setLogin($config['login'], $config['password']);
              $this->stomp->setVhostname($config['vhost']);
              $this->stomp->setSync($config['sync']);
              $this->stomp->connect();
          }

          return $this->stomp;
      }

      /**
       * Converts a DSN string into an options array.
       *
       * Raw query parameters are kept, but the keys listed below are always
       * replaced by the values read from the DSN components and typed accessors.
       * Entries whose value is null are dropped so that the constructor falls
       * back to the defaults for them.
       *
       * @throws \LogicException when the scheme protocol is not "stomp" or the
       *                         scheme extension is not a supported target
       */
      private function parseDsn(string $dsn): array
      {
          $parsedDsn = Dsn::parseFirst($dsn);

          if ('stomp' !== $parsedDsn->getSchemeProtocol()) {
              throw new \LogicException('The given DSN is not supported. Must start with "stomp:".');
          }

          // current() yields false for an empty extension list; RabbitMQ is the default target then.
          $schemeExtension = current($parsedDsn->getSchemeExtensions());
          if (false === $schemeExtension) {
              $schemeExtension = self::SCHEME_EXT_RABBITMQ;
          }

          if (!in_array($schemeExtension, [self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ], true)) {
              throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is invalid. It must be one of "%s" or "%s".', $schemeExtension, self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ));
          }

          // NOTE(review): ltrim() strips every leading slash, so if getPath() returns "/"
          // the vhost becomes '' (not null) and overrides the default '/' - confirm this is intended.
          $path = $parsedDsn->getPath();

          $options = array_replace($parsedDsn->getQuery(), [
              'target' => $schemeExtension,
              'host' => $parsedDsn->getHost(),
              'port' => $parsedDsn->getPort(),
              'login' => $parsedDsn->getUser(),
              'password' => $parsedDsn->getPassword(),
              'vhost' => null !== $path ? ltrim($path, '/') : null,
              'buffer_size' => $parsedDsn->getDecimal('buffer_size'),
              'connection_timeout' => $parsedDsn->getDecimal('connection_timeout'),
              'sync' => $parsedDsn->getBool('sync'),
              'lazy' => $parsedDsn->getBool('lazy'),
              'ssl_on' => $parsedDsn->getBool('ssl_on'),
          ]);

          // Strict null check: false, 0 and '' are legitimate option values and must survive.
          return array_filter($options, static function ($value) {
              return null !== $value;
          });
      }

      /**
       * Options applied for every key the caller did not provide.
       */
      private function defaultConfig(): array
      {
          return [
              'target' => self::SCHEME_EXT_RABBITMQ,
              'host' => 'localhost',
              'port' => 61613,
              'login' => 'guest',
              'password' => 'guest',
              'vhost' => '/',
              'buffer_size' => 1000,
              'connection_timeout' => 1,
              'sync' => false,
              'lazy' => true,
              'ssl_on' => false,
          ];
      }
  }