StreamTest.php 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. <?php
  2. namespace React\Tests\Stream;
  3. use React\Stream\Stream;
  4. use Clue\StreamFilter as Filter;
  class StreamTest extends TestCase
  {
      /**
       * A stream resource plus an event loop is enough to construct a Stream.
       *
       * @covers React\Stream\Stream::__construct
       */
      public function testConstructor()
      {
          $handle = fopen('php://temp', 'r+');

          // No assertion follows: the test only requires that construction does not throw.
          $connection = new Stream($handle, $this->createLoopMock());
      }

      /**
       * Passing something that is not a stream resource must raise an
       * InvalidArgumentException.
       *
       * @covers React\Stream\Stream::__construct
       */
      public function testConstructorThrowsExceptionOnInvalidStream()
      {
          $this->setExpectedException('InvalidArgumentException');

          new Stream('breakme', $this->createLoopMock());
      }

      /**
       * A buffer handed to the constructor is the one returned by getBuffer().
       *
       * @covers React\Stream\Stream::__construct
       */
      public function testConstructorAcceptsBuffer()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createLoopMock();
          $customBuffer = $this->getMock('React\Stream\WritableStreamInterface');

          $connection = new Stream($handle, $loopMock, $customBuffer);
          $usedBuffer = $connection->getBuffer();

          $this->assertSame($customBuffer, $usedBuffer);
      }

      /**
       * handleData() emits the bytes available on the resource as a 'data' event.
       *
       * @covers React\Stream\Stream::__construct
       * @covers React\Stream\Stream::handleData
       */
      public function testDataEvent()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createLoopMock();

          $received = null;
          $remember = function ($chunk) use (&$received) {
              $received = $chunk;
          };

          $connection = new Stream($handle, $loopMock);
          $connection->on('data', $remember);

          fwrite($handle, "foobar\n");
          rewind($handle);

          $connection->handleData($handle);

          $this->assertSame("foobar\n", $received);
      }

      /**
       * With 100000 bytes pending, one handleData() call emits a chunk whose
       * length equals the stream's bufferSize and the stream stays readable.
       *
       * @covers React\Stream\Stream::__construct
       * @covers React\Stream\Stream::handleData
       */
      public function testDataEventDoesEmitOneChunkMatchingBufferSize()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createLoopMock();

          $received = null;
          $remember = function ($chunk) use (&$received) {
              $received = $chunk;
          };

          $connection = new Stream($handle, $loopMock);
          $connection->on('data', $remember);

          fwrite($handle, str_repeat("a", 100000));
          rewind($handle);

          $connection->handleData($handle);

          $this->assertTrue($connection->isReadable());
          $this->assertEquals($connection->bufferSize, strlen($received));
      }

      /**
       * With bufferSize set to null, one handleData() call emits all 100000
       * pending bytes and the stream is no longer readable afterwards.
       *
       * @covers React\Stream\Stream::__construct
       * @covers React\Stream\Stream::handleData
       */
      public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createLoopMock();

          $received = null;
          $remember = function ($chunk) use (&$received) {
              $received = $chunk;
          };

          $connection = new Stream($handle, $loopMock);
          $connection->bufferSize = null;
          $connection->on('data', $remember);

          fwrite($handle, str_repeat("a", 100000));
          rewind($handle);

          $connection->handleData($handle);

          $this->assertFalse($connection->isReadable());
          $this->assertEquals(100000, strlen($received));
      }

      /**
       * handleData() on a resource with nothing to read must not emit 'data'.
       *
       * @covers React\Stream\Stream::handleData
       */
      public function testEmptyStreamShouldNotEmitData()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createLoopMock();
          $neverCalled = $this->expectCallableNever();

          $connection = new Stream($handle, $loopMock);
          $connection->on('data', $neverCalled);

          $connection->handleData($handle);
      }

      /**
       * Data passed to write() reaches the underlying resource when the loop
       * runs the registered write listener right away.
       *
       * @covers React\Stream\Stream::write
       */
      public function testWrite()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createWriteableLoopMock();

          $connection = new Stream($handle, $loopMock);
          $connection->write("foo\n");

          rewind($handle);
          $firstLine = fgets($handle);

          $this->assertSame("foo\n", $firstLine);
      }

      /**
       * After end() the underlying handle is no longer a valid resource.
       *
       * @covers React\Stream\Stream::end
       */
      public function testEnd()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createLoopMock();

          $connection = new Stream($handle, $loopMock);
          $connection->end();

          $stillOpen = is_resource($handle);
          $this->assertFalse($stillOpen);
      }

      /**
       * 'drain' and 'error' events emitted on the stream's buffer are each
       * delivered once to listeners registered on the stream itself.
       */
      public function testBufferEventsShouldBubbleUp()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createLoopMock();

          $connection = new Stream($handle, $loopMock);

          $connection->on('drain', $this->expectCallableOnce());
          $connection->on('error', $this->expectCallableOnce());

          $innerBuffer = $connection->getBuffer();
          $failure = new \RuntimeException('Whoops');

          $innerBuffer->emit('drain');
          $innerBuffer->emit('error', array($failure));
      }

      /**
       * A 'data' listener closes the stream it receives as second argument;
       * the test passes as long as handleData() raises nothing.
       *
       * @covers React\Stream\Stream::handleData
       */
      public function testClosingStreamInDataEventShouldNotTriggerError()
      {
          $handle = fopen('php://temp', 'r+');
          $loopMock = $this->createLoopMock();

          $closeOnData = function ($chunk, $emitter) {
              $emitter->close();
          };

          $connection = new Stream($handle, $loopMock);
          $connection->on('data', $closeOnData);

          fwrite($handle, "foobar\n");
          rewind($handle);

          $connection->handleData($handle);
      }

      /**
       * Data emitted by handleData() has passed through read filters attached
       * to the resource.
       *
       * @covers React\Stream\Stream::handleData
       */
      public function testDataFiltered()
      {
          $handle = fopen('php://temp', 'r+');

          // read filter that strips every 'a' from the chunk
          $dropLetterA = function ($chunk) {
              return str_replace('a', '', $chunk);
          };
          Filter\append($handle, $dropLetterA, STREAM_FILTER_READ);

          $loopMock = $this->createLoopMock();

          $received = null;
          $remember = function ($chunk) use (&$received) {
              $received = $chunk;
          };

          $connection = new Stream($handle, $loopMock);
          $connection->on('data', $remember);

          fwrite($handle, "foobar\n");
          rewind($handle);

          $connection->handleData($handle);

          $this->assertSame("foobr\n", $received);
      }

      /**
       * When a read filter throws, handleData() emits 'error' and 'close' once
       * each and never emits 'data'.
       *
       * @covers React\Stream\Stream::handleData
       */
      public function testDataErrorShouldEmitErrorAndClose()
      {
          $handle = fopen('php://temp', 'r+');

          // read filter that fails as soon as a chunk contains an 'a'
          $rejectLetterA = function ($chunk) {
              if (strpos($chunk, 'a') === false) {
                  return $chunk;
              }

              throw new \Exception('Invalid');
          };
          Filter\append($handle, $rejectLetterA, STREAM_FILTER_READ);

          $loopMock = $this->createLoopMock();

          $connection = new Stream($handle, $loopMock);

          $connection->on('data', $this->expectCallableNever());
          $connection->on('error', $this->expectCallableOnce());
          $connection->on('close', $this->expectCallableOnce());

          fwrite($handle, "foobar\n");
          rewind($handle);

          $connection->handleData($handle);
      }

      /**
       * Builds a loop mock that expects exactly one addWriteStream() call and
       * invokes the listener passed to it immediately with the given stream.
       *
       * @return \PHPUnit_Framework_MockObject_MockObject mock of React\EventLoop\LoopInterface
       */
      private function createWriteableLoopMock()
      {
          $runListenerNow = function ($stream, $listener) {
              call_user_func($listener, $stream);
          };

          $loopMock = $this->createLoopMock();

          $loopMock
              ->expects($this->once())
              ->method('addWriteStream')
              ->will($this->returnCallback($runListenerNow));

          return $loopMock;
      }

      /**
       * Builds a plain mock of the event loop interface with no expectations.
       *
       * @return \PHPUnit_Framework_MockObject_MockObject mock of React\EventLoop\LoopInterface
       */
      private function createLoopMock()
      {
          $loopMock = $this->getMock('React\EventLoop\LoopInterface');

          return $loopMock;
      }
  }