<?php
/**
 * Based on code from Stomp PHP library, working around bugs in the base class.
 *
 * Original code is copyright 2005-2006 The Apache Software Foundation
 * Modifications copyright 2009 StatusNet Inc by Brion Vibber <brion@status.net>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  /**
   * Stomp client derived from the Stomp PHP library's base class, working
   * around bugs in that class: the socket is exposed and kept non-blocking
   * so callers can select() on it, every frame that is ready is read in one
   * call, and writes are retried over a fresh connection.
   */
  class LiberalStomp extends Stomp
  {
      /**
       * We need to be able to get the socket so advanced daemons can
       * do a select() waiting for input both from the queue and from
       * other sources such as an XMPP connection.
       *
       * @return resource
       */
      public function getSocket()
      {
          return $this->_socket;
      }

      /**
       * Return the host we're currently connected to.
       *
       * @return string "host:port", or '[unconnected]' when no host is selected
       */
      public function getServer()
      {
          $idx = $this->_currentHost;
          // Guard the lookup as well as the index so a stale or unset index
          // cannot trigger an undefined-offset notice.
          if ($idx >= 0 && isset($this->_hosts[$idx])) {
              $host = $this->_hosts[$idx];
              return $host[0] . ':' . $host[1];
          }
          return '[unconnected]';
      }

      /**
       * Make socket connection to the server.
       * We also set the stream to non-blocking mode, since we'll be
       * select'ing to wait for updates. In blocking mode it seems
       * to get confused sometimes.
       *
       * @throws StompException
       */
      protected function _makeConnection ()
      {
          parent::_makeConnection();
          stream_set_blocking($this->_socket, false);
      }

      /**
       * Version 1.0.0 of the Stomp library gets confused if messages
       * come in too fast over the connection. This version will read
       * out as many frames as are ready to be read from the socket.
       *
       * Modified from Stomp::readFrame()
       *
       * @return array|false List of parsed frames (see parseFrame()), or
       *                     false when there is no frame to read
       * @throws StompException if the socket read fails or the peer closed it
       */
      public function readFrames ()
      {
          if (!$this->hasFrameToRead()) {
              return false;
          }

          $rb = 1024;
          $data = '';
          $frames = array();

          while (true) {
              $read = fread($this->_socket, $rb);
              if ($read === false || ($read === '' && feof($this->_socket))) {
                  // @fixme possibly attempt an auto reconnect as old code?
                  // Note that reconnecting and re-reading here would lose
                  // any frames already collected in $frames.
                  throw new StompException("Error reading");
              }

              if ($read === '') {
                  // Non-blocking socket with nothing available right now.
                  if (trim($data, "\r\n") === '') {
                      // Only inter-frame newline padding (or nothing) is
                      // pending, so there is no partial frame to wait for.
                      break;
                  }
                  // In the middle of a frame: sleep until more bytes arrive
                  // instead of spinning on fread(). The timeout is bounded so
                  // an interrupted or spurious wake-up simply retries.
                  $readable = array($this->_socket);
                  $writable = null;
                  $except = null;
                  stream_select($readable, $writable, $except, 1);
                  continue;
              }

              $data .= $read;
              if (strpos($data, "\x00") === false) {
                  // No frame terminator yet; keep reading.
                  continue;
              }

              // Frames are null-delimited, but some servers
              // may append an extra \n according to old bug reports.
              $data = str_replace("\x00\n", "\x00", $data);
              $chunks = explode("\x00", $data);
              // Whatever follows the last terminator is an incomplete frame.
              $data = array_pop($chunks);
              foreach ($chunks as $chunk) {
                  // Skip empty or newline-only chunks so they are not
                  // turned into bogus frames.
                  if (trim($chunk, "\r\n") !== '') {
                      $frames[] = $chunk;
                  }
              }

              if (trim($data, "\r\n") === '') {
                  // We're at the end of a frame (ignoring newline padding);
                  // stop reading so the caller gets what we have now.
                  break;
              }
              // Otherwise we're in the middle of a frame; keep going.
          }

          if (!$frames) {
              return false;
          }
          return array_map(array($this, 'parseFrame'), $frames);
      }

      /**
       * Parse a raw Stomp frame into an object.
       * Extracted from Stomp::readFrame()
       *
       * The first header line is the command; each following line is split
       * on its first ':' into a header name and value. A line without ':'
       * is stored with a null value. The body is whitespace-trimmed.
       *
       * @param string $data Raw frame text, without the trailing NUL
       * @return StompFrame A StompMessageMap is returned instead when the
       *                    'transformation' header is 'jms-map-json'
       */
      public function parseFrame($data)
      {
          // A newline left over from the previous frame's terminator would
          // otherwise be mistaken for an empty command line.
          $data = ltrim($data, "\r\n");

          // Pad so a frame lacking the blank-line separator yields an
          // empty body rather than an undefined offset.
          list ($header, $body) = array_pad(explode("\n\n", $data, 2), 2, '');

          $lines = explode("\n", $header);
          $command = array_shift($lines);

          $headers = array();
          foreach ($lines as $line) {
              $parts = explode(':', $line, 2);
              $headers[$parts[0]] = isset($parts[1]) ? $parts[1] : null;
          }

          $frame = new StompFrame($command, $headers, trim($body));
          if (isset($frame->headers['transformation']) && $frame->headers['transformation'] === 'jms-map-json') {
              require_once 'Stomp/Message/Map.php';
              return new StompMessageMap($frame);
          }
          return $frame;
      }

      /**
       * Write frame to server.
       *
       * The socket is switched to blocking mode for the duration of the
       * write and back to non-blocking afterwards. If the frame cannot be
       * written completely, the connection is re-established and the whole
       * frame is sent again, up to a fixed number of attempts.
       *
       * @param StompFrame $stompFrame
       * @throws StompException if there is no socket, or if the frame still
       *                        cannot be written after all attempts
       */
      protected function _writeFrame (StompFrame $stompFrame)
      {
          $maxAttempts = 5;
          $data = $stompFrame->__toString();
          $total = strlen($data);

          for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
              if (!is_resource($this->_socket)) {
                  require_once 'Stomp/Exception.php';
                  throw new StompException('Socket connection hasn\'t been established');
              }

              // Make sure the socket's in a writable state; if not, wait a bit.
              stream_set_blocking($this->_socket, true);
              $written = 0;
              while ($written < $total) {
                  // fwrite() on a network stream may accept only part of
                  // the buffer, so keep sending the remainder.
                  $r = fwrite($this->_socket, substr($data, $written));
                  if ($r === false || $r === 0) {
                      break;
                  }
                  $written += $r;
              }
              stream_set_blocking($this->_socket, false);

              if ($written >= $total) {
                  return;
              }

              // The write failed part-way; start over on a fresh connection
              // unless this was the final attempt.
              if ($attempt < $maxAttempts) {
                  $this->_reconnect();
              }
          }

          require_once 'Stomp/Exception.php';
          throw new StompException('Unable to write frame after ' . $maxAttempts . ' attempts');
      }
  }