StompProducer.php 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. <?php
  2. declare(strict_types=1);
  3. namespace Enqueue\Stomp;
  4. use Interop\Queue\Destination;
  5. use Interop\Queue\Exception\InvalidDestinationException;
  6. use Interop\Queue\Exception\InvalidMessageException;
  7. use Interop\Queue\Exception\PriorityNotSupportedException;
  8. use Interop\Queue\Message;
  9. use Interop\Queue\Producer;
  10. use Stomp\Client;
  11. use Stomp\Transport\Message as StompLibMessage;
  /**
   * Producer implementation that publishes messages through a Stomp client.
   *
   * Delivery delay, priority and time-to-live cannot be configured on this
   * producer: their setters accept only null (a no-op) and throw for any other
   * value, and the matching getters always return null.
   */
  class StompProducer implements Producer
  {
      /**
       * Stomp client used to send frames.
       *
       * @var Client
       */
      private $stomp;

      /**
       * @param Client $stomp Stomp client the producer sends messages through
       */
      public function __construct(Client $stomp)
      {
          $this->stomp = $stomp;
      }

      /**
       * Sends the message to the given destination.
       *
       * Both arguments are checked with the interop assertion helpers before
       * anything is sent, so they must be the Stomp-specific implementations.
       *
       * @param StompDestination $destination
       * @param StompMessage     $message
       */
      public function send(Destination $destination, Message $message): void
      {
          InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class);
          InvalidMessageException::assertMessageInstanceOf($message, StompMessage::class);

          // Destination headers are merged last, so with array_merge() they win
          // over message headers that share the same string key.
          $headers = array_merge($message->getHeaders(), $destination->getHeaders());

          // Headers and message properties are combined into a single encoded header set.
          $headers = StompHeadersEncoder::encode($headers, $message->getProperties());

          $stompMessage = new StompLibMessage($message->getBody(), $headers);

          $this->stomp->send($destination->getQueueName(), $stompMessage);
      }

      /**
       * Accepts only null; any concrete delay is rejected.
       *
       * The parameter is declared as an explicit nullable type (?int) rather than
       * relying on the implicit nullability of a null default, which is deprecated
       * as of PHP 8.4.
       *
       * @param int|null $deliveryDelay
       *
       * @return Producer this producer, when null is passed
       *
       * @throws \LogicException when a non-null delay is passed
       */
      public function setDeliveryDelay(?int $deliveryDelay = null): Producer
      {
          if (null === $deliveryDelay) {
              return $this;
          }

          throw new \LogicException('Not implemented');
      }

      /**
       * @return int|null always null
       */
      public function getDeliveryDelay(): ?int
      {
          return null;
      }

      /**
       * Accepts only null; any concrete priority is rejected.
       *
       * @param int|null $priority
       *
       * @return Producer this producer, when null is passed
       *
       * @throws PriorityNotSupportedException when a non-null priority is passed
       */
      public function setPriority(?int $priority = null): Producer
      {
          if (null === $priority) {
              return $this;
          }

          throw PriorityNotSupportedException::providerDoestNotSupportIt();
      }

      /**
       * @return int|null always null
       */
      public function getPriority(): ?int
      {
          return null;
      }

      /**
       * Accepts only null; any concrete time-to-live is rejected.
       *
       * @param int|null $timeToLive
       *
       * @return Producer this producer, when null is passed
       *
       * @throws \LogicException when a non-null time-to-live is passed
       */
      public function setTimeToLive(?int $timeToLive = null): Producer
      {
          if (null === $timeToLive) {
              return $this;
          }

          throw new \LogicException('Not implemented');
      }

      /**
       * @return int|null always null
       */
      public function getTimeToLive(): ?int
      {
          return null;
      }
  }