StreamIntegrationTest.php 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. <?php
  2. namespace React\Tests\Stream;
  3. use React\Stream\Stream;
  4. use React\EventLoop as rel;
  5. class StreamIntegrationTest extends TestCase
  6. {
  7. public function loopProvider()
  8. {
  9. return array(
  10. array(function() { return true; }, function() { return new rel\StreamSelectLoop; }),
  11. array(function() { return function_exists('event_base_new'); }, function() { return new rel\LibEventLoop; }),
  12. array(function() { return class_exists('libev\EventLoop'); }, function() { return new rel\LibEvLoop; }),
  13. array(function() { return class_exists('EventBase'); }, function() { return new rel\ExtEventLoop; })
  14. );
  15. }
  16. /**
  17. * @dataProvider loopProvider
  18. */
  19. public function testBufferReadsLargeChunks($condition, $loopFactory)
  20. {
  21. if (true !== $condition()) {
  22. return $this->markTestSkipped('Loop implementation not available');
  23. }
  24. $loop = $loopFactory();
  25. list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
  26. $streamA = new Stream($sockA, $loop);
  27. $streamB = new Stream($sockB, $loop);
  28. $bufferSize = 4096;
  29. $streamA->bufferSize = $bufferSize;
  30. $streamB->bufferSize = $bufferSize;
  31. $testString = str_repeat("*", $streamA->bufferSize + 1);
  32. $buffer = "";
  33. $streamB->on('data', function ($data, $streamB) use (&$buffer, &$testString) {
  34. $buffer .= $data;
  35. });
  36. $streamA->write($testString);
  37. $loop->tick();
  38. $loop->tick();
  39. $loop->tick();
  40. $streamA->close();
  41. $streamB->close();
  42. $this->assertEquals($testString, $buffer);
  43. }
  44. /**
  45. * @dataProvider loopProvider
  46. */
  47. public function testWriteLargeChunk($condition, $loopFactory)
  48. {
  49. if (true !== $condition()) {
  50. return $this->markTestSkipped('Loop implementation not available');
  51. }
  52. $loop = $loopFactory();
  53. list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
  54. $streamA = new Stream($sockA, $loop);
  55. $streamB = new Stream($sockB, $loop);
  56. // limit seems to be 192 KiB
  57. $size = 256 * 1024;
  58. // sending side sends and expects clean close with no errors
  59. $streamA->end(str_repeat('*', $size));
  60. $streamA->on('close', $this->expectCallableOnce());
  61. $streamA->on('error', $this->expectCallableNever());
  62. // receiving side counts bytes and expects clean close with no errors
  63. $received = 0;
  64. $streamB->on('data', function ($chunk) use (&$received) {
  65. $received += strlen($chunk);
  66. });
  67. $streamB->on('close', $this->expectCallableOnce());
  68. $streamB->on('error', $this->expectCallableNever());
  69. $loop->run();
  70. $streamA->close();
  71. $streamB->close();
  72. $this->assertEquals($size, $received);
  73. }
  74. /**
  75. * @dataProvider loopProvider
  76. */
  77. public function testDoesNotEmitDataIfNothingHasBeenWritten($condition, $loopFactory)
  78. {
  79. if (true !== $condition()) {
  80. return $this->markTestSkipped('Loop implementation not available');
  81. }
  82. $loop = $loopFactory();
  83. list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
  84. $streamA = new Stream($sockA, $loop);
  85. $streamB = new Stream($sockB, $loop);
  86. // end streamA without writing any data
  87. $streamA->end();
  88. // streamB should not emit any data
  89. $streamB->on('data', $this->expectCallableNever());
  90. $loop->run();
  91. $streamA->close();
  92. $streamB->close();
  93. }
  94. /**
  95. * @dataProvider loopProvider
  96. */
  97. public function testDoesNotWriteDataIfRemoteSideFromPairHasBeenClosed($condition, $loopFactory)
  98. {
  99. if (true !== $condition()) {
  100. return $this->markTestSkipped('Loop implementation not available');
  101. }
  102. $loop = $loopFactory();
  103. list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
  104. $streamA = new Stream($sockA, $loop);
  105. $streamB = new Stream($sockB, $loop);
  106. // end streamA without writing any data
  107. $streamA->pause();
  108. $streamA->write('hello');
  109. $streamA->on('close', $this->expectCallableOnce());
  110. $streamB->on('data', $this->expectCallableNever());
  111. $streamB->close();
  112. $loop->run();
  113. $streamA->close();
  114. $streamB->close();
  115. }
  116. /**
  117. * @dataProvider loopProvider
  118. */
  119. public function testDoesNotWriteDataIfServerSideHasBeenClosed($condition, $loopFactory)
  120. {
  121. if (true !== $condition()) {
  122. return $this->markTestSkipped('Loop implementation not available');
  123. }
  124. $loop = $loopFactory();
  125. $server = stream_socket_server('tcp://localhost:0');
  126. $client = stream_socket_client(stream_socket_get_name($server, false));
  127. $peer = stream_socket_accept($server);
  128. $streamA = new Stream($client, $loop);
  129. $streamB = new Stream($peer, $loop);
  130. // end streamA without writing any data
  131. $streamA->pause();
  132. $streamA->write('hello');
  133. $streamA->on('close', $this->expectCallableOnce());
  134. $streamB->on('data', $this->expectCallableNever());
  135. $streamB->close();
  136. $loop->run();
  137. $streamA->close();
  138. $streamB->close();
  139. }
  140. /**
  141. * @dataProvider loopProvider
  142. */
  143. public function testDoesNotWriteDataIfClientSideHasBeenClosed($condition, $loopFactory)
  144. {
  145. if (true !== $condition()) {
  146. return $this->markTestSkipped('Loop implementation not available');
  147. }
  148. $loop = $loopFactory();
  149. $server = stream_socket_server('tcp://localhost:0');
  150. $client = stream_socket_client(stream_socket_get_name($server, false));
  151. $peer = stream_socket_accept($server);
  152. $streamA = new Stream($peer, $loop);
  153. $streamB = new Stream($client, $loop);
  154. // end streamA without writing any data
  155. $streamA->pause();
  156. $streamA->write('hello');
  157. $streamA->on('close', $this->expectCallableOnce());
  158. $streamB->on('data', $this->expectCallableNever());
  159. $streamB->close();
  160. $loop->run();
  161. $streamA->close();
  162. $streamB->close();
  163. }
  164. }