stasis.c 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis Message Bus API.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/stasis_internal.h"
  31. #include "asterisk/stasis.h"
  32. #include "asterisk/taskprocessor.h"
  33. #include "asterisk/utils.h"
  34. #include "asterisk/uuid.h"
  35. #include "asterisk/vector.h"
  36. /*!
  37. * \page stasis-impl Stasis Implementation Notes
  38. *
  39. * \par Reference counting
  40. *
  41. * Stasis introduces a number of objects, which are tightly related to one
  42. * another. Because we rely on ref-counting for memory management, understanding
  43. * these relationships is important to understanding this code.
  44. *
  45. * \code{.txt}
  46. *
  47. *  stasis_topic <----> stasis_subscription
  48. *           ^            ^
  49. *            \          /
  50. *             \        /
  51. *              dispatch
  52. *                 |
  53. *                 |
  54. *                 v
  55. *           stasis_message
  56. *                 |
  57. *                 |
  58. *                 v
  59. *        stasis_message_type
  60. *
  61. * \endcode
  62. *
  63. * The most troubling thing in this chart is the cyclic reference between
  64. * stasis_topic and stasis_subscription. This is both unfortunate, and
  65. * necessary. Topics need the subscription in order to dispatch messages;
  66. * subscriptions need the topics to unsubscribe and check subscription status.
  67. *
  68. * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
  69. * topic's reference to a subscription. When the subscription is destroyed, it
  70. * will remove its reference to the topic.
  71. *
  72. * This means that until a subscription has been explicitly unsubscribed, it will
  73. * not be destroyed. Neither will a topic be destroyed while it has subscribers.
  74. * The destructors of both have assertions regarding this to catch ref-counting
  75. * problems where a subscription or topic has had an extra ao2_cleanup().
  76. *
  77. * The \ref dispatch object is a transient object, which is posted to a
  78. * subscription's taskprocessor to send a message to the subscriber. They have
  79. * short life cycles, allocated on one thread, destroyed on another.
  80. *
  81. * During shutdown, or the deletion of a domain object, there are a flurry of
  82. * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
  83. * are processed. Any one of these cleanups could be the one to actually destroy
  84. * a given object, so care must be taken to ensure that an object isn't
  85. * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
  86. * that might happen when a RAII_VAR() goes out of scope.
  87. *
  88. * \par Typical life cycles
  89. *
  90. * \li stasis_topic - There are several topics which live for the duration of
  91. * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
  92. * are actually fed by shorter-lived topics whose lifetime is associated
  93. * with some domain object (like ast_channel_topic() for a given
  94. * ast_channel).
  95. *
  96. * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
  97. * topics, for similar reasons.
  98. *
  99. * \li dispatch - Very short lived; just long enough to post a message to a
  100. * subscriber.
  101. *
  102. * \li stasis_message - Short to intermediate lifetimes, but that is mostly
  103. * irrelevant. Messages are strictly data and have no behavior associated
  104. * with them, so it doesn't really matter if/when they are destroyed. By
  105. * design, a component could hold a ref to a message forever without any
  106. * ill consequences (aside from consuming more memory).
  107. *
  108. * \li stasis_message_type - Long life cycles, typically only destroyed on
  109. * module unloading or _clean_ process exit.
  110. *
  111. * \par Subscriber shutdown sequencing
  112. *
  113. * Subscribers are sensitive to shutdown sequencing, specifically in how they
  114. * reference message types. This is fully detailed on the wiki at
  115. * https://wiki.asterisk.org/wiki/x/K4BqAQ.
  116. *
  117. * In short, the lifetime of the \a data (and \a callback, if in a module) must
  118. * be held until the stasis_subscription_final_message() has been received.
  119. * Depending on the structure of the subscriber code, this can be handled by
  120. * using stasis_subscription_final_message() to free resources on the final
  121. * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
  122. * block until the unsubscribe has completed.
  123. */
  124. /*! Initial capacity requested for a new topic's subscribers vector. */
  125. #define INITIAL_SUBSCRIBERS_MAX 4
  126. /*! The number of buckets requested when allocating a topic pool's container. */
  127. #define TOPIC_POOL_BUCKETS 57
  128. STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
  129. /*! \internal \brief Topic object; allocated by stasis_topic_create() and torn down by topic_dtor(). */
  130. struct stasis_topic {
  131. char *name; /*!< Heap-allocated copy of the topic name; freed in topic_dtor() */
  132. /*! Variable length array of the subscribers (pointers are stored without bumping the subscription's refcount) */
  133. AST_VECTOR(, struct stasis_subscription *) subscribers;
  134. /*! Topics forwarding into this topic (appended by stasis_forward_all(), removed by stasis_forward_cancel()) */
  135. AST_VECTOR(, struct stasis_topic *) upstream_topics;
  136. };
  137. /* Forward declarations for the tightly-coupled subscription object */
  138. static int topic_add_subscription(struct stasis_topic *topic,
  139. struct stasis_subscription *sub);
  140. static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
  141. /*! \brief Lock two topics: blocks on \a topic1, then retries ao2_trylock() on \a topic2, invoking AO2_DEADLOCK_AVOIDANCE(topic1) after every failed attempt. Both topics are locked when the macro completes. */
  142. #define topic_lock_both(topic1, topic2) \
  143. do { \
  144. ao2_lock(topic1); \
  145. while (ao2_trylock(topic2)) { \
  146. AO2_DEADLOCK_AVOIDANCE(topic1); \
  147. } \
  148. } while (0)
  149. static void topic_dtor(void *obj)
  150. {
  151. struct stasis_topic *topic = obj;
  152. /* Subscribers hold a reference to topics, so they should all be
  153. * unsubscribed before we get here. */
  154. ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
  155. ast_free(topic->name);
  156. topic->name = NULL;
  157. AST_VECTOR_FREE(&topic->subscribers);
  158. AST_VECTOR_FREE(&topic->upstream_topics);
  159. }
  160. struct stasis_topic *stasis_topic_create(const char *name)
  161. {
  162. struct stasis_topic *topic;
  163. int res = 0;
  164. topic = ao2_alloc(sizeof(*topic), topic_dtor);
  165. if (!topic) {
  166. return NULL;
  167. }
  168. topic->name = ast_strdup(name);
  169. res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
  170. res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
  171. if (!topic->name || res) {
  172. ao2_cleanup(topic);
  173. return NULL;
  174. }
  175. return topic;
  176. }
  177. const char *stasis_topic_name(const struct stasis_topic *topic)
  178. {
  179. return topic->name;
  180. }
  181. /*! \internal \brief State for a single subscriber of a topic. */
  182. struct stasis_subscription {
  183. /*! Unique ID for this subscription */
  184. char uniqueid[AST_UUID_STR_LEN];
  185. /*! Topic subscribed to. */
  186. struct stasis_topic *topic;
  187. /*! Mailbox for processing incoming messages. NULL when the subscription was created without a mailbox; messages are then delivered by a direct call. */
  188. struct ast_taskprocessor *mailbox;
  189. /*! Callback function for incoming message processing. */
  190. stasis_subscription_cb callback;
  191. /*! Data pointer to be handed to the callback. */
  192. void *data;
  193. /*! Condition for joining with subscription; waited on and signalled under the subscription's own ao2 lock. */
  194. ast_cond_t join_cond;
  195. /*! Flag set when final message for sub has been received.
  196. * Be sure the subscription's ao2 lock is held before reading/setting. */
  197. int final_message_rxed;
  198. /*! Flag set when final message for sub has been processed.
  199. * Be sure the subscription's ao2 lock is held before reading/setting. */
  200. int final_message_processed;
  201. };
  202. static void subscription_dtor(void *obj)
  203. {
  204. struct stasis_subscription *sub = obj;
  205. /* Subscriptions need to be manually unsubscribed before destruction
  206. * b/c there's a cyclic reference between topics and subscriptions */
  207. ast_assert(!stasis_subscription_is_subscribed(sub));
  208. /* If there are any messages in flight to this subscription; that would
  209. * be bad. */
  210. ast_assert(stasis_subscription_is_done(sub));
  211. ao2_cleanup(sub->topic);
  212. sub->topic = NULL;
  213. ast_taskprocessor_unreference(sub->mailbox);
  214. sub->mailbox = NULL;
  215. ast_cond_destroy(&sub->join_cond);
  216. }
  217. /*!
  218. * \brief Invoke the subscription's callback.
  219. * \param sub Subscription to invoke.
  220. * \param topic Topic message was published to.
  221. * \param message Message to send.
  222. */
  223. static void subscription_invoke(struct stasis_subscription *sub,
  224. struct stasis_message *message)
  225. {
  226. /* Notify that the final message has been received */
  227. if (stasis_subscription_final_message(sub, message)) {
  228. SCOPED_AO2LOCK(lock, sub);
  229. sub->final_message_rxed = 1;
  230. ast_cond_signal(&sub->join_cond);
  231. }
  232. /* Since sub is mostly immutable, no need to lock sub */
  233. sub->callback(sub->data, sub, message);
  234. /* Notify that the final message has been processed */
  235. if (stasis_subscription_final_message(sub, message)) {
  236. SCOPED_AO2LOCK(lock, sub);
  237. sub->final_message_processed = 1;
  238. ast_cond_signal(&sub->join_cond);
  239. }
  240. }
  241. static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
  242. static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
  243. struct stasis_subscription *internal_stasis_subscribe(
  244. struct stasis_topic *topic,
  245. stasis_subscription_cb callback,
  246. void *data,
  247. int needs_mailbox)
  248. {
  249. RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
  250. if (!topic) {
  251. return NULL;
  252. }
  253. /* The ao2 lock is used for join_cond. */
  254. sub = ao2_alloc(sizeof(*sub), subscription_dtor);
  255. if (!sub) {
  256. return NULL;
  257. }
  258. ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
  259. if (needs_mailbox) {
  260. /* With a small number of subscribers, a thread-per-sub is
  261. * acceptable. If our usage changes so that we have larger
  262. * numbers of subscribers, we'll probably want to consider
  263. * a threadpool. We had that originally, but with so few
  264. * subscribers it was actually a performance loss instead of
  265. * a gain.
  266. */
  267. sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
  268. TPS_REF_DEFAULT);
  269. if (!sub->mailbox) {
  270. return NULL;
  271. }
  272. ast_taskprocessor_set_local(sub->mailbox, sub);
  273. /* Taskprocessor has a reference */
  274. ao2_ref(sub, +1);
  275. }
  276. ao2_ref(topic, +1);
  277. sub->topic = topic;
  278. sub->callback = callback;
  279. sub->data = data;
  280. ast_cond_init(&sub->join_cond, NULL);
  281. if (topic_add_subscription(topic, sub) != 0) {
  282. return NULL;
  283. }
  284. send_subscription_subscribe(topic, sub);
  285. ao2_ref(sub, +1);
  286. return sub;
  287. }
  288. struct stasis_subscription *stasis_subscribe(
  289. struct stasis_topic *topic,
  290. stasis_subscription_cb callback,
  291. void *data)
  292. {
  293. return internal_stasis_subscribe(topic, callback, data, 1);
  294. }
  /*!
   * \internal \brief Taskprocessor task that drops one subscription reference.
   *
   * Pushed by stasis_unsubscribe() so the reference held on behalf of the
   * mailbox is released from the mailbox itself.
   *
   * \param data The subscription to unref.
   * \return 0 always.
   */
  static int sub_cleanup(void *data)
  {
      ao2_cleanup((struct stasis_subscription *) data);
      return 0;
  }
  301. struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
  302. {
  303. /* The subscription may be the last ref to this topic. Hold
  304. * the topic ref open until after the unlock. */
  305. RAII_VAR(struct stasis_topic *, topic,
  306. ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
  307. if (!sub) {
  308. return NULL;
  309. }
  310. /* We have to remove the subscription first, to ensure the unsubscribe
  311. * is the final message */
  312. if (topic_remove_subscription(sub->topic, sub) != 0) {
  313. ast_log(LOG_ERROR,
  314. "Internal error: subscription has invalid topic\n");
  315. return NULL;
  316. }
  317. /* Now let everyone know about the unsubscribe */
  318. send_subscription_unsubscribe(topic, sub);
  319. /* When all that's done, remove the ref the mailbox has on the sub */
  320. if (sub->mailbox) {
  321. ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
  322. }
  323. /* Unsubscribing unrefs the subscription */
  324. ao2_cleanup(sub);
  325. return NULL;
  326. }
  327. void stasis_subscription_join(struct stasis_subscription *subscription)
  328. {
  329. if (subscription) {
  330. SCOPED_AO2LOCK(lock, subscription);
  331. /* Wait until the processed flag has been set */
  332. while (!subscription->final_message_processed) {
  333. ast_cond_wait(&subscription->join_cond,
  334. ao2_object_get_lockaddr(subscription));
  335. }
  336. }
  337. }
  338. int stasis_subscription_is_done(struct stasis_subscription *subscription)
  339. {
  340. if (subscription) {
  341. SCOPED_AO2LOCK(lock, subscription);
  342. return subscription->final_message_rxed;
  343. }
  344. /* Null subscription is about as done as you can get */
  345. return 1;
  346. }
  347. struct stasis_subscription *stasis_unsubscribe_and_join(
  348. struct stasis_subscription *subscription)
  349. {
  350. if (!subscription) {
  351. return NULL;
  352. }
  353. /* Bump refcount to hold it past the unsubscribe */
  354. ao2_ref(subscription, +1);
  355. stasis_unsubscribe(subscription);
  356. stasis_subscription_join(subscription);
  357. /* Now decrement the refcount back */
  358. ao2_cleanup(subscription);
  359. return NULL;
  360. }
  361. int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
  362. {
  363. if (sub) {
  364. size_t i;
  365. struct stasis_topic *topic = sub->topic;
  366. SCOPED_AO2LOCK(lock_topic, topic);
  367. for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
  368. if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
  369. return 1;
  370. }
  371. }
  372. }
  373. return 0;
  374. }
  375. const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
  376. {
  377. return sub->uniqueid;
  378. }
  379. int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
  380. {
  381. struct stasis_subscription_change *change;
  382. if (stasis_message_type(msg) != stasis_subscription_change_type()) {
  383. return 0;
  384. }
  385. change = stasis_message_data(msg);
  386. if (strcmp("Unsubscribe", change->description)) {
  387. return 0;
  388. }
  389. if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
  390. return 0;
  391. }
  392. return 1;
  393. }
  394. /*!
  395. * \brief Add a subscriber to a topic.
  396. * \param topic Topic
  397. * \param sub Subscriber
  398. * \return 0 on success
  399. * \return Non-zero on error
  400. */
  401. static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
  402. {
  403. size_t idx;
  404. SCOPED_AO2LOCK(lock, topic);
  405. /* The reference from the topic to the subscription is shared with
  406. * the owner of the subscription, which will explicitly unsubscribe
  407. * to release it.
  408. *
  409. * If we bumped the refcount here, the owner would have to unsubscribe
  410. * and cleanup, which is a bit awkward. */
  411. AST_VECTOR_APPEND(&topic->subscribers, sub);
  412. for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
  413. topic_add_subscription(
  414. AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
  415. }
  416. return 0;
  417. }
  418. static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
  419. {
  420. size_t idx;
  421. SCOPED_AO2LOCK(lock_topic, topic);
  422. for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
  423. topic_remove_subscription(
  424. AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
  425. }
  426. return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
  427. AST_VECTOR_ELEM_CLEANUP_NOOP);
  428. }
  429. /*!
  430. * \internal \brief Dispatch a message to a subscriber asynchronously
  431. * \param local \ref ast_taskprocessor_local object
  432. * \return 0
  433. */
  434. static int dispatch_exec_async(struct ast_taskprocessor_local *local)
  435. {
  436. struct stasis_subscription *sub = local->local_data;
  437. struct stasis_message *message = local->data;
  438. subscription_invoke(sub, message);
  439. ao2_cleanup(message);
  440. return 0;
  441. }
  442. /*!
  443. * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
  444. * a published message to a subscriber
  445. */
  446. struct sync_task_data {
  447. ast_mutex_t lock; /*!< Held while \a complete is read or written; paired with \a cond */
  448. ast_cond_t cond; /*!< Signalled by dispatch_exec_sync() once the callback has run */
  449. int complete; /*!< Set to 1 by dispatch_exec_sync(); the publisher waits until it is non-zero */
  450. void *task_data; /*!< The stasis_message being delivered */
  451. };
  452. /*!
  453. * \internal \brief Dispatch a message to a subscriber synchronously
  454. * \param local \ref ast_taskprocessor_local object
  455. * \return 0
  456. */
  457. static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
  458. {
  459. struct stasis_subscription *sub = local->local_data;
  460. struct sync_task_data *std = local->data;
  461. struct stasis_message *message = std->task_data;
  462. subscription_invoke(sub, message);
  463. ao2_cleanup(message);
  464. ast_mutex_lock(&std->lock);
  465. std->complete = 1;
  466. ast_cond_signal(&std->cond);
  467. ast_mutex_unlock(&std->lock);
  468. return 0;
  469. }
  470. /*!
  471. * \internal \brief Dispatch a message to a subscriber
  472. * \param sub The subscriber to dispatch to
  473. * \param message The message to send
  474. * \param synchronous If non-zero, synchronize on the subscriber receiving
  475. * the message
  476. */
  477. static void dispatch_message(struct stasis_subscription *sub,
  478. struct stasis_message *message,
  479. int synchronous)
  480. {
  481. if (!sub->mailbox) {
  482. /* Dispatch directly */
  483. subscription_invoke(sub, message);
  484. return;
  485. }
  486. /* Bump the message for the taskprocessor push. This will get de-ref'd
  487. * by the task processor callback.
  488. */
  489. ao2_bump(message);
  490. if (!synchronous) {
  491. if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
  492. /* Push failed; ugh. */
  493. ast_log(LOG_ERROR, "Dropping async dispatch\n");
  494. ao2_cleanup(message);
  495. }
  496. } else {
  497. struct sync_task_data std;
  498. ast_mutex_init(&std.lock);
  499. ast_cond_init(&std.cond, NULL);
  500. std.complete = 0;
  501. std.task_data = message;
  502. if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
  503. /* Push failed; ugh. */
  504. ast_log(LOG_ERROR, "Dropping sync dispatch\n");
  505. ao2_cleanup(message);
  506. ast_mutex_destroy(&std.lock);
  507. ast_cond_destroy(&std.cond);
  508. return;
  509. }
  510. ast_mutex_lock(&std.lock);
  511. while (!std.complete) {
  512. ast_cond_wait(&std.cond, &std.lock);
  513. }
  514. ast_mutex_unlock(&std.lock);
  515. ast_mutex_destroy(&std.lock);
  516. ast_cond_destroy(&std.cond);
  517. }
  518. }
  519. /*!
  520. * \internal \brief Publish a message to a topic's subscribers
  521. * \brief topic The topic to publish to
  522. * \brief message The message to publish
  523. * \brief sync_sub An optional subscriber of the topic to publish synchronously
  524. * to
  525. */
  526. static void publish_msg(struct stasis_topic *topic,
  527. struct stasis_message *message, struct stasis_subscription *sync_sub)
  528. {
  529. size_t i;
  530. ast_assert(topic != NULL);
  531. ast_assert(message != NULL);
  532. /*
  533. * The topic may be unref'ed by the subscription invocation.
  534. * Make sure we hold onto a reference while dispatching.
  535. */
  536. ao2_ref(topic, +1);
  537. ao2_lock(topic);
  538. for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
  539. struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
  540. ast_assert(sub != NULL);
  541. dispatch_message(sub, message, (sub == sync_sub));
  542. }
  543. ao2_unlock(topic);
  544. ao2_ref(topic, -1);
  545. }
  /*!
   * \brief Publish a message to every subscriber of a topic.
   *
   * No subscriber is singled out for synchronous delivery.
   *
   * \param topic Topic to publish to.
   * \param message Message to publish.
   */
  void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
  {
      struct stasis_subscription *no_sync_sub = NULL;

      publish_msg(topic, message, no_sync_sub);
  }
  550. void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
  551. {
  552. ast_assert(sub != NULL);
  553. publish_msg(sub->topic, message, sub);
  554. }
  555. /*!
  556. * \brief Forwarding information
  557. *
  558. * Any message posted to \a from_topic is forwarded to \a to_topic.
  559. *
  560. * In cases where both the \a from_topic and \a to_topic need to be locked,
  561. * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
  562. */
  563. struct stasis_forward {
  564. /*! Originating topic (a reference is held; released in forward_dtor()) */
  565. struct stasis_topic *from_topic;
  566. /*! Destination topic (a reference is held; released in forward_dtor()) */
  567. struct stasis_topic *to_topic;
  568. };
  569. static void forward_dtor(void *obj)
  570. {
  571. struct stasis_forward *forward = obj;
  572. ao2_cleanup(forward->from_topic);
  573. forward->from_topic = NULL;
  574. ao2_cleanup(forward->to_topic);
  575. forward->to_topic = NULL;
  576. }
  577. struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
  578. {
  579. int idx;
  580. struct stasis_topic *from;
  581. struct stasis_topic *to;
  582. if (!forward) {
  583. return NULL;
  584. }
  585. from = forward->from_topic;
  586. to = forward->to_topic;
  587. topic_lock_both(to, from);
  588. AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
  589. AST_VECTOR_ELEM_CLEANUP_NOOP);
  590. for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
  591. topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
  592. }
  593. ao2_unlock(from);
  594. ao2_unlock(to);
  595. ao2_cleanup(forward);
  596. return NULL;
  597. }
  598. struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
  599. struct stasis_topic *to_topic)
  600. {
  601. int res;
  602. size_t idx;
  603. RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
  604. if (!from_topic || !to_topic) {
  605. return NULL;
  606. }
  607. forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
  608. if (!forward) {
  609. return NULL;
  610. }
  611. forward->from_topic = ao2_bump(from_topic);
  612. forward->to_topic = ao2_bump(to_topic);
  613. topic_lock_both(to_topic, from_topic);
  614. res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
  615. if (res != 0) {
  616. ao2_unlock(from_topic);
  617. ao2_unlock(to_topic);
  618. return NULL;
  619. }
  620. for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
  621. topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
  622. }
  623. ao2_unlock(from_topic);
  624. ao2_unlock(to_topic);
  625. return ao2_bump(forward);
  626. }
  627. static void subscription_change_dtor(void *obj)
  628. {
  629. struct stasis_subscription_change *change = obj;
  630. ast_string_field_free_memory(change);
  631. ao2_cleanup(change->topic);
  632. }
  633. static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
  634. {
  635. struct stasis_subscription_change *change;
  636. change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
  637. if (!change || ast_string_field_init(change, 128)) {
  638. ao2_cleanup(change);
  639. return NULL;
  640. }
  641. ast_string_field_set(change, uniqueid, uniqueid);
  642. ast_string_field_set(change, description, description);
  643. ao2_ref(topic, +1);
  644. change->topic = topic;
  645. return change;
  646. }
  647. static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
  648. {
  649. struct stasis_subscription_change *change;
  650. struct stasis_message *msg;
  651. /* This assumes that we have already unsubscribed */
  652. ast_assert(stasis_subscription_is_subscribed(sub));
  653. change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
  654. if (!change) {
  655. return;
  656. }
  657. msg = stasis_message_create(stasis_subscription_change_type(), change);
  658. if (!msg) {
  659. ao2_cleanup(change);
  660. return;
  661. }
  662. stasis_publish(topic, msg);
  663. ao2_cleanup(msg);
  664. ao2_cleanup(change);
  665. }
  666. static void send_subscription_unsubscribe(struct stasis_topic *topic,
  667. struct stasis_subscription *sub)
  668. {
  669. struct stasis_subscription_change *change;
  670. struct stasis_message *msg;
  671. /* This assumes that we have already unsubscribed */
  672. ast_assert(!stasis_subscription_is_subscribed(sub));
  673. change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
  674. if (!change) {
  675. return;
  676. }
  677. msg = stasis_message_create(stasis_subscription_change_type(), change);
  678. if (!msg) {
  679. ao2_cleanup(change);
  680. return;
  681. }
  682. stasis_publish(topic, msg);
  683. /* Now we have to dispatch to the subscription itself */
  684. dispatch_message(sub, msg, 0);
  685. ao2_cleanup(msg);
  686. ao2_cleanup(change);
  687. }
  688. struct topic_pool_entry {
  689. struct stasis_forward *forward; /*!< Forward from \a topic into the pool's topic; cancelled in topic_pool_entry_dtor() */
  690. struct stasis_topic *topic; /*!< The pooled topic; its name serves as the container key */
  691. };
  692. static void topic_pool_entry_dtor(void *obj)
  693. {
  694. struct topic_pool_entry *entry = obj;
  695. entry->forward = stasis_forward_cancel(entry->forward);
  696. ao2_cleanup(entry->topic);
  697. entry->topic = NULL;
  698. }
  699. static struct topic_pool_entry *topic_pool_entry_alloc(void)
  700. {
  701. return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
  702. AO2_ALLOC_OPT_LOCK_NOLOCK);
  703. }
  704. struct stasis_topic_pool {
  705. struct ao2_container *pool_container; /*!< topic_pool_entry objects, hashed and compared by topic name (case-insensitively) */
  706. struct stasis_topic *pool_topic; /*!< Topic that every pooled topic is forwarded to */
  707. };
  708. static void topic_pool_dtor(void *obj)
  709. {
  710. struct stasis_topic_pool *pool = obj;
  711. ao2_cleanup(pool->pool_container);
  712. pool->pool_container = NULL;
  713. ao2_cleanup(pool->pool_topic);
  714. pool->pool_topic = NULL;
  715. }
  716. static int topic_pool_entry_hash(const void *obj, const int flags)
  717. {
  718. const struct topic_pool_entry *object;
  719. const char *key;
  720. switch (flags & OBJ_SEARCH_MASK) {
  721. case OBJ_SEARCH_KEY:
  722. key = obj;
  723. break;
  724. case OBJ_SEARCH_OBJECT:
  725. object = obj;
  726. key = stasis_topic_name(object->topic);
  727. break;
  728. default:
  729. /* Hash can only work on something with a full key. */
  730. ast_assert(0);
  731. return 0;
  732. }
  733. return ast_str_case_hash(key);
  734. }
  735. static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
  736. {
  737. const struct topic_pool_entry *object_left = obj;
  738. const struct topic_pool_entry *object_right = arg;
  739. const char *right_key = arg;
  740. int cmp;
  741. switch (flags & OBJ_SEARCH_MASK) {
  742. case OBJ_SEARCH_OBJECT:
  743. right_key = stasis_topic_name(object_right->topic);
  744. /* Fall through */
  745. case OBJ_SEARCH_KEY:
  746. cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
  747. break;
  748. case OBJ_SEARCH_PARTIAL_KEY:
  749. /* Not supported by container */
  750. ast_assert(0);
  751. cmp = -1;
  752. break;
  753. default:
  754. /*
  755. * What arg points to is specific to this traversal callback
  756. * and has no special meaning to astobj2.
  757. */
  758. cmp = 0;
  759. break;
  760. }
  761. if (cmp) {
  762. return 0;
  763. }
  764. /*
  765. * At this point the traversal callback is identical to a sorted
  766. * container.
  767. */
  768. return CMP_MATCH;
  769. }
  770. struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
  771. {
  772. struct stasis_topic_pool *pool;
  773. pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
  774. if (!pool) {
  775. return NULL;
  776. }
  777. pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
  778. topic_pool_entry_hash, topic_pool_entry_cmp);
  779. if (!pool->pool_container) {
  780. ao2_cleanup(pool);
  781. return NULL;
  782. }
  783. ao2_ref(pooled_topic, +1);
  784. pool->pool_topic = pooled_topic;
  785. return pool;
  786. }
  787. struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
  788. {
  789. RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
  790. SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
  791. topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  792. if (topic_pool_entry) {
  793. return topic_pool_entry->topic;
  794. }
  795. topic_pool_entry = topic_pool_entry_alloc();
  796. if (!topic_pool_entry) {
  797. return NULL;
  798. }
  799. topic_pool_entry->topic = stasis_topic_create(topic_name);
  800. if (!topic_pool_entry->topic) {
  801. return NULL;
  802. }
  803. topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
  804. if (!topic_pool_entry->forward) {
  805. return NULL;
  806. }
  807. if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
  808. return NULL;
  809. }
  810. return topic_pool_entry->topic;
  811. }
  812. void stasis_log_bad_type_access(const char *name)
  813. {
  814. ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
  815. }
  816. /*! \brief Cleanup function for graceful shutdowns */
  817. static void stasis_cleanup(void)
  818. {
  819. STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
  820. }
  821. int stasis_init(void)
  822. {
  823. int cache_init;
  824. /* Be sure the types are cleaned up after the message bus */
  825. ast_register_cleanup(stasis_cleanup);
  826. cache_init = stasis_cache_init();
  827. if (cache_init != 0) {
  828. return -1;
  829. }
  830. if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
  831. return -1;
  832. }
  833. return 0;
  834. }