RoundRobinTransport.php 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Notifier\Transport;
  11. use Symfony\Component\Notifier\Exception\LogicException;
  12. use Symfony\Component\Notifier\Exception\RuntimeException;
  13. use Symfony\Component\Notifier\Exception\TransportExceptionInterface;
  14. use Symfony\Component\Notifier\Message\MessageInterface;
  15. /**
  16. * Uses several Transports using a round robin algorithm.
  17. *
  18. * @author Fabien Potencier <fabien@symfony.com>
  19. *
  20. * @experimental in 5.1
  21. */
  22. class RoundRobinTransport implements TransportInterface
  23. {
  24. private $deadTransports;
  25. private $transports = [];
  26. private $retryPeriod;
  27. private $cursor = 0;
  28. /**
  29. * @param TransportInterface[] $transports
  30. */
  31. public function __construct(array $transports, int $retryPeriod = 60)
  32. {
  33. if (!$transports) {
  34. throw new LogicException(sprintf('"%s" must have at least one transport configured.', static::class));
  35. }
  36. $this->transports = $transports;
  37. $this->deadTransports = new \SplObjectStorage();
  38. $this->retryPeriod = $retryPeriod;
  39. // the cursor initial value is randomized so that
  40. // when are not in a daemon, we are still rotating the transports
  41. $this->cursor = mt_rand(0, \count($transports) - 1);
  42. }
  43. public function __toString(): string
  44. {
  45. return implode(' '.$this->getNameSymbol().' ', array_map('strval', $this->transports));
  46. }
  47. public function supports(MessageInterface $message): bool
  48. {
  49. foreach ($this->transports as $transport) {
  50. if ($transport->supports($message)) {
  51. return true;
  52. }
  53. }
  54. return false;
  55. }
  56. public function send(MessageInterface $message): void
  57. {
  58. while ($transport = $this->getNextTransport($message)) {
  59. try {
  60. $transport->send($message);
  61. return;
  62. } catch (TransportExceptionInterface $e) {
  63. $this->deadTransports[$transport] = microtime(true);
  64. }
  65. }
  66. throw new RuntimeException('All transports failed.');
  67. }
  68. /**
  69. * Rotates the transport list around and returns the first instance.
  70. */
  71. protected function getNextTransport(MessageInterface $message): ?TransportInterface
  72. {
  73. $cursor = $this->cursor;
  74. while (true) {
  75. $transport = $this->transports[$cursor];
  76. if (!$transport->supports($message)) {
  77. $cursor = $this->moveCursor($cursor);
  78. continue;
  79. }
  80. if (!$this->isTransportDead($transport)) {
  81. break;
  82. }
  83. if ((microtime(true) - $this->deadTransports[$transport]) > $this->retryPeriod) {
  84. $this->deadTransports->detach($transport);
  85. break;
  86. }
  87. if ($this->cursor === $cursor = $this->moveCursor($cursor)) {
  88. return null;
  89. }
  90. }
  91. $this->cursor = $this->moveCursor($cursor);
  92. return $transport;
  93. }
  94. protected function isTransportDead(TransportInterface $transport): bool
  95. {
  96. return $this->deadTransports->contains($transport);
  97. }
  98. protected function getNameSymbol(): string
  99. {
  100. return '&&';
  101. }
  102. private function moveCursor(int $cursor): int
  103. {
  104. return ++$cursor >= \count($this->transports) ? 0 : $cursor;
  105. }
  106. }