123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- <?php
- namespace React\Tests\Stream;
- use React\Stream\Stream;
- use Clue\StreamFilter as Filter;
- class StreamTest extends TestCase
- {
/**
 * A Stream can be built from a valid stream resource and an event loop.
 *
 * @covers React\Stream\Stream::__construct
 */
public function testConstructor()
{
    $stream = fopen('php://temp', 'r+');
    $loop = $this->createLoopMock();

    $conn = new Stream($stream, $loop);

    // Assert explicitly so the test verifies something and is not flagged
    // as a risky "test without assertions" by PHPUnit.
    $this->assertInstanceOf('React\Stream\Stream', $conn);
}
/**
 * Constructing a Stream with a value that is not a stream resource is
 * expected to raise an InvalidArgumentException.
 *
 * @covers React\Stream\Stream::__construct
 */
public function testConstructorThrowsExceptionOnInvalidStream()
{
    $eventLoop = $this->createLoopMock();
    $notAResource = 'breakme';

    $this->setExpectedException('InvalidArgumentException');

    // The constructor call itself is the action under test.
    new Stream($notAResource, $eventLoop);
}
/**
 * A buffer passed as third constructor argument is the one returned
 * by getBuffer().
 *
 * @covers React\Stream\Stream::__construct
 */
public function testConstructorAcceptsBuffer()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createLoopMock();
    $customBuffer = $this->getMock('React\Stream\WritableStreamInterface');

    $subject = new Stream($resource, $eventLoop, $customBuffer);
    $actualBuffer = $subject->getBuffer();

    $this->assertSame($customBuffer, $actualBuffer);
}
/**
 * Data available on the underlying resource is passed to 'data'
 * listeners when handleData() is invoked.
 *
 * @covers React\Stream\Stream::__construct
 * @covers React\Stream\Stream::handleData
 */
public function testDataEvent()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createLoopMock();
    $subject = new Stream($resource, $eventLoop);

    // Remember whatever the listener receives.
    $received = null;
    $subject->on('data', function ($chunk) use (&$received) {
        $received = $chunk;
    });

    // Put a line into the temp stream and move back to its start.
    fwrite($resource, "foobar\n");
    rewind($resource);

    $subject->handleData($resource);

    $this->assertSame("foobar\n", $received);
}
/**
 * With 100000 bytes available, the chunk captured from the 'data' event
 * has a length equal to the stream's bufferSize and the stream remains
 * readable afterwards.
 *
 * @covers React\Stream\Stream::__construct
 * @covers React\Stream\Stream::handleData
 */
public function testDataEventDoesEmitOneChunkMatchingBufferSize()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createLoopMock();
    $subject = new Stream($resource, $eventLoop);

    $received = null;
    $subject->on('data', function ($chunk) use (&$received) {
        $received = $chunk;
    });

    $payload = str_repeat("a", 100000);
    fwrite($resource, $payload);
    rewind($resource);

    $subject->handleData($resource);

    $this->assertTrue($subject->isReadable());

    $receivedLength = strlen($received);
    $this->assertEquals($subject->bufferSize, $receivedLength);
}
/**
 * With bufferSize set to null, the chunk captured from the 'data' event
 * holds all 100000 bytes and the stream is no longer readable afterwards.
 *
 * @covers React\Stream\Stream::__construct
 * @covers React\Stream\Stream::handleData
 */
public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createLoopMock();

    $subject = new Stream($resource, $eventLoop);
    $subject->bufferSize = null;

    $received = null;
    $subject->on('data', function ($chunk) use (&$received) {
        $received = $chunk;
    });

    $payload = str_repeat("a", 100000);
    fwrite($resource, $payload);
    rewind($resource);

    $subject->handleData($resource);

    $this->assertFalse($subject->isReadable());

    $receivedLength = strlen($received);
    $this->assertEquals(100000, $receivedLength);
}
/**
 * Calling handleData() on a stream with nothing written to it must not
 * invoke any 'data' listener.
 *
 * @covers React\Stream\Stream::handleData
 */
public function testEmptyStreamShouldNotEmitData()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createLoopMock();
    $subject = new Stream($resource, $eventLoop);

    $neverCalled = $this->expectCallableNever();
    $subject->on('data', $neverCalled);

    $subject->handleData($resource);
}
/**
 * Data passed to write() ends up in the underlying resource when the
 * loop invokes the write listener right away.
 *
 * @covers React\Stream\Stream::write
 */
public function testWrite()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createWriteableLoopMock();
    $subject = new Stream($resource, $eventLoop);

    $subject->write("foo\n");

    // Read back the first line from the start of the temp stream.
    rewind($resource);
    $firstLine = fgets($resource);

    $this->assertSame("foo\n", $firstLine);
}
/**
 * After end() the underlying value is no longer a valid resource.
 *
 * @covers React\Stream\Stream::end
 */
public function testEnd()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createLoopMock();
    $subject = new Stream($resource, $eventLoop);

    $subject->end();

    $stillOpen = is_resource($resource);
    $this->assertFalse($stillOpen);
}
/**
 * 'drain' and 'error' events emitted on the stream's buffer are each
 * expected to reach listeners registered on the stream exactly once.
 */
public function testBufferEventsShouldBubbleUp()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createLoopMock();
    $subject = new Stream($resource, $eventLoop);

    $subject->on('drain', $this->expectCallableOnce());
    $subject->on('error', $this->expectCallableOnce());

    // Emit directly on the buffer obtained from the stream.
    $innerBuffer = $subject->getBuffer();
    $innerBuffer->emit('drain');

    $failure = new \RuntimeException('Whoops');
    $innerBuffer->emit('error', array($failure));
}
/**
 * A 'data' listener may close the stream it receives as second argument
 * while handleData() is running.
 *
 * @covers React\Stream\Stream::handleData
 */
public function testClosingStreamInDataEventShouldNotTriggerError()
{
    $resource = fopen('php://temp', 'r+');
    $eventLoop = $this->createLoopMock();
    $subject = new Stream($resource, $eventLoop);

    // The second listener argument is the object close() is called on.
    $closeOnData = function ($chunk, $emitter) {
        $emitter->close();
    };
    $subject->on('data', $closeOnData);

    fwrite($resource, "foobar\n");
    rewind($resource);

    $subject->handleData($resource);
}
/**
 * Data emitted by handleData() reflects a read filter attached to the
 * underlying resource.
 *
 * @covers React\Stream\Stream::handleData
 */
public function testDataFiltered()
{
    $resource = fopen('php://temp', 'r+');

    // Read filter that drops every 'a' from the data.
    $stripLetterA = function ($chunk) {
        return str_replace('a', '', $chunk);
    };
    Filter\append($resource, $stripLetterA, STREAM_FILTER_READ);

    $eventLoop = $this->createLoopMock();
    $received = null;

    $subject = new Stream($resource, $eventLoop);
    $subject->on('data', function ($chunk) use (&$received) {
        $received = $chunk;
    });

    fwrite($resource, "foobar\n");
    rewind($resource);

    $subject->handleData($resource);

    $this->assertSame("foobr\n", $received);
}
/**
 * When the read filter throws while handling data, the stream is expected
 * to emit 'error' once and 'close' once, and never 'data'.
 *
 * @covers React\Stream\Stream::handleData
 */
public function testDataErrorShouldEmitErrorAndClose()
{
    $resource = fopen('php://temp', 'r+');

    // Read filter that fails as soon as a chunk contains an 'a'.
    $failOnLetterA = function ($chunk) {
        if (strpos($chunk, 'a') === false) {
            return $chunk;
        }

        throw new \Exception('Invalid');
    };
    Filter\append($resource, $failOnLetterA, STREAM_FILTER_READ);

    $eventLoop = $this->createLoopMock();
    $subject = new Stream($resource, $eventLoop);

    $subject->on('data', $this->expectCallableNever());
    $subject->on('error', $this->expectCallableOnce());
    $subject->on('close', $this->expectCallableOnce());

    fwrite($resource, "foobar\n");
    rewind($resource);

    $subject->handleData($resource);
}
/**
 * Build a loop mock that expects exactly one addWriteStream() call and
 * answers it by invoking the given listener immediately with the stream.
 *
 * @return mixed the configured loop mock
 */
private function createWriteableLoopMock()
{
    $invokeListenerNow = function ($stream, $listener) {
        call_user_func($listener, $stream);
    };

    $mock = $this->createLoopMock();
    $mock->expects($this->once())
        ->method('addWriteStream')
        ->will($this->returnCallback($invokeListenerNow));

    return $mock;
}
/**
 * Create a mock of the event loop interface with no expectations set.
 *
 * @return mixed mock of React\EventLoop\LoopInterface
 */
private function createLoopMock()
{
    $loopInterface = 'React\EventLoop\LoopInterface';

    return $this->getMock($loopInterface);
}
- }
|