Stomp.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. <?php
  2. /**
  3. *
  4. * Copyright 2005-2006 The Apache Software Foundation
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. /* vim: set expandtab tabstop=3 shiftwidth=3: */
  19. require_once 'Stomp/Frame.php';
  20. /**
  21. * A Stomp Connection
  22. *
  23. *
  24. * @package Stomp
  25. * @author Hiram Chirino <hiram@hiramchirino.com>
  26. * @author Dejan Bosanac <dejan@nighttale.net>
  27. * @author Michael Caplan <mcaplan@labnet.net>
  28. * @version $Revision: 43 $
  29. */
  30. class Stomp
  31. {
  32. /**
  33. * Perform request synchronously
  34. *
  35. * @var boolean
  36. */
  37. public $sync = false;
  38. /**
  39. * Default prefetch size
  40. *
  41. * @var int
  42. */
  43. public $prefetchSize = 1;
  44. /**
  45. * Client id used for durable subscriptions
  46. *
  47. * @var string
  48. */
  49. public $clientId = null;
  50. protected $_brokerUri = null;
  51. protected $_socket = null;
  52. protected $_hosts = array();
  53. protected $_params = array();
  54. protected $_subscriptions = array();
  55. protected $_defaultPort = 61613;
  56. protected $_currentHost = - 1;
  57. protected $_attempts = 10;
  58. protected $_username = '';
  59. protected $_password = '';
  60. protected $_sessionId;
  61. protected $_read_timeout_seconds = 60;
  62. protected $_read_timeout_milliseconds = 0;
  63. protected $_connect_timeout_seconds = 60;
  64. /**
  65. * Constructor
  66. *
  67. * @param string $brokerUri Broker URL
  68. * @throws StompException
  69. */
  70. public function __construct ($brokerUri)
  71. {
  72. $this->_brokerUri = $brokerUri;
  73. $this->_init();
  74. }
  75. /**
  76. * Initialize connection
  77. *
  78. * @throws StompException
  79. */
  80. protected function _init ()
  81. {
  82. $pattern = "|^(([a-zA-Z]+)://)+\(*([a-zA-Z0-9\.:/i,-]+)\)*\??([a-zA-Z0-9=]*)$|i";
  83. if (preg_match($pattern, $this->_brokerUri, $regs)) {
  84. $scheme = $regs[2];
  85. $hosts = $regs[3];
  86. $params = $regs[4];
  87. if ($scheme != "failover") {
  88. $this->_processUrl($this->_brokerUri);
  89. } else {
  90. $urls = explode(",", $hosts);
  91. foreach ($urls as $url) {
  92. $this->_processUrl($url);
  93. }
  94. }
  95. if ($params != null) {
  96. parse_str($params, $this->_params);
  97. }
  98. } else {
  99. require_once 'Stomp/Exception.php';
  100. throw new StompException("Bad Broker URL {$this->_brokerUri}");
  101. }
  102. }
  103. /**
  104. * Process broker URL
  105. *
  106. * @param string $url Broker URL
  107. * @throws StompException
  108. * @return boolean
  109. */
  110. protected function _processUrl ($url)
  111. {
  112. $parsed = parse_url($url);
  113. if ($parsed) {
  114. array_push($this->_hosts, array($parsed['host'] , $parsed['port'] , $parsed['scheme']));
  115. } else {
  116. require_once 'Stomp/Exception.php';
  117. throw new StompException("Bad Broker URL $url");
  118. }
  119. }
  120. /**
  121. * Make socket connection to the server
  122. *
  123. * @throws StompException
  124. */
  125. protected function _makeConnection ()
  126. {
  127. if (count($this->_hosts) == 0) {
  128. require_once 'Stomp/Exception.php';
  129. throw new StompException("No broker defined");
  130. }
  131. // force disconnect, if previous established connection exists
  132. $this->disconnect();
  133. $i = $this->_currentHost;
  134. $att = 0;
  135. $connected = false;
  136. $connect_errno = null;
  137. $connect_errstr = null;
  138. while (! $connected && $att ++ < $this->_attempts) {
  139. if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') {
  140. $i = rand(0, count($this->_hosts) - 1);
  141. } else {
  142. $i = ($i + 1) % count($this->_hosts);
  143. }
  144. $broker = $this->_hosts[$i];
  145. $host = $broker[0];
  146. $port = $broker[1];
  147. $scheme = $broker[2];
  148. if ($port == null) {
  149. $port = $this->_defaultPort;
  150. }
  151. if ($this->_socket != null) {
  152. fclose($this->_socket);
  153. $this->_socket = null;
  154. }
  155. $this->_socket = @fsockopen($scheme . '://' . $host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds);
  156. if (!is_resource($this->_socket) && $att >= $this->_attempts && !array_key_exists($i + 1, $this->_hosts)) {
  157. require_once 'Stomp/Exception.php';
  158. throw new StompException("Could not connect to $host:$port ($att/{$this->_attempts})");
  159. } else if (is_resource($this->_socket)) {
  160. $connected = true;
  161. $this->_currentHost = $i;
  162. break;
  163. }
  164. }
  165. if (! $connected) {
  166. require_once 'Stomp/Exception.php';
  167. throw new StompException("Could not connect to a broker");
  168. }
  169. }
  170. /**
  171. * Connect to server
  172. *
  173. * @param string $username
  174. * @param string $password
  175. * @return boolean
  176. * @throws StompException
  177. */
  178. public function connect ($username = '', $password = '')
  179. {
  180. $this->_makeConnection();
  181. if ($username != '') {
  182. $this->_username = $username;
  183. }
  184. if ($password != '') {
  185. $this->_password = $password;
  186. }
  187. $headers = array('login' => $this->_username , 'passcode' => $this->_password);
  188. if ($this->clientId != null) {
  189. $headers["client-id"] = $this->clientId;
  190. }
  191. $frame = new StompFrame("CONNECT", $headers);
  192. $this->_writeFrame($frame);
  193. $frame = $this->readFrame();
  194. if ($frame instanceof StompFrame && $frame->command == 'CONNECTED') {
  195. $this->_sessionId = $frame->headers["session"];
  196. return true;
  197. } else {
  198. require_once 'Stomp/Exception.php';
  199. if ($frame instanceof StompFrame) {
  200. throw new StompException("Unexpected command: {$frame->command}", 0, $frame->body);
  201. } else {
  202. throw new StompException("Connection not acknowledged");
  203. }
  204. }
  205. }
  206. /**
  207. * Check if client session has ben established
  208. *
  209. * @return boolean
  210. */
  211. public function isConnected ()
  212. {
  213. return !empty($this->_sessionId) && is_resource($this->_socket);
  214. }
  215. /**
  216. * Current stomp session ID
  217. *
  218. * @return string
  219. */
  220. public function getSessionId()
  221. {
  222. return $this->_sessionId;
  223. }
  224. /**
  225. * Send a message to a destination in the messaging system
  226. *
  227. * @param string $destination Destination queue
  228. * @param string|StompFrame $msg Message
  229. * @param array $properties
  230. * @param boolean $sync Perform request synchronously
  231. * @return boolean
  232. */
  233. public function send ($destination, $msg, $properties = array(), $sync = null)
  234. {
  235. if ($msg instanceof StompFrame) {
  236. $msg->headers['destination'] = $destination;
  237. if (is_array($properties)) $msg->headers = array_merge($msg->headers, $properties);
  238. $frame = $msg;
  239. } else {
  240. $headers = $properties;
  241. $headers['destination'] = $destination;
  242. $frame = new StompFrame('SEND', $headers, $msg);
  243. }
  244. $this->_prepareReceipt($frame, $sync);
  245. $this->_writeFrame($frame);
  246. return $this->_waitForReceipt($frame, $sync);
  247. }
  248. /**
  249. * Prepair frame receipt
  250. *
  251. * @param StompFrame $frame
  252. * @param boolean $sync
  253. */
  254. protected function _prepareReceipt (StompFrame $frame, $sync)
  255. {
  256. $receive = $this->sync;
  257. if ($sync !== null) {
  258. $receive = $sync;
  259. }
  260. if ($receive == true) {
  261. $frame->headers['receipt'] = md5(microtime());
  262. }
  263. }
  264. /**
  265. * Wait for receipt
  266. *
  267. * @param StompFrame $frame
  268. * @param boolean $sync
  269. * @return boolean
  270. * @throws StompException
  271. */
  272. protected function _waitForReceipt (StompFrame $frame, $sync)
  273. {
  274. $receive = $this->sync;
  275. if ($sync !== null) {
  276. $receive = $sync;
  277. }
  278. if ($receive == true) {
  279. $id = (isset($frame->headers['receipt'])) ? $frame->headers['receipt'] : null;
  280. if ($id == null) {
  281. return true;
  282. }
  283. $frame = $this->readFrame();
  284. if ($frame instanceof StompFrame && $frame->command == 'RECEIPT') {
  285. if ($frame->headers['receipt-id'] == $id) {
  286. return true;
  287. } else {
  288. require_once 'Stomp/Exception.php';
  289. throw new StompException("Unexpected receipt id {$frame->headers['receipt-id']}", 0, $frame->body);
  290. }
  291. } else {
  292. require_once 'Stomp/Exception.php';
  293. if ($frame instanceof StompFrame) {
  294. throw new StompException("Unexpected command {$frame->command}", 0, $frame->body);
  295. } else {
  296. throw new StompException("Receipt not received");
  297. }
  298. }
  299. }
  300. return true;
  301. }
  302. /**
  303. * Register to listen to a given destination
  304. *
  305. * @param string $destination Destination queue
  306. * @param array $properties
  307. * @param boolean $sync Perform request synchronously
  308. * @return boolean
  309. * @throws StompException
  310. */
  311. public function subscribe ($destination, $properties = null, $sync = null)
  312. {
  313. $headers = array('ack' => 'client');
  314. $headers['activemq.prefetchSize'] = $this->prefetchSize;
  315. $headers['prefetch-count'] = '1';
  316. if ($this->clientId != null) {
  317. $headers["activemq.subcriptionName"] = $this->clientId;
  318. }
  319. if (isset($properties)) {
  320. foreach ($properties as $name => $value) {
  321. $headers[$name] = $value;
  322. }
  323. }
  324. $headers['destination'] = $destination;
  325. $frame = new StompFrame('SUBSCRIBE', $headers);
  326. $this->_prepareReceipt($frame, $sync);
  327. $this->_writeFrame($frame);
  328. if ($this->_waitForReceipt($frame, $sync) == true) {
  329. $this->_subscriptions[$destination] = $properties;
  330. return true;
  331. } else {
  332. return false;
  333. }
  334. }
  335. /**
  336. * Remove an existing subscription
  337. *
  338. * @param string $destination
  339. * @param array $properties
  340. * @param boolean $sync Perform request synchronously
  341. * @return boolean
  342. * @throws StompException
  343. */
  344. public function unsubscribe ($destination, $properties = null, $sync = null)
  345. {
  346. $headers = array();
  347. if (isset($properties)) {
  348. foreach ($properties as $name => $value) {
  349. $headers[$name] = $value;
  350. }
  351. }
  352. $headers['destination'] = $destination;
  353. $frame = new StompFrame('UNSUBSCRIBE', $headers);
  354. $this->_prepareReceipt($frame, $sync);
  355. $this->_writeFrame($frame);
  356. if ($this->_waitForReceipt($frame, $sync) == true) {
  357. unset($this->_subscriptions[$destination]);
  358. return true;
  359. } else {
  360. return false;
  361. }
  362. }
  363. /**
  364. * Start a transaction
  365. *
  366. * @param string $transactionId
  367. * @param boolean $sync Perform request synchronously
  368. * @return boolean
  369. * @throws StompException
  370. */
  371. public function begin ($transactionId = null, $sync = null)
  372. {
  373. $headers = array();
  374. if (isset($transactionId)) {
  375. $headers['transaction'] = $transactionId;
  376. }
  377. $frame = new StompFrame('BEGIN', $headers);
  378. $this->_prepareReceipt($frame, $sync);
  379. $this->_writeFrame($frame);
  380. return $this->_waitForReceipt($frame, $sync);
  381. }
  382. /**
  383. * Commit a transaction in progress
  384. *
  385. * @param string $transactionId
  386. * @param boolean $sync Perform request synchronously
  387. * @return boolean
  388. * @throws StompException
  389. */
  390. public function commit ($transactionId = null, $sync = null)
  391. {
  392. $headers = array();
  393. if (isset($transactionId)) {
  394. $headers['transaction'] = $transactionId;
  395. }
  396. $frame = new StompFrame('COMMIT', $headers);
  397. $this->_prepareReceipt($frame, $sync);
  398. $this->_writeFrame($frame);
  399. return $this->_waitForReceipt($frame, $sync);
  400. }
  401. /**
  402. * Roll back a transaction in progress
  403. *
  404. * @param string $transactionId
  405. * @param boolean $sync Perform request synchronously
  406. */
  407. public function abort ($transactionId = null, $sync = null)
  408. {
  409. $headers = array();
  410. if (isset($transactionId)) {
  411. $headers['transaction'] = $transactionId;
  412. }
  413. $frame = new StompFrame('ABORT', $headers);
  414. $this->_prepareReceipt($frame, $sync);
  415. $this->_writeFrame($frame);
  416. return $this->_waitForReceipt($frame, $sync);
  417. }
  418. /**
  419. * Acknowledge consumption of a message from a subscription
  420. * Note: This operation is always asynchronous
  421. *
  422. * @param string|StompFrame $messageMessage ID
  423. * @param string $transactionId
  424. * @return boolean
  425. * @throws StompException
  426. */
  427. public function ack ($message, $transactionId = null)
  428. {
  429. if ($message instanceof StompFrame) {
  430. $headers = $message->headers;
  431. if (isset($transactionId)) {
  432. $headers['transaction'] = $transactionId;
  433. }
  434. $frame = new StompFrame('ACK', $headers);
  435. $this->_writeFrame($frame);
  436. return true;
  437. } else {
  438. $headers = array();
  439. if (isset($transactionId)) {
  440. $headers['transaction'] = $transactionId;
  441. }
  442. $headers['message-id'] = $message;
  443. $frame = new StompFrame('ACK', $headers);
  444. $this->_writeFrame($frame);
  445. return true;
  446. }
  447. }
  448. /**
  449. * Graceful disconnect from the server
  450. *
  451. */
  452. public function disconnect ()
  453. {
  454. $headers = array();
  455. if ($this->clientId != null) {
  456. $headers["client-id"] = $this->clientId;
  457. }
  458. if (is_resource($this->_socket)) {
  459. $this->_writeFrame(new StompFrame('DISCONNECT', $headers));
  460. fclose($this->_socket);
  461. }
  462. $this->_socket = null;
  463. $this->_sessionId = null;
  464. $this->_currentHost = -1;
  465. $this->_subscriptions = array();
  466. $this->_username = '';
  467. $this->_password = '';
  468. }
  469. /**
  470. * Write frame to server
  471. *
  472. * @param StompFrame $stompFrame
  473. */
  474. protected function _writeFrame (StompFrame $stompFrame)
  475. {
  476. if (!is_resource($this->_socket)) {
  477. require_once 'Stomp/Exception.php';
  478. throw new StompException('Socket connection hasn\'t been established');
  479. }
  480. $data = $stompFrame->__toString();
  481. $r = fwrite($this->_socket, $data, strlen($data));
  482. if ($r === false || $r == 0) {
  483. $this->_reconnect();
  484. $this->_writeFrame($stompFrame);
  485. }
  486. }
  487. /**
  488. * Set timeout to wait for content to read
  489. *
  490. * @param int $seconds_to_wait Seconds to wait for a frame
  491. * @param int $milliseconds Milliseconds to wait for a frame
  492. */
  493. public function setReadTimeout($seconds, $milliseconds = 0)
  494. {
  495. $this->_read_timeout_seconds = $seconds;
  496. $this->_read_timeout_milliseconds = $milliseconds;
  497. }
  498. /**
  499. * Read response frame from server
  500. *
  501. * @return StompFrame False when no frame to read
  502. */
  503. public function readFrame ()
  504. {
  505. if (!$this->hasFrameToRead()) {
  506. return false;
  507. }
  508. $rb = 1024;
  509. $data = '';
  510. $end = false;
  511. do {
  512. $read = fread($this->_socket, $rb);
  513. if ($read === false) {
  514. $this->_reconnect();
  515. return $this->readFrame();
  516. }
  517. $data .= $read;
  518. if (strpos($data, "\x00") !== false) {
  519. $end = true;
  520. $data = rtrim($data, "\n");
  521. }
  522. $len = strlen($data);
  523. } while ($len < 2 || $end == false);
  524. list ($header, $body) = explode("\n\n", $data, 2);
  525. $header = explode("\n", $header);
  526. $headers = array();
  527. $command = null;
  528. foreach ($header as $v) {
  529. if (isset($command)) {
  530. list ($name, $value) = explode(':', $v, 2);
  531. $headers[$name] = $value;
  532. } else {
  533. $command = $v;
  534. }
  535. }
  536. $frame = new StompFrame($command, $headers, trim($body));
  537. if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
  538. require_once 'Stomp/Message/Map.php';
  539. return new StompMessageMap($frame);
  540. } else {
  541. return $frame;
  542. }
  543. return $frame;
  544. }
  545. /**
  546. * Check if there is a frame to read
  547. *
  548. * @return boolean
  549. */
  550. public function hasFrameToRead()
  551. {
  552. $read = array($this->_socket);
  553. $write = null;
  554. $except = null;
  555. $has_frame_to_read = @stream_select($read, $write, $except, $this->_read_timeout_seconds, $this->_read_timeout_milliseconds);
  556. if ($has_frame_to_read !== false)
  557. $has_frame_to_read = count($read);
  558. if ($has_frame_to_read === false) {
  559. throw new StompException('Check failed to determine if the socket is readable');
  560. } else if ($has_frame_to_read > 0) {
  561. return true;
  562. } else {
  563. return false;
  564. }
  565. }
  566. /**
  567. * Reconnects and renews subscriptions (if there were any)
  568. * Call this method when you detect connection problems
  569. */
  570. protected function _reconnect ()
  571. {
  572. $subscriptions = $this->_subscriptions;
  573. $this->connect($this->_username, $this->_password);
  574. foreach ($subscriptions as $dest => $properties) {
  575. $this->subscribe($dest, $properties);
  576. }
  577. }
  578. /**
  579. * Graceful object desruction
  580. *
  581. */
  582. public function __destruct()
  583. {
  584. $this->disconnect();
  585. }
  586. }
  587. ?>