123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- <?php
- namespace React\Tests\Stream;
- use React\Stream\Stream;
- use React\EventLoop as rel;
/**
 * Integration tests that drive Stream instances over real sockets, once per
 * event loop implementation listed in loopProvider().
 *
 * expectCallableOnce() and expectCallableNever() are inherited from the parent
 * TestCase; presumably they return callables that must be invoked exactly once
 * / never for the test to pass (confirm against TestCase).
 */
class StreamIntegrationTest extends TestCase
{
    /**
     * Data provider yielding one data set per event loop implementation.
     *
     * Each data set is a pair of closures:
     *  - the first returns true when the loop implementation can be used in
     *    the current environment (required function/class exists),
     *  - the second creates a fresh loop instance.
     *
     * @return array
     */
    public function loopProvider()
    {
        return array(
            array(
                function () { return true; },
                function () { return new rel\StreamSelectLoop(); },
            ),
            array(
                function () { return function_exists('event_base_new'); },
                function () { return new rel\LibEventLoop(); },
            ),
            array(
                function () { return class_exists('libev\EventLoop'); },
                function () { return new rel\LibEvLoop(); },
            ),
            array(
                function () { return class_exists('EventBase'); },
                function () { return new rel\ExtEventLoop(); },
            ),
        );
    }

    /**
     * Writes one byte more than the configured buffer size from streamA and
     * checks that streamB receives exactly the same payload.
     *
     * @dataProvider loopProvider
     */
    public function testBufferReadsLargeChunks($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

        $streamA = new Stream($sockA, $loop);
        $streamB = new Stream($sockB, $loop);

        $bufferSize = 4096;
        $streamA->bufferSize = $bufferSize;
        $streamB->bufferSize = $bufferSize;

        // payload deliberately exceeds the buffer size by a single byte
        $testString = str_repeat("*", $streamA->bufferSize + 1);

        // collect everything streamB emits
        $buffer = "";
        $streamB->on('data', function ($data) use (&$buffer) {
            $buffer .= $data;
        });

        $streamA->write($testString);

        // NOTE(review): three ticks are presumably enough for one write plus
        // the reads needed to drain the payload; confirm if this test is flaky.
        $loop->tick();
        $loop->tick();
        $loop->tick();

        $streamA->close();
        $streamB->close();

        $this->assertSame($testString, $buffer);
    }

    /**
     * Ends streamA with a 256 KiB payload and checks that streamB receives
     * every byte and that both sides close without emitting an error.
     *
     * @dataProvider loopProvider
     */
    public function testWriteLargeChunk($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

        $streamA = new Stream($sockA, $loop);
        $streamB = new Stream($sockB, $loop);

        // limit seems to be 192 KiB
        $size = 256 * 1024;

        // sending side sends and expects clean close with no errors
        $streamA->end(str_repeat('*', $size));
        $streamA->on('close', $this->expectCallableOnce());
        $streamA->on('error', $this->expectCallableNever());

        // receiving side counts bytes and expects clean close with no errors
        $received = 0;
        $streamB->on('data', function ($chunk) use (&$received) {
            $received += strlen($chunk);
        });
        $streamB->on('close', $this->expectCallableOnce());
        $streamB->on('error', $this->expectCallableNever());

        $loop->run();

        $streamA->close();
        $streamB->close();

        $this->assertSame($size, $received);
    }

    /**
     * Ends streamA without writing anything and checks that streamB never
     * emits a "data" event.
     *
     * @dataProvider loopProvider
     */
    public function testDoesNotEmitDataIfNothingHasBeenWritten($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

        $streamA = new Stream($sockA, $loop);
        $streamB = new Stream($sockB, $loop);

        // end streamA without writing any data
        $streamA->end();

        // streamB should not emit any data
        $streamB->on('data', $this->expectCallableNever());

        $loop->run();

        $streamA->close();
        $streamB->close();
    }

    /**
     * Remote-close scenario over a Unix socket pair.
     *
     * @dataProvider loopProvider
     */
    public function testDoesNotWriteDataIfRemoteSideFromPairHasBeenClosed($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

        $this->assertPendingWriteIsNotDeliveredAfterRemoteClose(
            new Stream($sockA, $loop),
            new Stream($sockB, $loop),
            $loop
        );
    }

    /**
     * Remote-close scenario over TCP: the client writes, the accepted
     * (server-side) connection is closed.
     *
     * @dataProvider loopProvider
     */
    public function testDoesNotWriteDataIfServerSideHasBeenClosed($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($client, $peer) = $this->createTcpConnection();

        $this->assertPendingWriteIsNotDeliveredAfterRemoteClose(
            new Stream($client, $loop),
            new Stream($peer, $loop),
            $loop
        );
    }

    /**
     * Remote-close scenario over TCP: the accepted (server-side) connection
     * writes, the client connection is closed.
     *
     * @dataProvider loopProvider
     */
    public function testDoesNotWriteDataIfClientSideHasBeenClosed($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($client, $peer) = $this->createTcpConnection();

        $this->assertPendingWriteIsNotDeliveredAfterRemoteClose(
            new Stream($peer, $loop),
            new Stream($client, $loop),
            $loop
        );
    }

    /**
     * Creates the event loop for the current data set, or marks the test as
     * skipped when the loop implementation is not available.
     *
     * @param callable $condition   returns true when the loop can be used
     * @param callable $loopFactory returns a new loop instance
     * @return mixed the loop instance created by $loopFactory
     */
    private function createLoopOrSkip($condition, $loopFactory)
    {
        if (true !== $condition()) {
            // markTestSkipped() aborts the test by throwing, so nothing after
            // this call runs and no explicit return is needed.
            $this->markTestSkipped('Loop implementation not available');
        }

        return $loopFactory();
    }

    /**
     * Opens a TCP connection over the loopback interface on an ephemeral port.
     *
     * The IPv4 loopback address is used instead of "localhost" so that the
     * name reported by stream_socket_get_name() is always an "ip:port" string
     * that stream_socket_client() can connect to, regardless of how
     * "localhost" resolves on the host.
     *
     * @return resource[] array($client, $peer): the connecting socket and the
     *                    accepted server-side socket of the same connection
     */
    private function createTcpConnection()
    {
        $server = stream_socket_server('tcp://127.0.0.1:0');

        $client = stream_socket_client(stream_socket_get_name($server, false));
        $peer = stream_socket_accept($server);

        // the listening socket is not needed once the connection is accepted
        fclose($server);

        return array($client, $peer);
    }

    /**
     * Shared body of the "remote side has been closed" tests.
     *
     * Pauses $streamA, queues a write on it, closes $streamB and runs the
     * loop. $streamA is expected to emit "close" exactly once and $streamB is
     * expected to never emit "data".
     *
     * @param Stream $streamA writing side
     * @param Stream $streamB remote side that gets closed before the loop runs
     * @param mixed  $loop    event loop both streams are attached to
     */
    private function assertPendingWriteIsNotDeliveredAfterRemoteClose(Stream $streamA, Stream $streamB, $loop)
    {
        // pause streamA, then queue data to be written to the remote side
        $streamA->pause();
        $streamA->write('hello');

        $streamA->on('close', $this->expectCallableOnce());
        $streamB->on('data', $this->expectCallableNever());

        // close the remote side before the loop gets a chance to flush
        $streamB->close();

        $loop->run();

        $streamA->close();
        $streamB->close();
    }
}
|