StompContext.php 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. <?php
  2. declare(strict_types=1);
  3. namespace Enqueue\Stomp;
  4. use Interop\Queue\Consumer;
  5. use Interop\Queue\Context;
  6. use Interop\Queue\Destination;
  7. use Interop\Queue\Exception\InvalidDestinationException;
  8. use Interop\Queue\Exception\PurgeQueueNotSupportedException;
  9. use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
  10. use Interop\Queue\Message;
  11. use Interop\Queue\Producer;
  12. use Interop\Queue\Queue;
  13. use Interop\Queue\SubscriptionConsumer;
  14. use Interop\Queue\Topic;
  15. class StompContext implements Context
  16. {
  17. /**
  18. * @var BufferedStompClient
  19. */
  20. private $stomp;
  21. /**
  22. * @var bool
  23. */
  24. private $useExchangePrefix;
  25. /**
  26. * @var callable
  27. */
  28. private $stompFactory;
  29. /**
  30. * @param BufferedStompClient|callable $stomp
  31. * @param bool $useExchangePrefix
  32. */
  33. public function __construct($stomp, $useExchangePrefix = true)
  34. {
  35. if ($stomp instanceof BufferedStompClient) {
  36. $this->stomp = $stomp;
  37. } elseif (is_callable($stomp)) {
  38. $this->stompFactory = $stomp;
  39. } else {
  40. throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.');
  41. }
  42. $this->useExchangePrefix = $useExchangePrefix;
  43. }
  44. /**
  45. * @return StompMessage
  46. */
  47. public function createMessage(string $body = '', array $properties = [], array $headers = []): Message
  48. {
  49. return new StompMessage($body, $properties, $headers);
  50. }
  51. /**
  52. * @return StompDestination
  53. */
  54. public function createQueue(string $name): Queue
  55. {
  56. if (0 !== strpos($name, '/')) {
  57. $destination = new StompDestination();
  58. $destination->setType(StompDestination::TYPE_QUEUE);
  59. $destination->setStompName($name);
  60. return $destination;
  61. }
  62. return $this->createDestination($name);
  63. }
  64. /**
  65. * @return StompDestination
  66. */
  67. public function createTemporaryQueue(): Queue
  68. {
  69. $queue = $this->createQueue(uniqid('', true));
  70. $queue->setType(StompDestination::TYPE_TEMP_QUEUE);
  71. return $queue;
  72. }
  73. /**
  74. * @return StompDestination
  75. */
  76. public function createTopic(string $name): Topic
  77. {
  78. if (0 !== strpos($name, '/')) {
  79. $destination = new StompDestination();
  80. $destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC);
  81. $destination->setStompName($name);
  82. return $destination;
  83. }
  84. return $this->createDestination($name);
  85. }
  86. public function createDestination(string $destination): StompDestination
  87. {
  88. $types = [
  89. StompDestination::TYPE_TOPIC,
  90. StompDestination::TYPE_EXCHANGE,
  91. StompDestination::TYPE_QUEUE,
  92. StompDestination::TYPE_AMQ_QUEUE,
  93. StompDestination::TYPE_TEMP_QUEUE,
  94. StompDestination::TYPE_REPLY_QUEUE,
  95. ];
  96. $dest = $destination;
  97. $type = null;
  98. $name = null;
  99. $routingKey = null;
  100. foreach ($types as $_type) {
  101. $typePrefix = '/'.$_type.'/';
  102. if (0 === strpos($dest, $typePrefix)) {
  103. $type = $_type;
  104. $dest = substr($dest, strlen($typePrefix));
  105. break;
  106. }
  107. }
  108. if (null === $type) {
  109. throw new \LogicException(sprintf('Destination name is invalid, cant find type: "%s"', $destination));
  110. }
  111. $pieces = explode('/', $dest);
  112. if (count($pieces) > 2) {
  113. throw new \LogicException(sprintf('Destination name is invalid, found extra / char: "%s"', $destination));
  114. }
  115. if (empty($pieces[0])) {
  116. throw new \LogicException(sprintf('Destination name is invalid, name is empty: "%s"', $destination));
  117. }
  118. $name = $pieces[0];
  119. if (isset($pieces[1])) {
  120. if (empty($pieces[1])) {
  121. throw new \LogicException(sprintf('Destination name is invalid, routing key is empty: "%s"', $destination));
  122. }
  123. $routingKey = $pieces[1];
  124. }
  125. $destination = new StompDestination();
  126. $destination->setType($type);
  127. $destination->setStompName($name);
  128. $destination->setRoutingKey($routingKey);
  129. return $destination;
  130. }
  131. /**
  132. * @param StompDestination $destination
  133. *
  134. * @return StompConsumer
  135. */
  136. public function createConsumer(Destination $destination): Consumer
  137. {
  138. InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class);
  139. return new StompConsumer($this->getStomp(), $destination);
  140. }
  141. /**
  142. * @return StompProducer
  143. */
  144. public function createProducer(): Producer
  145. {
  146. return new StompProducer($this->getStomp());
  147. }
  148. public function close(): void
  149. {
  150. $this->getStomp()->disconnect();
  151. }
  152. public function createSubscriptionConsumer(): SubscriptionConsumer
  153. {
  154. throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
  155. }
  156. public function purgeQueue(Queue $queue): void
  157. {
  158. throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
  159. }
  160. public function getStomp(): BufferedStompClient
  161. {
  162. if (false == $this->stomp) {
  163. $stomp = call_user_func($this->stompFactory);
  164. if (false == $stomp instanceof BufferedStompClient) {
  165. throw new \LogicException(sprintf(
  166. 'The factory must return instance of BufferedStompClient. It returns %s',
  167. is_object($stomp) ? get_class($stomp) : gettype($stomp)
  168. ));
  169. }
  170. $this->stomp = $stomp;
  171. }
  172. return $this->stomp;
  173. }
  174. }