StompQueueManager.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. <?php
  2. use Stomp\Client;
  3. use Stomp\Exception\StompException;
  4. use Stomp\Network\Connection;
  5. use Stomp\Transport\Frame;
  6. use Stomp\Transport\Message;
  7. use Stomp\StatefulStomp;
  8. class StompQueueManager extends QueueManager
  9. {
  /** @var object|null Settings holder given to the constructor; read for servers, credentials, queue names and feature flags */
  protected $pl = null;
  /** @var array|null One entry per configured connection (StatefulStomp, or null if it failed); null until connections are opened */
  protected $stomps = null;
  /** @var array|null Id of the open transaction per connection index, or null when none is open */
  protected $transaction;
  /** @var array|null Count of transactions started so far, per connection index */
  protected $transactionCount;
  /** @var mixed Queue group names read by subscriptions(). NOTE(review): not populated in this class -- confirm where it is set */
  protected $activeGroups;
  /** @var array Per connection index: null when connected, or the time() of the failed connection attempt */
  protected $disconnect = [];
  /** @var array Destinations already subscribed to, as recorded by subscribe() */
  protected $subscriptions = [];

  /**
   * Build a queue manager bound to the given settings holder.
   *
   * No connection is opened here; that happens lazily on first use.
   *
   * @param object $pl object exposing the STOMP settings read by this class
   */
  public function __construct($pl)
  {
      parent::__construct();
      $this->pl = $pl;
  }
  /**
   * Run the parent start-up, then make sure the STOMP connection(s)
   * are open and subscribed to every queue we handle. If several
   * queue servers are configured for manual failover, all of them
   * are listened to.
   *
   * @param IoMaster $master process/event controller
   * @return bool result of the connection set-up
   * @throws ServerException
   */
  public function start($master)
  {
      parent::start($master);

      $connected = $this->_ensureConn();

      return $connected;
  }
  /**
   * Close out any active connections.
   *
   * Unsubscribes every live connection first, then rolls back any open
   * transaction and disconnects. Entries that never connected (stored
   * as null) are skipped, and calling this before any connection was
   * opened is a no-op.
   *
   * @return bool always true
   * @throws Exception
   */
  public function finish()
  {
      if (empty($this->stomps)) {
          // Never connected: nothing to release.
          return true;
      }

      foreach ($this->stomps as $stomp) {
          if (!$stomp) {
              // Failed or already closed connections are null placeholders.
              continue;
          }
          common_log(LOG_INFO, "Unsubscribing on: " . $stomp->getClient()->getConnection()->getHost());
          $stomp->unsubscribe();
      }

      // If there are any outstanding delivered messages we haven't processed,
      // free them for another thread to take.
      foreach ($this->stomps as $i => $st) {
          if (!$st) {
              continue;
          }
          // rollback() throws when no transaction is open, so only call it
          // when one actually is; shutdown should not fail on that.
          if (!empty($this->transaction[$i])) {
              $this->rollback($i);
          }
          $st->getClient()->disconnect();
          $this->stomps[$i] = null;
      }

      return true;
  }
  /**
   * Lazily open the STOMP connection(s), subscribe and start a transaction.
   *
   * Without manual failover a single connection is opened (using a
   * failover:// URL over a shuffled server list when several servers are
   * configured). In manual failover mode the queue servers don't speak to
   * each other, so one connection per server is attempted and failures are
   * recorded as null placeholders with the failure time in $disconnect.
   *
   * Does nothing when connections have already been set up.
   *
   * @return bool always true
   * @throws ServerException when no server is configured or none could be reached
   */
  private function _ensureConn()
  {
      if ($this->stomps !== null) {
          return true;
      }

      $servers = array_values((array) $this->pl->servers);
      if (count($servers) === 0) {
          throw new ServerException("No STOMP queue servers configured");
      }

      $stomps = [];
      $disconnect = [];

      if (!$this->pl->manualFailover) {
          if (count($servers) > 1) {
              shuffle($servers); // Randomize to spread load
              $url = 'failover://(' . implode(',', $servers) . ')';
          } else {
              $url = $servers[0];
          }
          // A failure here propagates: there is nothing else to fall back on.
          $stomps[] = $this->_doConnect($url);
          $disconnect[] = null;
      } else {
          foreach ($servers as $url) {
              try {
                  $stomps[] = $this->_doConnect($url);
                  $disconnect[] = null;
              } catch (Exception $e) {
                  // s'okay, we'll live: keep the slot so indexes stay aligned
                  $stomps[] = null;
                  $disconnect[] = time();
              }
          }
      }

      // Done attempting connections. An array holding only nulls means
      // nothing is reachable; leave $this->stomps null so a later call retries.
      if (count(array_filter($stomps)) === 0) {
          throw new ServerException("No STOMP queue servers reachable");
      }

      // Must be assigned before subscribing: subscribe() calls back into
      // this method and relies on the early return above.
      $this->stomps = $stomps;
      $this->disconnect = $disconnect;
      $this->transactionCount = array_fill(0, count($stomps), 0);
      $this->transaction = array_fill(0, count($stomps), null);

      foreach ($this->stomps as $i => $st) {
          if ($st) {
              $this->subscribe($st);
              $this->begin($i);
          }
      }

      return true;
  }
  /**
   * Open one STOMP connection using the configured login and vhost.
   *
   * @param string $server_url broker URL handed to the STOMP client
   * @return StatefulStomp wrapper around the connected client
   * @throws ServerException when the client fails to connect
   */
  protected function _doConnect($server_url)
  {
      common_debug("STOMP: connecting to '{$server_url}' as '{$this->pl->username}'...");
      $cl = new Client($server_url);
      $cl->setLogin($this->pl->username, $this->pl->password);
      $cl->setVhostname($this->pl->vhost);
      try {
          $cl->connect();
          common_debug("STOMP connected.");
      } catch (StompException $e) {
          // Keep the root cause in the log; the rethrown exception carries
          // only the generic message callers already see.
          common_log(
              LOG_ERR,
              "Failed to connect to STOMP queue server '{$server_url}': " . $e->getMessage()
          );
          throw new ServerException('Failed to connect to STOMP queue server');
      }
      return new StatefulStomp($cl);
  }
  /**
   * Grab a full list of stomp-side queue subscriptions.
   * Will include:
   * - control broadcast channel
   * - shared group queues for active groups
   * - per-handler and per-site breakouts that are rooted in the active groups.
   *
   * Breakout specifiers must have two or three ':'-separated parts;
   * malformed ones are logged and skipped.
   *
   * @return array list of unique destination strings
   */
  protected function subscriptions(): array
  {
      // Tolerate an unset group list instead of iterating over null.
      $groups = (array) $this->activeGroups;

      $subs = [$this->pl->control];
      foreach ($groups as $group) {
          $subs[] = $this->pl->basename . $group;
      }

      foreach ($this->pl->breakout as $spec) {
          $parts = explode(':', $spec);
          if (count($parts) < 2 || count($parts) > 3) {
              common_log(LOG_ERR, "Bad queue breakout specifier '{$spec}'");
              // Do not subscribe to a destination built from a bad spec.
              continue;
          }
          if (in_array($parts[0], $groups)) {
              $subs[] = $this->pl->basename . $spec;
          }
      }

      // array_unique() keeps the original keys; reindex to a plain list.
      return array_values(array_unique($subs));
  }
  /**
   * Set up all our raw queue subscriptions on the given connection.
   *
   * Subscriptions already made are remembered per connection object, so
   * each connection gets the full set while repeated calls for the same
   * connection do not subscribe twice.
   *
   * @param StatefulStomp $st connection to subscribe on
   * @throws Exception rethrown after logging when a subscription fails
   */
  protected function subscribe(StatefulStomp $st)
  {
      $this->_ensureConn();
      $host = $st->getClient()->getConnection()->getHost();

      if (!isset($this->subscriptions) || !is_array($this->subscriptions)) {
          $this->subscriptions = [];
      }
      // Key by connection: a single shared list would leave every
      // connection after the first one without any subscription.
      $connKey = spl_object_hash($st);
      if (!isset($this->subscriptions[$connKey]) || !is_array($this->subscriptions[$connKey])) {
          $this->subscriptions[$connKey] = [];
      }

      foreach ($this->subscriptions() as $sub) {
          if (in_array($sub, $this->subscriptions[$connKey], true)) {
              continue;
          }
          $this->_log(LOG_INFO, "Subscribing to '{$sub}' on '{$host}'");
          try {
              $st->subscribe($sub);
          } catch (Exception $e) {
              common_log(LOG_ERR, "STOMP received exception: " . get_class($e) .
                  " while trying to subscribe: " . $e->getMessage());
              throw $e;
          }
          $this->subscriptions[$connKey][] = $sub;
      }
  }
  /**
   * Open a transaction on the given connection when transactions are enabled.
   *
   * The transaction id is built from the master id, a per-connection
   * counter and the current time.
   *
   * @param int $idx connection index
   * @throws Exception if a transaction is already open on that connection
   */
  protected function begin($idx)
  {
      if (!$this->pl->useTransactions) {
          return;
      }

      if (!empty($this->transaction[$idx])) {
          throw new Exception("Tried to start transaction in the middle of a transaction");
      }

      $this->transactionCount[$idx]++;
      $txId = implode('-', [$this->master->id, $this->transactionCount[$idx], time()]);
      $this->transaction[$idx] = $txId;
      $this->stomps[$idx]->begin($txId);
  }
  /**
   * Acknowledge a frame on the given connection when acks are enabled.
   *
   * With transactions enabled, the open transaction id is passed along
   * with the frame.
   *
   * @param int $idx connection index
   * @param Frame $frame frame to acknowledge
   * @throws Exception if transactions are enabled but none is open
   */
  protected function ack($idx, $frame)
  {
      if (!$this->pl->useAcks) {
          return;
      }

      if (!$this->pl->useTransactions) {
          $this->stomps[$idx]->ack($frame);
          return;
      }

      if (empty($this->transaction[$idx])) {
          throw new Exception("Tried to ack but not in a transaction");
      }

      $this->stomps[$idx]->ack($frame, $this->transaction[$idx]);
  }
  /**
   * Commit the open transaction on the given connection.
   *
   * Does nothing when transactions are disabled in the settings; uses
   * the same flag that begin() and ack() check.
   *
   * @param int $idx connection index
   * @throws Exception if transactions are enabled but none is open
   */
  protected function commit($idx)
  {
      if (!$this->pl->useTransactions) {
          return;
      }
      if (empty($this->transaction[$idx])) {
          throw new Exception("Tried to commit but not in a transaction");
      }
      $this->stomps[$idx]->commit($this->transaction[$idx]);
      $this->transaction[$idx] = null;
  }
  /**
   * Abort the open transaction on the given connection.
   *
   * Does nothing when transactions are disabled in the settings; uses
   * the same flag that begin() and ack() check.
   *
   * @param int $idx connection index
   * @throws Exception if transactions are enabled but none is open
   */
  protected function rollback($idx)
  {
      if (!$this->pl->useTransactions) {
          return;
      }
      if (empty($this->transaction[$idx])) {
          throw new Exception("Tried to rollback but not in a transaction");
      }
      $this->stomps[$idx]->abort($this->transaction[$idx]);
      $this->transaction[$idx] = null;
  }
  /**
   * Send an object to its STOMP queue on the default connection.
   *
   * The object is wrapped in an envelope (site, handler, encoded payload),
   * serialized and base64-encoded. The 'created' timestamp and, for
   * persistent queues, the 'persistent' flag travel as message headers.
   *
   * @param mixed $object
   * @param string $queue
   * @param string $siteNickname optional override to drop into another site's queue
   * @return bool true when the broker accepted the message, false otherwise
   * @throws Exception
   */
  public function enqueue($object, $queue, $siteNickname = null)
  {
      $this->_ensureConn();
      $idx = $this->pl->defaultIdx;
      $rep = $this->logrep($object);

      $st = $this->stomps[$idx] ?? null;
      if (!$st) {
          // The default connection may be a null placeholder in manual failover mode.
          common_log(LOG_ERR, "STOMP error sending $rep to $queue queue: connection $idx is not available");
          return false;
      }

      $envelope = ['site' => $siteNickname ?: common_config('site', 'nickname'),
                   'handler' => $queue,
                   'payload' => $this->encode($object)];
      $msg = base64_encode(serialize($envelope));

      $props = ['created' => common_sql_now()];
      if ($this->isPersistent($queue)) {
          $props['persistent'] = 'true';
      }

      $host = $st->getClient()->getConnection()->getHost();
      $target = $this->queueName($queue);

      // Headers belong to the Message itself: StatefulStomp::send() takes
      // only a destination and a Message, so a third argument is dropped.
      $result = $st->send($target, new Message($msg, $props));
      if (!$result) {
          common_log(LOG_ERR, "STOMP error sending $rep to $queue queue on $host $target");
          return false;
      }

      common_debug("STOMP complete remote queueing $rep for queue `$queue` on host `$host` on channel `$target`");
      $this->stats('enqueued', $queue);
      return true;
  }
  /**
   * Tell whether messages for this queue should carry the persistent flag.
   *
   * The setting is either a list of queue names (persistent only when the
   * queue is listed) or a single value applied to every queue.
   * Actual persistent storage depends on the queue server's configuration.
   *
   * @param string $queue
   * @return bool
   */
  protected function isPersistent($queue)
  {
      $setting = $this->pl->persistent;

      return is_array($setting)
          ? in_array($queue, $setting)
          : (bool)$setting;
  }
  /**
   * Idle polling period: look for new events every 10 seconds.
   * Must be greater than 0 for the poll method to be called.
   *
   * @return int seconds
   */
  public function pollInterval()
  {
      $seconds = 10;

      return $seconds;
  }
  /**
   * Poll the default connection and handle one event.
   *
   * If the queue handler reports failure, the message is requeued for later.
   * Undecodable items, dead letters and missing handler classes drop the
   * message.
   *
   * Side effects: switches the current site to the one that queued the event.
   *
   * NOTE(review): frames arriving on the control channel are not routed to
   * handleControlSignal() here; they fail envelope decoding and are logged.
   *
   * @return bool true when an item was handled successfully, false otherwise
   * @throws ConfigException
   * @throws NoConfigException
   * @throws ServerException
   * @throws StompException
   */
  public function poll(): bool
  {
      $this->_ensureConn();

      $st = $this->stomps[$this->pl->defaultIdx] ?? null;
      if (!$st) {
          // Default connection is a null placeholder: nothing to read from.
          return false;
      }

      $frame = $st->getClient()->readFrame();
      if (empty($frame)) {
          return false;
      }

      // The host lives on the underlying connection, not on the stateful wrapper.
      $host = $st->getClient()->getConnection()->getHost();
      $headers = $frame->getHeaders();

      // NOTE(review): unserialize() runs on data received from the broker;
      // confirm the broker is trusted or restrict allowed classes.
      $message = unserialize(base64_decode($frame->body));
      $wellFormed = is_array($message)
          && isset($message['site'], $message['handler'])
          && array_key_exists('payload', $message);
      if (!$wellFormed) {
          common_log(LOG_ERR, "STOMP can't unserialize frame: {$frame->body}\n" .
              'Unserializable frame length: ' . strlen($frame->body));
          return false;
      }

      $site = $message['site'];
      $queue = $message['handler'];

      if ($this->isDeadLetter($frame, $message)) {
          $this->stats('deadletter', $queue);
          return false;
      }

      // @fixme detect failing site switches
      $this->switchSite($site);

      try {
          $item = $this->decode($message['payload']);
      } catch (Exception $e) {
          common_log(LOG_ERR, "Skipping empty or deleted item in queue {$queue} from {$host}");
          $this->stats('baditem', $queue);
          return false;
      }

      $created = $headers['created'] ?? 'unknown';
      $info = $this->logrep($item) . ' posted at ' .
          $created . " in queue {$queue} from {$host}";

      try {
          $handler = $this->getHandler($queue);
          $ok = $handler->handle($item);
      } catch (NoQueueHandlerException $e) {
          common_log(LOG_ERR, "Missing handler class; skipping $info");
          $this->stats('badhandler', $queue);
          return false;
      } catch (Exception $e) {
          common_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage());
          $ok = false;
      }

      if ($ok) {
          common_log(LOG_INFO, "Successfully handled $info");
          $this->stats('handled', $queue);
          return true;
      }

      common_log(LOG_WARNING, "Failed handling $info");
      // Requeing moves the item to the end of the line for its next try.
      // @fixme add a manual retry count
      $this->enqueue($item, $queue);
      $this->stats('requeued', $queue);
      return false;
  }
  /**
   * Check if a redelivered message has been run through enough
   * that we're going to give up on it.
   *
   * Only frames whose 'redelivered' header is 'true' are counted. Once the
   * count exceeds the configured queue/max_retries, the payload is dumped
   * into queue/dead_letter_dir (when set) and the message is discarded.
   *
   * @param Frame $frame
   * @param array $message unserialized message body
   * @return bool true if we should discard
   */
  protected function isDeadLetter($frame, $message)
  {
      $headers = $frame->getHeaders();
      if (!isset($headers['redelivered']) || $headers['redelivered'] != 'true') {
          return false;
      }

      // Message was redelivered, possibly indicating a previous failure.
      $msgId = $headers['message-id'];
      $site = $message['site'];
      $queue = $message['handler'];
      $msgInfo = "message $msgId for $site in queue $queue";

      $deliveries = $this->incDeliveryCount($msgId);
      if ($deliveries <= common_config('queue', 'max_retries')) {
          common_log(LOG_INFO, "retry $deliveries on $msgInfo");
          return false;
      }

      $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo";

      $outdir = common_config('queue', 'dead_letter_dir');
      if ($outdir) {
          // Site and queue come from the message body: escape every part so
          // none of them can introduce a path separator.
          $filename = $outdir . '/' . rawurlencode($site) . '-' .
              rawurlencode($queue) . '-' . rawurlencode($msgId);
          $info .= ": dumping to $filename";
          if (file_put_contents($filename, $message['payload']) === false) {
              $info .= ' (write failed)';
          }
      }

      common_log(LOG_ERR, $info);
      return true;
  }
  /**
   * Bump and return the number of times this message has been seen again,
   * called when a message arrives marked as 'redelivered'.
   *
   * The counter lives in the cache; the first sighting stores 1 with a
   * 3600 second lifetime. Without a cache instance the result is 0.
   *
   * Requires a CLI-friendly cache configuration.
   *
   * @param string $msgId message-id header from message
   * @return int number of retries recorded
   */
  public function incDeliveryCount($msgId)
  {
      $cache = Cache::instance();
      if (!$cache) {
          return 0;
      }

      $key = 'gnusocial:stomp:message-retries:' . $msgId;
      $seen = $cache->increment($key);
      if ($seen) {
          return $seen;
      }

      // Nothing to increment yet: start the counter.
      $seen = 1;
      $cache->set($key, $seen, null, 3600);
      return $seen;
  }
  /**
   * Combines the queue_basename from configuration with the
   * group name for this queue to give eg:
   *
   * /queue/statusnet/main
   * /queue/statusnet/main/distrib
   * /queue/statusnet/xmpp/xmppout/site01
   *
   * The most specific configured breakout wins: "group:queue:site",
   * then "group:queue", otherwise the shared group destination.
   *
   * @param string $queue
   * @return string
   * @throws Exception
   */
  protected function queueName(string $queue): string
  {
      $group = $this->queueGroup($queue);
      $site = GNUsocial::currentSite();

      // Breakout specs live on the settings holder, as in subscriptions().
      $breakout = (array) $this->pl->breakout;

      foreach (["$group:$queue:$site", "$group:$queue"] as $spec) {
          if (in_array($spec, $breakout, true)) {
              return $this->pl->basename . $spec;
          }
      }
      return $this->pl->basename . $group;
  }
  /**
   * Look up the breakout mode for the given queue.
   *
   * Uses the entry keyed by the queue name, then the '*' entry,
   * and falls back to 'shared'.
   *
   * @param string $queue
   * @return string one of 'shared', 'handler', 'site'
   */
  protected function breakoutMode($queue)
  {
      return $this->pl->breakout[$queue]
          ?? $this->pl->breakout['*']
          ?? 'shared';
  }
  /**
   * Tell the i/o master we only need a single instance to cover
   * all sites running in this process.
   *
   * @return mixed the IoManager::INSTANCE_PER_PROCESS constant
   */
  public static function multiSite()
  {
      $mode = IoManager::INSTANCE_PER_PROCESS;

      return $mode;
  }
  /**
   * Optional; ping any running queue handler daemons with a notification
   * such as announcing a new site to handle or requesting clean shutdown.
   * This avoids having to restart all the daemons manually to update configs
   * and such.
   *
   * The signal is sent on the control destination as "event" or
   * "event:param", with a 'created' header.
   *
   * Currently only relevant for multi-site queue managers such as Stomp.
   *
   * @param string $event event key
   * @param string $param optional parameter to append to key
   * @return bool success
   */
  public function sendControlSignal($event, $param = '')
  {
      $message = $event;
      if ($param != '') {
          $message .= ':' . $param;
      }

      $this->_ensureConn();
      $st = $this->stomps[$this->pl->defaultIdx] ?? null;
      if (!$st) {
          // Default connection is a null placeholder: cannot send.
          common_log(LOG_ERR, "Failed sending control ping to STOMP queue daemons: $message");
          return false;
      }

      // StatefulStomp::send() needs a Message object; headers ride on it.
      $frame = new Message($message, ['created' => common_sql_now()]);
      $result = $st->send($this->pl->control, $frame);

      if ($result) {
          common_log(LOG_INFO, "Sent control ping to STOMP queue daemons: $message");
          return true;
      }

      common_log(LOG_ERR, "Failed sending control ping to STOMP queue daemons: $message");
      return false;
  }
  /**
   * Process a control signal broadcast.
   *
   * The frame body is "event" or "event:param". 'shutdown' and 'restart'
   * are forwarded to the master, 'update' reloads the named site's
   * configuration, anything else is logged and ignored.
   *
   * @param int $idx connection index
   * @param Frame $frame Stomp frame whose body carries the signal
   * @return bool true when a shutdown or restart was requested; false otherwise
   * @throws ConfigException
   * @throws NoConfigException
   * @throws ServerException
   */
  protected function handleControlSignal(int $idx, $frame): bool
  {
      $message = trim($frame->body);

      // Split only on the first ':' so the parameter may itself contain colons.
      $pieces = explode(':', $message, 2);
      $event = $pieces[0];
      $param = $pieces[1] ?? '';

      switch ($event) {
          case 'shutdown':
              $this->master->requestShutdown();
              return true;

          case 'restart':
              $this->master->requestRestart();
              return true;

          case 'update':
              $this->updateSiteConfig($param);
              return false;

          default:
              common_log(LOG_ERR, "Ignoring unrecognized control message: $message");
              return false;
      }
  }
  /**
   * Make the given site current when it is not already, then
   * re-run initialize() so handler assignments are reset.
   *
   * @param string $site
   * @throws ConfigException
   * @throws NoConfigException
   * @throws ServerException
   */
  private function switchSite(string $site): void
  {
      if ($site == GNUsocial::currentSite()) {
          // Already on the requested site.
          return;
      }

      $this->stats('switch');
      GNUsocial::switchSite($site);
      $this->initialize();
  }
  /**
   * (Re)load runtime configuration for a given site by nickname,
   * triggered by a broadcast to the 'statusnet-control' topic.
   *
   * Configuration changes in database should update, but config
   * files might not.
   *
   * Pings for unknown nicknames are logged and ignored.
   *
   * @param string $nickname
   * @return bool true to continue; this method never asks to stop processing
   * @throws ConfigException
   * @throws NoConfigException
   * @throws ServerException
   */
  protected function updateSiteConfig($nickname): bool
  {
      $sn = Status_network::getKV('nickname', $nickname);
      if (!$sn) {
          common_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
          return true;
      }

      $this->switchSite($nickname);

      // NOTE(review): $sites is not declared in this class; treat a missing
      // or non-array value as "no sites known yet".
      $known = (isset($this->sites) && is_array($this->sites)) ? $this->sites : [];
      if (!in_array($nickname, $known)) {
          $this->addSite();
      }

      $this->stats('siteupdate');
      return true;
  }
  553. };