123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- <?php
- declare(strict_types=1);
- namespace Enqueue\Stomp;
- use Interop\Queue\Consumer;
- use Interop\Queue\Exception\InvalidMessageException;
- use Interop\Queue\Message;
- use Interop\Queue\Queue;
- use Stomp\Client;
- use Stomp\Transport\Frame;
- class StompConsumer implements Consumer
- {
- const ACK_AUTO = 'auto';
- const ACK_CLIENT = 'client';
- const ACK_CLIENT_INDIVIDUAL = 'client-individual';
-
- private $queue;
-
- private $stomp;
-
- private $isSubscribed;
-
- private $ackMode;
-
- private $prefetchCount;
-
- private $subscriptionId;
- public function __construct(BufferedStompClient $stomp, StompDestination $queue)
- {
- $this->stomp = $stomp;
- $this->queue = $queue;
- $this->isSubscribed = false;
- $this->ackMode = self::ACK_CLIENT_INDIVIDUAL;
- $this->prefetchCount = 1;
- $this->subscriptionId = StompDestination::TYPE_TEMP_QUEUE == $queue->getType() ?
- $queue->getQueueName() :
- uniqid('', true)
- ;
- }
- public function setAckMode(string $mode): void
- {
- if (false === in_array($mode, [self::ACK_AUTO, self::ACK_CLIENT, self::ACK_CLIENT_INDIVIDUAL], true)) {
- throw new \LogicException(sprintf('Ack mode is not valid: "%s"', $mode));
- }
- $this->ackMode = $mode;
- }
- public function getAckMode(): string
- {
- return $this->ackMode;
- }
- public function getPrefetchCount(): int
- {
- return $this->prefetchCount;
- }
- public function setPrefetchCount(int $prefetchCount): void
- {
- $this->prefetchCount = $prefetchCount;
- }
-
- public function getQueue(): Queue
- {
- return $this->queue;
- }
- public function receive(int $timeout = 0): ?Message
- {
- $this->subscribe();
- if (0 === $timeout) {
- while (true) {
- if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
- return $this->convertMessage($message);
- }
- }
- } else {
- if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
- return $this->convertMessage($message);
- }
- }
- return null;
- }
- public function receiveNoWait(): ?Message
- {
- $this->subscribe();
- if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 0)) {
- return $this->convertMessage($message);
- }
- return null;
- }
-
- public function acknowledge(Message $message): void
- {
- InvalidMessageException::assertMessageInstanceOf($message, StompMessage::class);
- $this->stomp->sendFrame(
- $this->stomp->getProtocol()->getAckFrame($message->getFrame())
- );
- }
-
- public function reject(Message $message, bool $requeue = false): void
- {
- InvalidMessageException::assertMessageInstanceOf($message, StompMessage::class);
- $nackFrame = $this->stomp->getProtocol()->getNackFrame($message->getFrame());
-
- $nackFrame->addHeaders([
- 'requeue' => $requeue ? 'true' : 'false',
- ]);
- $this->stomp->sendFrame($nackFrame);
- }
- private function subscribe(): void
- {
- if (StompDestination::TYPE_TEMP_QUEUE == $this->queue->getType()) {
- $this->isSubscribed = true;
- return;
- }
- if (false == $this->isSubscribed) {
- $this->isSubscribed = true;
- $frame = $this->stomp->getProtocol()->getSubscribeFrame(
- $this->queue->getQueueName(),
- $this->subscriptionId,
- $this->ackMode
- );
-
- $headers = $this->queue->getHeaders();
- $headers['prefetch-count'] = $this->prefetchCount;
- $headers = StompHeadersEncoder::encode($headers);
- foreach ($headers as $key => $value) {
- $frame[$key] = $value;
- }
- $this->stomp->sendFrame($frame);
- }
- }
- private function convertMessage(Frame $frame): StompMessage
- {
- if ('MESSAGE' !== $frame->getCommand()) {
- throw new \LogicException(sprintf('Frame is not MESSAGE frame but: "%s"', $frame->getCommand()));
- }
- list($headers, $properties) = StompHeadersEncoder::decode($frame->getHeaders());
- $redelivered = isset($headers['redelivered']) && 'true' === $headers['redelivered'];
- unset(
- $headers['redelivered'],
- $headers['destination'],
- $headers['message-id'],
- $headers['ack'],
- $headers['receipt'],
- $headers['subscription'],
- $headers['content-length']
- );
- $message = new StompMessage((string) $frame->getBody(), $properties, $headers);
- $message->setRedelivered($redelivered);
- $message->setFrame($frame);
- return $message;
- }
- }
|