BufferedStompClient.php 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. <?php
  2. namespace Enqueue\Stomp;
  3. use Stomp\Client;
  4. use Stomp\Transport\Frame;
  /**
   * STOMP client that hands out MESSAGE frames per subscription.
   *
   * Frames read from the connection that belong to a subscription other than
   * the one being polled are parked in an in-memory buffer and returned by a
   * later readMessageFrame() call for that subscription.
   */
  class BufferedStompClient extends Client
  {
      /**
       * Buffered frames grouped by subscription id:
       * [
       *   'subscriptionId' => Frame[],
       * ].
       *
       * @var array
       */
      private $buffer;

      /**
       * Maximum number of frames kept in the buffer, summed over all subscriptions.
       *
       * @var int
       */
      private $bufferSize;

      /**
       * Number of frames currently held in the buffer, summed over all subscriptions.
       *
       * @var int
       */
      private $currentBufferSize;

      /**
       * @param \Stomp\Network\Connection|string $broker
       * @param int                              $bufferSize maximum number of buffered frames
       */
      public function __construct($broker, $bufferSize = 1000)
      {
          parent::__construct($broker);

          $this->bufferSize = $bufferSize;
          $this->buffer = [];
          $this->currentBufferSize = 0;
      }

      /**
       * Returns the configured buffer capacity (not the current fill level).
       *
       * @return int
       */
      public function getBufferSize()
      {
          return $this->bufferSize;
      }

      /**
       * Returns the next MESSAGE frame for the given subscription.
       *
       * A frame already buffered for the subscription is returned immediately.
       * Otherwise frames are read from the connection until one for this
       * subscription arrives; frames for other subscriptions are buffered.
       *
       * Null is returned when the buffer is full, when nothing could be read,
       * or when the timeout has been used up while buffering foreign frames.
       *
       * @param string $subscriptionId subscription to fetch a frame for
       * @param int    $timeout        timeout in milliseconds; negative values are treated as 0
       *
       * @throws \LogicException when a non-MESSAGE frame is received or a MESSAGE
       *                         frame carries no "subscription" header
       */
      public function readMessageFrame(string $subscriptionId, int $timeout): ?Frame
      {
          // Serve from the buffer first.
          if (isset($this->buffer[$subscriptionId]) && ($frame = array_shift($this->buffer[$subscriptionId]))) {
              --$this->currentBufferSize;

              return $frame;
          }

          // Do not read from the connection while the buffer is full.
          if ($this->currentBufferSize >= $this->bufferSize) {
              return null;
          }

          $startTime = microtime(true);
          // The read timeout below is expressed in microseconds.
          $timeoutMicroseconds = max(0, $timeout * 1000);
          $remainingTimeout = $timeoutMicroseconds;

          while (true) {
              $this->getConnection()->setReadTimeout(0, $remainingTimeout);

              // There is nothing to read.
              if (false === $frame = $this->readFrame()) {
                  return null;
              }

              if ('MESSAGE' !== $frame->getCommand()) {
                  throw new \LogicException(sprintf('Unexpected frame was received: "%s"', $frame->getCommand()));
              }

              $headers = $frame->getHeaders();
              if (!isset($headers['subscription'])) {
                  throw new \LogicException('Got message frame with missing subscription header');
              }

              if ($headers['subscription'] === $subscriptionId) {
                  return $frame;
              }

              // The frame belongs to another subscription: park it for a later call.
              $this->buffer[$headers['subscription']][] = $frame;
              ++$this->currentBufferSize;

              // Same policy as the guard above: stop reading once the buffer is
              // full, so the configured limit also holds inside this loop.
              if ($this->currentBufferSize >= $this->bufferSize) {
                  return null;
              }

              // Recompute the remainder from the original budget. Subtracting the
              // total elapsed time from the previous remainder on every pass would
              // count earlier iterations more than once.
              $elapsedMicroseconds = (int) ((microtime(true) - $startTime) * 1000000);
              $remainingTimeout = $timeoutMicroseconds - $elapsedMicroseconds;
              if ($remainingTimeout <= 0) {
                  return null;
              }
          }
      }

      /**
       * Disconnects via the parent client and drops every buffered frame.
       *
       * {@inheritdoc}
       */
      public function disconnect($sync = false)
      {
          parent::disconnect($sync);

          $this->buffer = [];
          $this->currentBufferSize = 0;
      }
  }