Stream.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
  1. <?php
  2. /**
  3. * Hoa
  4. *
  5. *
  6. * @license
  7. *
  8. * New BSD License
  9. *
  10. * Copyright © 2007-2017, Hoa community. All rights reserved.
  11. *
  12. * Redistribution and use in source and binary forms, with or without
  13. * modification, are permitted provided that the following conditions are met:
  14. * * Redistributions of source code must retain the above copyright
  15. * notice, this list of conditions and the following disclaimer.
  16. * * Redistributions in binary form must reproduce the above copyright
  17. * notice, this list of conditions and the following disclaimer in the
  18. * documentation and/or other materials provided with the distribution.
  19. * * Neither the name of the Hoa nor the names of its contributors may be
  20. * used to endorse or promote products derived from this software without
  21. * specific prior written permission.
  22. *
  23. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  24. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  25. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  26. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND CONTRIBUTORS BE
  27. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  28. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  29. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  30. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  31. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  32. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  33. * POSSIBILITY OF SUCH DAMAGE.
  34. */
  35. namespace Hoa\Stream;
  36. use Hoa\Consistency;
  37. use Hoa\Event;
  38. use Hoa\Protocol;
  39. /**
  40. * Class \Hoa\Stream.
  41. *
  42. * Static register for all streams (files, sockets etc.).
  43. *
  44. * @copyright Copyright © 2007-2017 Hoa community
  45. * @license New BSD License
  46. */
  47. abstract class Stream implements IStream\Stream, Event\Listenable
  48. {
  49. use Event\Listens;
  50. /**
  51. * Name index in the stream bucket.
  52. *
  53. * @const int
  54. */
  55. const NAME = 0;
  56. /**
  57. * Handler index in the stream bucket.
  58. *
  59. * @const int
  60. */
  61. const HANDLER = 1;
  62. /**
  63. * Resource index in the stream bucket.
  64. *
  65. * @const int
  66. */
  67. const RESOURCE = 2;
  68. /**
  69. * Context index in the stream bucket.
  70. *
  71. * @const int
  72. */
  73. const CONTEXT = 3;
  74. /**
  75. * Default buffer size.
  76. *
  77. * @const int
  78. */
  79. const DEFAULT_BUFFER_SIZE = 8192;
  80. /**
  81. * Current stream bucket.
  82. *
  83. * @var array
  84. */
  85. protected $_bucket = [];
  86. /**
  87. * Static stream register.
  88. *
  89. * @var array
  90. */
  91. private static $_register = [];
  92. /**
  93. * Buffer size (default is 8Ko).
  94. *
  95. * @var int
  96. */
  97. protected $_bufferSize = self::DEFAULT_BUFFER_SIZE;
  98. /**
  99. * Original stream name, given to the stream constructor.
  100. *
  101. * @var string
  102. */
  103. protected $_streamName = null;
  104. /**
  105. * Context name.
  106. *
  107. * @var string
  108. */
  109. protected $_context = null;
  110. /**
  111. * Whether the opening has been deferred.
  112. *
  113. * @var bool
  114. */
  115. protected $_hasBeenDeferred = false;
  116. /**
  117. * Whether this stream is already opened by another handler.
  118. *
  119. * @var bool
  120. */
  121. protected $_borrowing = false;
  /**
   * Build a stream handler for the given stream name.
   *
   * Stores the stream name, the context ID and the deferral flag, attaches a
   * listener for every notification event this handler can fire, then opens
   * the stream right away unless the opening has been deferred.
   *
   * @param   string  $streamName    Stream name (e.g. path or URL).
   * @param   string  $context       Context ID (please, see the
   *                                 `Hoa\Stream\Context` class).
   * @param   bool    $wait          Defer the opening (strictly `true`) or
   *                                 open immediately.
   */
  public function __construct($streamName, $context = null, $wait = false)
  {
      // Events that can be fired through the listener (see `_notify`).
      $listenedEvents = [
          'authrequire',
          'authresult',
          'complete',
          'connect',
          'failure',
          'mimetype',
          'progress',
          'redirect',
          'resolve',
          'size'
      ];

      $this->_streamName      = $streamName;
      $this->_context         = $context;
      $this->_hasBeenDeferred = $wait;
      $this->setListener(new Event\Listener($this, $listenedEvents));

      // Only a strict `true` defers the opening.
      if (true !== $wait) {
          $this->open();
      }
  }
  /**
   * Get a stream bucket from the static register.
   *
   * The register is keyed by the MD5 hash of the stream name. When no entry
   * exists yet, the stream is opened through `$handler->_open()`, stored in
   * the register, and two event IDs are registered for the handler. When an
   * entry already exists, the handler is flagged as borrowing the stream.
   * In both cases, a `null` resource in the register triggers a (re)opening
   * through `$handler->_open()`.
   *
   * Note: the method is private and therefore cannot be overridden; it is
   * deliberately not declared `final`, because `final private` raises a
   * warning since PHP 8.0.
   *
   * @param   string              $streamName    Stream name.
   * @param   \Hoa\Stream\Stream  $handler       Stream handler.
   * @param   string              $context       Context ID (please, see the
   *                                             `Hoa\Stream\Context` class).
   * @return  array               The bucket, indexed by `self::NAME`,
   *                              `self::HANDLER`, `self::RESOURCE` and
   *                              `self::CONTEXT`.
   * @throws  \Hoa\Stream\Exception   If the context ID has not been declared.
   */
  private static function &_getStream(
      $streamName,
      Stream $handler,
      $context = null
  ) {
      $name = md5($streamName);

      // Resolve the context ID into its instance; it must already exist.
      if (null !== $context) {
          if (false === Context::contextExists($context)) {
              throw new Exception(
                  'Context %s was not previously declared, cannot retrieve ' .
                  'this context.',
                  0,
                  $context
              );
          }

          $context = Context::getInstance($context);
      }

      if (!isset(self::$_register[$name])) {
          // First handler for this name: open the stream and register it.
          self::$_register[$name] = [
              self::NAME     => $streamName,
              self::HANDLER  => $handler,
              self::RESOURCE => $handler->_open($streamName, $context),
              self::CONTEXT  => $context
          ];
          Event::register(
              'hoa://Event/Stream/' . $streamName,
              $handler
          );
          // Add :open-ready?
          Event::register(
              'hoa://Event/Stream/' . $streamName . ':close-before',
              $handler
          );
      } else {
          // The stream is already in the register: this handler borrows it.
          $handler->_borrowing = true;
      }

      // A registered entry without resource is opened again.
      if (null === self::$_register[$name][self::RESOURCE]) {
          self::$_register[$name][self::RESOURCE]
              = $handler->_open($streamName, $context);
      }

      return self::$_register[$name];
  }
  /**
   * Open the stream and return the associated resource.
   *
   * Called by `self::_getStream()` with the stream name and the resolved
   * context instance (or `null` when no context ID was given); the returned
   * value is stored as the `self::RESOURCE` entry of the stream bucket.
   * Note: This method is protected, but do not forget that it could be
   * overloaded into a public context.
   *
   * @param   string               $streamName    Stream name (e.g. path or
   *                                              URL).
   * @param   \Hoa\Stream\Context  $context       Context, or `null`.
   * @return  resource
   * @throws  \Hoa\Exception\Exception
   */
  abstract protected function &_open($streamName, Context $context = null);
  /**
   * Close the current stream.
   *
   * Called by `close()`: when this method returns strictly `false`, the
   * stream is kept in the static register and its events stay registered.
   * Note: this method is protected, but do not forget that it could be
   * overloaded into a public context.
   *
   * @return  bool
   */
  abstract protected function _close();
  /**
   * Open the stream.
   *
   * When the opening has been deferred, the `_notify` method of this handler
   * is installed as the `notification` parameter of the context: a fresh
   * context is created if none was given, and an existing context only
   * receives the callback if it has no `notification` parameter yet. The
   * buffer size is then reset to its default and the stream bucket is
   * fetched from (or created in) the static register.
   *
   * @return  \Hoa\Stream
   * @throws  \Hoa\Stream\Exception
   */
  final public function open()
  {
      $context = $this->_context;

      if (true === $this->hasBeenDeferred()) {
          $notification = ['notification' => [$this, '_notify']];

          if (null === $context) {
              // No context given: create one dedicated to this stream.
              $handle = Context::getInstance(uniqid());
              $handle->setParameters($notification);
              $context = $handle->getId();
          } elseif (true === Context::contextExists($context)) {
              $handle     = Context::getInstance($context);
              $parameters = $handle->getParameters();

              // Do not override a notification callback already set.
              if (!isset($parameters['notification'])) {
                  $handle->setParameters($notification);
              }
          }
      }

      $this->_bufferSize = self::DEFAULT_BUFFER_SIZE;
      $this->_bucket     = self::_getStream(
          $this->_streamName,
          $this,
          $context
      );

      return $this;
  }
  /**
   * Close the current stream.
   *
   * Does nothing when the stream is not in the static register. Otherwise,
   * the `:close-before` event is notified, then `_close()` is called; unless
   * it returns strictly `false`, the stream is removed from the register, the
   * handler entry of the local bucket is cleared and both stream events are
   * unregistered.
   *
   * @return  void
   */
  final public function close()
  {
      $streamName = $this->getStreamName();
      $name       = md5($streamName);

      if (!isset(self::$_register[$name])) {
          return;
      }

      $eventId       = 'hoa://Event/Stream/' . $streamName;
      $closeBeforeId = $eventId . ':close-before';

      Event::notify($closeBeforeId, $this, new Event\Bucket());

      // A strict `false` means the stream must stay registered.
      if (false === $this->_close()) {
          return;
      }

      unset(self::$_register[$name]);
      $this->_bucket[self::HANDLER] = null;

      Event::unregister($eventId);
      Event::unregister($closeBeforeId);
  }
  /**
   * Get the current stream name.
   *
   * @return  string  The name stored in the stream bucket, or `null` when
   *                  the bucket is empty.
   */
  public function getStreamName()
  {
      return empty($this->_bucket)
          ? null
          : $this->_bucket[self::NAME];
  }
  /**
   * Get the current stream.
   *
   * @return  resource  The resource stored in the stream bucket, or `null`
   *                    when the bucket is empty.
   */
  public function getStream()
  {
      return empty($this->_bucket)
          ? null
          : $this->_bucket[self::RESOURCE];
  }
  /**
   * Get the current stream context.
   *
   * @return  \Hoa\Stream\Context  The context stored in the stream bucket,
   *                               or `null` when the bucket is empty.
   */
  public function getStreamContext()
  {
      return empty($this->_bucket)
          ? null
          : $this->_bucket[self::CONTEXT];
  }
  /**
   * Get the stream handler registered for a stream name.
   *
   * The static register is looked up with the MD5 hash of the name.
   *
   * @param   string  $streamName    Stream name.
   * @return  \Hoa\Stream  The handler, or `null` when the name is not in the
   *                       register.
   */
  public static function getStreamHandler($streamName)
  {
      $name = md5($streamName);

      return isset(self::$_register[$name])
          ? self::$_register[$name][self::HANDLER]
          : null;
  }
  /**
   * Replace the resource of the current stream bucket. Useful to manage a
   * stack of streams (e.g. socket and select). Notice that it could be unsafe
   * to use this method without taking time to think about it two minutes.
   * Resource of type “Unknown” is considered as valid.
   *
   * Since PHP 7.2, `gettype` reports a closed resource as
   * `resource (closed)` instead of `resource`; both spellings are accepted
   * here so that “Unknown” resources are still considered valid.
   *
   * @param   resource  $stream    New stream resource.
   * @return  resource  The previous resource, or `null` if the bucket had
   *                    none.
   * @throws  \Hoa\Stream\Exception   If `$stream` is neither a resource nor
   *                                  a resource of type “Unknown”.
   */
  public function _setStream($stream)
  {
      $type           = gettype($stream);
      $isResourceLike =
          'resource' === $type ||
          'resource (closed)' === $type;

      // `get_resource_type` is only reached for resource-like values thanks
      // to the short-circuit on `$isResourceLike`.
      if (false === is_resource($stream) &&
          (false === $isResourceLike ||
           'Unknown' !== get_resource_type($stream))) {
          throw new Exception(
              'Try to change the stream resource with an invalid one; ' .
              'given %s.',
              1,
              gettype($stream)
          );
      }

      // Avoid an undefined index when the stream has never been opened.
      $old = isset($this->_bucket[self::RESOURCE])
          ? $this->_bucket[self::RESOURCE]
          : null;
      $this->_bucket[self::RESOURCE] = $stream;

      return $old;
  }
  /**
   * Check whether the stream is opened, i.e. whether the current stream is
   * a valid resource.
   *
   * @return  bool
   */
  public function isOpened()
  {
      $resource = $this->getStream();

      return is_resource($resource);
  }
  /**
   * Set the timeout period of the current stream.
   *
   * @param   int  $seconds         Timeout period in seconds.
   * @param   int  $microseconds    Timeout period in microseconds.
   * @return  bool  Result of `stream_set_timeout`.
   */
  public function setStreamTimeout($seconds, $microseconds = 0)
  {
      $resource = $this->getStream();

      return stream_set_timeout($resource, $seconds, $microseconds);
  }
  /**
   * Whether the opening of the stream has been deferred, i.e. the value of
   * the `$wait` argument given to the constructor.
   *
   * @return  bool
   */
  protected function hasBeenDeferred()
  {
      return $this->_hasBeenDeferred;
  }
  /**
   * Check whether the connection has timed out or not.
   * Shortcut for reading the `timed_out` index of `getStreamMetaData`.
   *
   * @return  bool  `true` only if `timed_out` is strictly `true`.
   */
  public function hasTimedOut()
  {
      return true === $this->getStreamMetaData()['timed_out'];
  }
  /**
   * Set blocking/non-blocking mode on the current stream.
   *
   * @param   bool  $mode    Blocking mode (cast to an integer before being
   *                         given to `stream_set_blocking`).
   * @return  bool
   */
  public function setStreamBlocking($mode)
  {
      $resource = $this->getStream();
      $blocking = (int) $mode;

      return stream_set_blocking($resource, $blocking);
  }
  /**
   * Set the write buffer of the current stream.
   * Output using fwrite() (or similar function) is normally buffered at 8 Ko.
   * This means that if there are two processes wanting to write to the same
   * output stream, each is paused after 8 Ko of data to allow the other to
   * write.
   *
   * The buffer size returned by `getStreamBufferSize` is only updated when
   * `stream_set_write_buffer` reports a success.
   *
   * @param   int  $buffer    Number of bytes to buffer. If zero, write
   *                          operations are unbuffered. This ensures that
   *                          all writes are completed before other
   *                          processes are allowed to write to that output
   *                          stream.
   * @return  bool
   */
  public function setStreamBuffer($buffer)
  {
      $status = stream_set_write_buffer($this->getStream(), $buffer);

      // `stream_set_write_buffer` returns zero on success.
      if (0 !== $status) {
          return false;
      }

      $this->_bufferSize = $buffer;

      return true;
  }
  /**
   * Disable stream buffering.
   * Alias of `$this->setStreamBuffer(0)`.
   *
   * @return  bool
   */
  public function disableStreamBuffer()
  {
      return $this->setStreamBuffer(0);
  }
  /**
   * Get stream buffer size: `self::DEFAULT_BUFFER_SIZE` unless it has been
   * changed by a successful call to `setStreamBuffer`.
   *
   * @return  int
   */
  public function getStreamBufferSize()
  {
      return $this->_bufferSize;
  }
  /**
   * Get stream wrapper name, i.e. what precedes `://` in the stream name.
   *
   * @return  string  The wrapper name, or `file` when the stream name
   *                  contains no `://`.
   */
  public function getStreamWrapperName()
  {
      $streamName = $this->getStreamName();
      $pos        = strpos($streamName, '://');

      return false === $pos
          ? 'file'
          : substr($streamName, 0, $pos);
  }
  /**
   * Get the meta data of the current stream, as returned by
   * `stream_get_meta_data`.
   *
   * @return  array
   */
  public function getStreamMetaData()
  {
      $resource = $this->getStream();

      return stream_get_meta_data($resource);
  }
  /**
   * Whether this stream is already opened by another handler, i.e. whether
   * the stream name was already in the static register when this handler
   * asked for it.
   *
   * @return  bool
   */
  public function isBorrowing()
  {
      return $this->_borrowing;
  }
  /**
   * Notification callback.
   *
   * Translates a stream notification code into the matching listener event
   * name and fires it with a bucket holding the notification details. This
   * method is the callback installed by `open()` as the `notification`
   * context parameter.
   *
   * @param   int     $ncode          Notification code. Please, see
   *                                  STREAM_NOTIFY_* constants.
   * @param   int     $severity       Severity. Please, see
   *                                  STREAM_NOTIFY_SEVERITY_* constants.
   * @param   string  $message        Message.
   * @param   int     $code           Message code.
   * @param   int     $transferred    If applicable, the number of
   *                                  transferred bytes.
   * @param   int     $max            If applicable, the number of max bytes.
   * @return  void
   */
  public function _notify(
      $ncode,
      $severity,
      $message,
      $code,
      $transferred,
      $max
  ) {
      // Notification code => listener event name.
      $eventNames = [
          STREAM_NOTIFY_AUTH_REQUIRED => 'authrequire',
          STREAM_NOTIFY_AUTH_RESULT   => 'authresult',
          STREAM_NOTIFY_COMPLETED     => 'complete',
          STREAM_NOTIFY_CONNECT       => 'connect',
          STREAM_NOTIFY_FAILURE       => 'failure',
          STREAM_NOTIFY_MIME_TYPE_IS  => 'mimetype',
          STREAM_NOTIFY_PROGRESS      => 'progress',
          STREAM_NOTIFY_REDIRECTED    => 'redirect',
          STREAM_NOTIFY_RESOLVE       => 'resolve',
          STREAM_NOTIFY_FILE_SIZE_IS  => 'size'
      ];

      $data = [
          'code'        => $code,
          'severity'    => $severity,
          'message'     => $message,
          'transferred' => $transferred,
          'max'         => $max
      ];

      $this->getListener()->fire($eventNames[$ncode], new Event\Bucket($data));
  }
  /**
   * Call the `close()` method of the handler of every stream in the static
   * stream register.
   * This method does not check the return value of `$handler->close()`. Thus,
   * if a stream is persistent, `$handler->close()` should do nothing. It is
   * a very generic method.
   *
   * @return  void
   */
  final public static function _Hoa_Stream()
  {
      foreach (self::$_register as $entry) {
          $handler = $entry[self::HANDLER];
          $handler->close();
      }
  }
  /**
   * Transform object to string.
   *
   * `getStreamName()` returns `null` while the stream bucket is empty (e.g.
   * when the opening has been deferred), but `__toString` must return a
   * string: the name is therefore cast, giving an empty string in that case.
   *
   * @return  string  The current stream name, or an empty string when there
   *                  is none.
   */
  public function __toString()
  {
      return (string) $this->getStreamName();
  }
  /**
   * Close the stream when destructing, if it is still opened.
   *
   * @return  void
   */
  public function __destruct()
  {
      if (true === $this->isOpened()) {
          $this->close();
      }
  }
  574. }
  /**
   * Class \Hoa\Stream\_Protocol.
   *
   * The `hoa://Library/Stream` node. Reaching an ID on this node looks the ID
   * up as a stream name in the static stream register.
   *
   * @copyright  Copyright © 2007-2017 Hoa community
   * @license    New BSD License
   */
  class _Protocol extends Protocol\Node
  {
      /**
       * Component's name.
       *
       * @var string
       */
      protected $_name = 'Stream';

      /**
       * ID of the component.
       *
       * @param   string  $id    ID of the component, used as a stream name.
       * @return  mixed   Result of `Stream::getStreamHandler($id)`: the
       *                  handler registered for this name, or `null`.
       */
      public function reachId($id)
      {
          return Stream::getStreamHandler($id);
      }
  }
  /**
   * Flex entity.
   */
  Consistency::flexEntity('Hoa\Stream\Stream');

  /**
   * Shutdown method: `Stream::_Hoa_Stream()` closes every stream of the
   * static register.
   */
  Consistency::registerShutdownFunction(
      xcallable('Hoa\Stream\Stream::_Hoa_Stream')
  );

  /**
   * Add the `hoa://Library/Stream` node. Should be used to reach/get an entry
   * in the stream register.
   */
  $protocol              = Protocol::getInstance();
  $protocol['Library'][] = new _Protocol();