stompqueuemanager.php 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763
  1. <?php
  2. /**
  3. * StatusNet, the distributed open-source microblogging tool
  4. *
  5. * Queue manager implementation that talks to one or more Stomp message queue servers
  6. *
  7. * PHP version 5
  8. *
  9. * LICENCE: This program is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU Affero General Public License as published by
  11. * the Free Software Foundation, either version 3 of the License, or
  12. * (at your option) any later version.
  13. *
  14. * This program is distributed in the hope that it will be useful,
  15. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. * GNU Affero General Public License for more details.
  18. *
  19. * You should have received a copy of the GNU Affero General Public License
  20. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. *
  22. * @category QueueManager
  23. * @package StatusNet
  24. * @author Evan Prodromou <evan@status.net>
  25. * @author Sarven Capadisli <csarven@status.net>
  26. * @copyright 2009 StatusNet, Inc.
  27. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
  28. * @link http://status.net/
  29. */
  30. require_once 'Stomp.php';
  31. require_once 'Stomp/Exception.php';
  32. class StompQueueManager extends QueueManager
  33. {
  /** @var array Stomp server URLs; a single configured server is wrapped into a list. */
  protected $servers;
  /** @var string User name passed to the Stomp connect call. */
  protected $username;
  /** @var string Password passed to the Stomp connect call. */
  protected $password;
  /** @var string Prefix prepended to every queue destination name ('queue_basename'). */
  protected $base;
  /** @var string Destination used for control signals ('control_channel'). */
  protected $control;
  /**
   * @var array Breakout specifiers from the 'breakout' queue setting.
   *
   * Previously assigned in the constructor without being declared, which
   * created a dynamic property; declared here so the class does not rely
   * on dynamic property creation.
   */
  protected $breakout = array();
  /** @var mixed Truthy when message handling is wrapped in Stomp transactions. */
  protected $useTransactions;
  /** @var mixed Truthy when received frames are explicitly acknowledged. */
  protected $useAcks;
  /** @var array Site nicknames; consulted by updateSiteConfig(). Populated outside this class body. */
  protected $sites = array();
  /** @var array Not referenced by the methods visible in this class -- presumably legacy; verify before removing. */
  protected $subscriptions = array();
  /** @var array All open connections, indexed by slot; a lost connection is stored as null. */
  protected $cons = array();
  /** @var array Per-slot timestamp of when the connection was lost, or null while connected. */
  protected $disconnect = array();
  /** @var array Per-slot id of the currently open transaction, or null when none is open. */
  protected $transaction = array();
  /** @var array Per-slot counter used to build transaction ids. */
  protected $transactionCount = array();
  /** @var int Slot of the connection used for sending; updated whenever input is handled. */
  protected $defaultIdx = 0;
  /**
   * Load all Stomp-related settings from the 'queue' configuration section.
   *
   * A scalar 'stomp_server' value is wrapped into a one-element list so the
   * rest of the class can always treat $this->servers as an array.
   */
  public function __construct()
  {
    parent::__construct();

    $configured = common_config('queue', 'stomp_server');
    $this->servers = is_array($configured) ? $configured : array($configured);

    $this->username = common_config('queue', 'stomp_username');
    $this->password = common_config('queue', 'stomp_password');

    $this->base = common_config('queue', 'queue_basename');
    $this->control = common_config('queue', 'control_channel');
    $this->breakout = common_config('queue', 'breakout');

    $this->useTransactions = common_config('queue', 'stomp_transactions');
    $this->useAcks = common_config('queue', 'stomp_acks');
  }
  /**
   * Report the instancing mode of this manager to the i/o master:
   * one instance per process serves every site handled by that process.
   *
   * @return mixed IoManager::INSTANCE_PER_PROCESS
   */
  public static function multiSite()
  {
    return IoManager::INSTANCE_PER_PROCESS;
  }
  /**
   * Broadcast a control message to running queue daemons, e.g. to announce
   * a new site or request a shutdown, so they need not be restarted by hand.
   *
   * The message is "$event" or, when a parameter is given, "$event:$param".
   * It is sent on the control channel through the default connection.
   *
   * @param string $event event key
   * @param string $param optional parameter to append to key
   * @return boolean true if the send call reported success
   */
  public function sendControlSignal($event, $param='')
  {
    $message = ($param != '') ? $event . ':' . $param : $event;

    $this->_connect();
    $headers = array ('created' => common_sql_now());
    $sent = $this->cons[$this->defaultIdx]->send($this->control, $message, $headers);

    if (!$sent) {
      $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message");
      return false;
    }

    $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message");
    return true;
  }
  /**
   * Queue an object for the given handler.
   *
   * Normally the default connection is used. When 'stomp_enqueue_on' is
   * configured, the write is directed to the slot whose server URL matches
   * that setting; a setting that matches no configured server is logged and
   * the default connection is used instead.
   *
   * NOTE(review): the slot found in $this->servers is used to index
   * $this->cons without checking that a live connection exists there.
   *
   * @param mixed $object
   * @param string $queue
   * @param string $siteNickname optional override to drop into another site's queue
   *
   * @return boolean true on success
   * @throws StompException on connection or send error
   */
  public function enqueue($object, $queue, $siteNickname=null)
  {
    $this->_connect();

    $idx = $this->defaultIdx;
    $pinned = common_config('queue', 'stomp_enqueue_on');
    if ($pinned) {
      // We're trying to force all writes to a single server.
      $found = array_search($pinned, $this->servers);
      if ($found === false) {
        common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.');
      } else {
        $idx = $found;
      }
    }

    return $this->_doEnqueue($object, $queue, $idx, $siteNickname);
  }
  /**
   * Wrap an object in an envelope and send it over one specific connection.
   *
   * The envelope holds the target site, the handler (queue) name and the
   * encoded payload; it is serialized and base64-encoded before sending.
   *
   * @param mixed $object item to queue
   * @param string $queue handler name
   * @param int $idx slot in $this->cons to send through
   * @param string $siteNickname optional site override; defaults to the configured site nickname
   *
   * @return boolean true on success
   * @throws StompException on connection or send error
   */
  protected function _doEnqueue($object, $queue, $idx, $siteNickname=null)
  {
    $rep = $this->logrep($object);

    $site = $siteNickname ? $siteNickname : common_config('site', 'nickname');
    $envelope = array('site' => $site,
                      'handler' => $queue,
                      'payload' => $this->encode($object));
    $msg = base64_encode(serialize($envelope));

    $props = array('created' => common_sql_now());
    if ($this->isPersistent($queue)) {
      $props['persistent'] = 'true';
    }

    $con = $this->cons[$idx];
    $host = $con->getServer();
    $target = $this->queueName($queue);

    if ($con->send($target, $msg, $props)) {
      $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target");
      $this->stats('enqueued', $queue);
      return true;
    }

    $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target");
    return false;
  }
  /**
   * Decide whether messages for a queue get the 'persistent' header.
   *
   * The 'stomp_persistent' setting is either a list of queue names (only
   * those are persistent) or a single value applied to every queue.
   * Whether messages are really stored depends on the queue server.
   *
   * @param string $queue
   * @return bool
   */
  protected function isPersistent($queue)
  {
    $mode = common_config('queue', 'stomp_persistent');

    return is_array($mode) ? in_array($queue, $mode) : (bool)$mode;
  }
  /**
   * Collect the sockets of all live connections so the IO manager can
   * wait for input on them. Slots holding a lost connection are skipped.
   *
   * @return array of resources
   */
  public function getSockets()
  {
    $sockets = array();
    // array_filter() without a callback drops the empty (disconnected) slots.
    foreach (array_filter($this->cons) as $live) {
      $sockets[] = $live->getSocket();
    }
    return $sockets;
  }
  /**
   * Find which connection slot owns the given socket.
   *
   * @param resource $socket
   * @return int index into connections list
   * @throws Exception if no live connection uses this socket
   */
  protected function connectionFromSocket($socket)
  {
    foreach ($this->cons as $slot => $candidate) {
      if (!$candidate) {
        continue;
      }
      if ($candidate->getSocket() === $socket) {
        return $slot;
      }
    }
    throw new Exception(__CLASS__ . " asked to read from unrecognized socket");
  }
  /**
   * We've got input to handle on our socket!
   * Read any waiting Stomp frame(s) and process them.
   *
   * Frames addressed to the control channel go to handleControlSignal();
   * everything else goes to handleItem(). After each processed frame the
   * frame is acknowledged and the transaction is committed and reopened.
   *
   * If reading fails, the connection slot is marked as lost (so idle() can
   * reconnect later) and false is returned.
   *
   * @param resource $socket
   * @return boolean true if every queue item was handled successfully
   */
  public function handleInput($socket)
  {
    $idx = $this->connectionFromSocket($socket);
    $con = $this->cons[$idx];
    $host = $con->getServer();

    // handleItem() and enqueue() use the default slot, so anything done
    // while processing these frames happens on the connection we read from.
    $this->defaultIdx = $idx;

    $ok = true;
    try {
      $frames = $con->readFrames();
    } catch (StompException $e) {
      $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage());
      fclose($socket); // ???
      $this->cons[$idx] = null;
      $this->transaction[$idx] = null;
      $this->disconnect[$idx] = time();
      return false;
    }

    foreach ($frames as $frame) {
      $dest = $frame->headers['destination'];
      if ($dest == $this->control) {
        // handleControlSignal() is declared as ($idx, $frame); the previous
        // call passed only the frame, leaving the real frame argument unset.
        if (!$this->handleControlSignal($idx, $frame)) {
          // We got a control event that requests a shutdown;
          // close out and stop handling anything else!
          break;
        }
      } else {
        $ok = $this->handleItem($frame) && $ok;
      }
      $this->ack($idx, $frame);
      $this->commit($idx);
      $this->begin($idx);
    }
    return $ok;
  }
  /**
   * Idle-time hook: retry lost connections.
   *
   * A slot whose connection is empty is reconnected once at least 60
   * seconds have passed since the time recorded in $this->disconnect.
   *
   * @return boolean always true
   */
  public function idle()
  {
    $now = time();
    foreach ($this->cons as $idx => $con) {
      if (!empty($con)) {
        continue;
      }
      $downFor = $now - $this->disconnect[$idx];
      if ($downFor >= 60) {
        $this->_reconnect($idx);
      }
    }
    return true;
  }
  /**
   * Open the connection(s) and, on each live one, subscribe to every
   * queue we handle and open the first transaction. When several servers
   * are configured for manual failover, all of them are listened to.
   *
   * Side effects: in multi-site mode, may reset site configuration.
   *
   * @param IoMaster $master process/event controller
   * @return bool always true; connection problems surface as exceptions
   */
  public function start($master)
  {
    parent::start($master);
    $this->_connectAll();

    foreach ($this->cons as $slot => $con) {
      if (!$con) {
        continue;
      }
      $this->doSubscribe($con);
      $this->begin($slot);
    }
    return true;
  }
  /**
   * Shut down every live connection.
   *
   * rollback() is called before disconnecting; per the original author's
   * note this is meant to free delivered-but-unprocessed messages for
   * another worker to take. The slot is then cleared.
   *
   * @return bool always true
   */
  public function finish()
  {
    foreach ($this->cons as $slot => $con) {
      if (!$con) {
        continue;
      }
      $this->rollback($slot);
      $con->disconnect();
      $this->cons[$slot] = null;
    }
    return true;
  }
  /**
   * Lazily open one connection, unless connections already exist.
   *
   * With several configured servers, the list is shuffled and handed to
   * the Stomp client as a single failover:// URL, leaving the choice of a
   * working server to the client library. All per-slot bookkeeping arrays
   * are reset to a single slot.
   */
  protected function _connect()
  {
    if (!empty($this->cons)) {
      return;
    }

    $list = $this->servers;
    if (count($list) <= 1) {
      $url = $list[0];
    } else {
      shuffle($list); // Randomize to spread load
      $url = 'failover://(' . implode(',', $list) . ')';
    }

    $this->cons = array($this->_doConnect($url));
    $this->transactionCount = array(0);
    $this->transaction = array(null);
    $this->disconnect = array(null);
  }
  /**
   * Lazy open connections to all Stomp servers, if in manual failover
   * mode. This means the queue servers don't speak to each other, so
   * we have to listen to all of them to make sure we get all events.
   *
   * Outside manual failover mode this just delegates to _connect().
   *
   * Each configured server gets one slot. A server that cannot be reached
   * keeps its slot with a null connection and the failure time recorded,
   * so idle() can retry it later.
   *
   * @throws ServerException if not a single server could be reached
   */
  protected function _connectAll()
  {
    if (!common_config('queue', 'stomp_manual_failover')) {
      return $this->_connect();
    }
    if (!empty($this->cons)) {
      return;
    }

    $this->cons = array();
    $this->disconnect = array();
    $this->transactionCount = array();
    $this->transaction = array();

    foreach ($this->servers as $server) {
      try {
        $this->cons[] = $this->_doConnect($server);
        $this->disconnect[] = null;
      } catch (Exception $e) {
        // s'okay, we'll live
        $this->cons[] = null;
        $this->disconnect[] = time();
      }
      $this->transactionCount[] = 0;
      $this->transaction[] = null;
    }

    // Failed servers leave null slots behind, so the list itself is never
    // empty; what matters is whether any slot holds a live connection.
    if (!array_filter($this->cons)) {
      throw new ServerException("No queue servers reachable...");
    }
  }
  /**
   * Try to re-establish the connection for one slot.
   *
   * On success the slot is filled, subscriptions are set up again and a
   * new transaction is begun. On failure the error is logged and the
   * disconnect time is refreshed so the retry is attempted again later.
   *
   * @param int $idx connection slot
   */
  protected function _reconnect($idx)
  {
    $con = null;
    try {
      $con = $this->_doConnect($this->servers[$idx]);
    } catch (Exception $e) {
      $this->_log(LOG_ERR, $e->getMessage());
    }

    if (!$con) {
      // Try again later...
      $this->disconnect[$idx] = time();
      return;
    }

    $this->cons[$idx] = $con;
    $this->disconnect[$idx] = null;
    $this->doSubscribe($con);
    $this->begin($idx);
  }
  /**
   * Create a Stomp client for the given server URL and log in with the
   * configured credentials.
   *
   * @param string $server server URL
   * @return LiberalStomp connected client
   * @throws ServerException if the connect call reports failure
   */
  protected function _doConnect($server)
  {
    $this->_log(LOG_INFO, "Connecting to '$server' as '$this->username'...");

    $con = new LiberalStomp($server);
    if (!$con->connect($this->username, $this->password)) {
      $this->_log(LOG_ERR, 'Failed to connect to queue server');
      throw new ServerException('Failed to connect to queue server');
    }

    $this->_log(LOG_INFO, "Connected.");
    return $con;
  }
  /**
   * Subscribe the given connection to every destination returned by
   * subscriptions(), logging each one.
   *
   * @param LiberalStomp $con
   */
  protected function doSubscribe(LiberalStomp $con)
  {
    $host = $con->getServer();
    $wanted = $this->subscriptions();

    foreach ($wanted as $sub) {
      $this->_log(LOG_INFO, "Subscribing to $sub on $host");
      $con->subscribe($sub);
    }
  }
  /**
   * Grab a full list of stomp-side queue subscriptions.
   * Will include:
   * - control broadcast channel
   * - shared group queues for active groups
   * - per-handler and per-site breakouts from $config['queue']['breakout']
   *   that are rooted in the active groups.
   *
   * A breakout specifier must look like "group/handler" or
   * "group/handler/site"; anything else is logged and skipped.
   *
   * @return array of strings
   */
  protected function subscriptions()
  {
    $subs = array($this->control);

    foreach ($this->activeGroups as $group) {
      $subs[] = $this->base . $group;
    }

    foreach ($this->breakout as $spec) {
      $parts = explode('/', $spec);
      $partCount = count($parts);
      if ($partCount < 2 || $partCount > 3) {
        common_log(LOG_ERR, "Bad queue breakout specifier $spec");
        // Previously the bad specifier was logged but still subscribed to.
        continue;
      }
      if (in_array($parts[0], $this->activeGroups)) {
        $subs[] = $this->base . $spec;
      }
    }

    return array_unique($subs);
  }
  /**
   * Handle an event that's come in through a queue.
   *
   * The frame body is a base64-encoded, serialized envelope with the keys
   * 'site', 'handler' and 'payload'. Outcomes:
   * - undecodable or malformed envelope: logged, returns false
   * - dead letter (too many redeliveries): counted, returns false
   * - payload cannot be decoded, or no handler class: counted, returns false
   * - handler reports failure or throws: the item is enqueued again and
   *   false is returned
   * - handler succeeds: returns true
   *
   * Side effects: in multi-site mode, may reset site configuration to
   * match the site that queued the event.
   *
   * @param StompFrame $frame
   * @return bool success
   */
  protected function handleItem($frame)
  {
    $host = $this->cons[$this->defaultIdx]->getServer();

    // NOTE(review): unserialize() on data read from the broker can
    // instantiate arbitrary classes if the broker is not fully trusted.
    // The envelope format is shared with _doEnqueue(), so it is kept as is.
    $message = unserialize(base64_decode($frame->body));
    if ($message === false) {
      $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}");
      $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body));
      return false;
    }

    // Anything that is not a complete envelope cannot be routed; reject it
    // instead of reading undefined keys and switching to an empty site.
    $complete = is_array($message)
      && isset($message['site'], $message['handler'])
      && array_key_exists('payload', $message);
    if (!$complete) {
      $this->_log(LOG_ERR, "Discarding malformed queue envelope from $host");
      return false;
    }

    $site = $message['site'];
    $queue = $message['handler'];

    if ($this->isDeadLetter($frame, $message)) {
      $this->stats('deadletter', $queue);
      return false;
    }

    // @fixme detect failing site switches
    $this->switchSite($site);

    try {
      $item = $this->decode($message['payload']);
    } catch (Exception $e) {
      $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
      $this->stats('baditem', $queue);
      return false;
    }

    $info = $this->logrep($item) . " posted at " .
      $frame->headers['created'] . " in queue $queue from $host";
    $this->_log(LOG_DEBUG, "Dequeued $info");

    try {
      $handler = $this->getHandler($queue);
      $ok = $handler->handle($item);
    } catch (NoQueueHandlerException $e) {
      $this->_log(LOG_ERR, "Missing handler class; skipping $info");
      $this->stats('badhandler', $queue);
      return false;
    } catch (Exception $e) {
      $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage());
      $ok = false;
    }

    if ($ok) {
      $this->_log(LOG_INFO, "Successfully handled $info");
      $this->stats('handled', $queue);
    } else {
      $this->_log(LOG_WARNING, "Failed handling $info");
      // Requeing moves the item to the end of the line for its next try.
      // @fixme add a manual retry count
      $this->enqueue($item, $queue);
      $this->stats('requeued', $queue);
    }
    return $ok;
  }
  /**
   * Decide whether a message should be given up on.
   *
   * Only frames carrying the header redelivered=true are counted. Once
   * the delivery count exceeds the 'max_retries' queue setting, the
   * message is declared dead; if 'dead_letter_dir' is configured, its
   * payload is dumped to a file there first.
   *
   * @param StompFrame $frame
   * @param array $message unserialized message body
   * @return boolean true if we should discard
   */
  protected function isDeadLetter($frame, $message)
  {
    $redelivered = isset($frame->headers['redelivered'])
      && $frame->headers['redelivered'] == 'true';
    if (!$redelivered) {
      return false;
    }

    // Message was redelivered, possibly indicating a previous failure.
    $msgId = $frame->headers['message-id'];
    $site = $message['site'];
    $queue = $message['handler'];
    $msgInfo = "message $msgId for $site in queue $queue";

    $deliveries = $this->incDeliveryCount($msgId);
    if ($deliveries > common_config('queue', 'max_retries')) {
      $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo";

      $outdir = common_config('queue', 'dead_letter_dir');
      if ($outdir) {
        $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId);
        $info .= ": dumping to $filename";
        file_put_contents($filename, $message['payload']);
      }

      common_log(LOG_ERR, $info);
      return true;
    }

    common_log(LOG_INFO, "retry $deliveries on $msgInfo");
    return false;
  }
  /**
   * Update count of times we've re-encountered this message recently,
   * triggered when we get a message marked as 'redelivered'.
   *
   * The counter lives in the cache under a key derived from the message
   * id. If the increment yields nothing (no counter yet), the counter is
   * created with value 1 and a 3600 second lifetime.
   *
   * Requires a CLI-friendly cache configuration; without a cache the
   * count is always 0.
   *
   * @param string $msgId message-id header from message
   * @return int number of retries recorded
   */
  public function incDeliveryCount($msgId)
  {
    $cache = Cache::instance();
    if (!$cache) {
      return 0;
    }

    $key = 'statusnet:stomp:message-retries:' . $msgId;
    $count = $cache->increment($key);
    if (!$count) {
      $count = 1;
      // The value that used to be read back right after this set() was
      // never used, so that extra cache lookup has been dropped.
      $cache->set($key, $count, null, 3600);
    }
    return $count;
  }
  /**
   * Process a control signal broadcast.
   *
   * The frame body is "event" or "event:param". Recognized events:
   * - shutdown: ask the master to shut down
   * - restart:  ask the master to restart
   * - update:   reload configuration for the site named by param
   * Anything else is logged and ignored.
   *
   * @param int $idx connection index (not used by this implementation)
   * @param StompFrame $frame Stomp frame carrying the control message
   * @return bool true to continue; false to stop further processing.
   */
  protected function handleControlSignal($idx, $frame)
  {
    $message = trim($frame->body);

    $pieces = explode(':', $message, 2);
    $event = $pieces[0];
    $param = isset($pieces[1]) ? $pieces[1] : '';

    $shutdown = false;
    switch ($event) {
      case 'shutdown':
        $this->master->requestShutdown();
        $shutdown = true;
        break;
      case 'restart':
        $this->master->requestRestart();
        $shutdown = true;
        break;
      case 'update':
        $this->updateSiteConfig($param);
        break;
      default:
        $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
    }

    // The documented contract, and the caller in handleInput(), treat
    // false as "stop". Returning the shutdown flag itself had this inverted.
    return !$shutdown;
  }
  /**
   * Make the given site the current one, unless it already is.
   * A real switch is counted in the stats and followed by initialize().
   *
   * @param string $site
   */
  public function switchSite($site)
  {
    if ($site == GNUsocial::currentSite()) {
      return;
    }

    $this->stats('switch');
    GNUsocial::switchSite($site);
    $this->initialize();
  }
  /**
   * (Re)load runtime configuration for a given site by nickname,
   * triggered by an 'update' control signal.
   *
   * The nickname is looked up in Status_network. A known site is switched
   * to, registered via addSite() if it is not yet in $this->sites, and
   * counted in the stats; an unknown nickname is logged and ignored.
   *
   * Configuration changes in database should update, but config
   * files might not.
   *
   * @param string $nickname site nickname
   * @return void
   */
  protected function updateSiteConfig($nickname)
  {
    $sn = Status_network::getKV('nickname', $nickname);
    if (!$sn) {
      $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
      return;
    }

    $this->switchSite($nickname);
    $known = in_array($nickname, $this->sites);
    if (!$known) {
      $this->addSite();
    }
    $this->stats('siteupdate');
  }
  /**
   * Build the Stomp destination name for a queue on the current site.
   *
   * The most specific breakout that is configured wins: first
   * "group/queue/site", then "group/queue"; if neither is listed in the
   * breakout setting, the shared group queue is used. Examples:
   *
   * /queue/statusnet/main
   * /queue/statusnet/main/distrib
   * /queue/statusnet/xmpp/xmppout/site01
   *
   * @param string $queue
   * @return string
   */
  protected function queueName($queue)
  {
    $group = $this->queueGroup($queue);
    $site = GNUsocial::currentSite();

    $candidates = array("$group/$queue/$site",
                        "$group/$queue");
    foreach ($candidates as $candidate) {
      if (in_array($candidate, $this->breakout)) {
        return $this->base . $candidate;
      }
    }

    return $this->base . $group;
  }
  /**
   * Look up the breakout mode for a queue in the 'breakout' setting,
   * trying the queue's own key first and the '*' wildcard second.
   *
   * NOTE(review): this reads the setting as a map keyed by queue name,
   * while subscriptions() and queueName() treat it as a list of
   * specifiers -- confirm whether this method is still in use.
   *
   * @param string $queue
   * @return string one of 'shared', 'handler', 'site'
   */
  protected function breakoutMode($queue)
  {
    $breakout = common_config('queue', 'breakout');

    foreach (array($queue, '*') as $key) {
      if (isset($breakout[$key])) {
        return $breakout[$key];
      }
    }
    return 'shared';
  }
  /**
   * Open a new Stomp transaction on the given connection slot.
   * Does nothing when transactions are disabled.
   *
   * The transaction id is built from the master's id, a per-slot counter
   * and the current time.
   *
   * @param int $idx connection slot
   * @throws Exception if a transaction is already open on this slot
   */
  protected function begin($idx)
  {
    if (!$this->useTransactions) {
      return;
    }
    if (!empty($this->transaction[$idx])) {
      throw new Exception("Tried to start transaction in the middle of a transaction");
    }

    $this->transactionCount[$idx]++;
    $txId = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time();
    $this->transaction[$idx] = $txId;
    $this->cons[$idx]->begin($txId);
  }
  /**
   * Acknowledge a received frame on the given connection slot.
   * Does nothing when acks are disabled. With transactions enabled, the
   * ack is made part of the currently open transaction.
   *
   * @param int $idx connection slot
   * @param StompFrame $frame frame to acknowledge
   * @throws Exception if transactions are enabled but none is open
   */
  protected function ack($idx, $frame)
  {
    if (!$this->useAcks) {
      return;
    }

    if (!$this->useTransactions) {
      $this->cons[$idx]->ack($frame);
      return;
    }

    if (empty($this->transaction[$idx])) {
      throw new Exception("Tried to ack but not in a transaction");
    }
    $this->cons[$idx]->ack($frame, $this->transaction[$idx]);
  }
  /**
   * Commit the open transaction on the given connection slot and mark
   * the slot as having no transaction. Does nothing when transactions
   * are disabled.
   *
   * @param int $idx connection slot
   * @throws Exception if no transaction is open on this slot
   */
  protected function commit($idx)
  {
    if (!$this->useTransactions) {
      return;
    }
    if (empty($this->transaction[$idx])) {
      throw new Exception("Tried to commit but not in a transaction");
    }

    $txId = $this->transaction[$idx];
    $this->cons[$idx]->commit($txId);
    $this->transaction[$idx] = null;
  }
  /**
   * Abort the open transaction on the given connection slot and mark the
   * slot as having no transaction. Does nothing when transactions are
   * disabled.
   *
   * @param int $idx connection slot
   * @throws Exception if no transaction is open on this slot
   */
  protected function rollback($idx)
  {
    if (!$this->useTransactions) {
      return;
    }
    if (empty($this->transaction[$idx])) {
      throw new Exception("Tried to rollback but not in a transaction");
    }

    // This used to call commit(), which made a rollback finalize the
    // transaction instead of discarding it.
    $this->cons[$idx]->abort($this->transaction[$idx]);
    $this->transaction[$idx] = null;
  }
  701. }