stasis_cache.c 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis message cache and caching topic implementation.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/hashtab.h"
  31. #include "asterisk/stasis_internal.h"
  32. #include "asterisk/stasis.h"
  33. #include "asterisk/utils.h"
  34. #include "asterisk/vector.h"
  /*!
   * Number of buckets requested for the hash container that stores the
   * cache entries.  A smaller table is used when LOW_MEMORY is defined.
   */
  #ifndef LOW_MEMORY
  #define NUM_CACHE_BUCKETS 563
  #else
  #define NUM_CACHE_BUCKETS 17
  #endif
  40. /*! \internal */
  41. struct stasis_cache {
  42. struct ao2_container *entries;
  43. snapshot_get_id id_fn;
  44. cache_aggregate_calc_fn aggregate_calc_fn;
  45. cache_aggregate_publish_fn aggregate_publish_fn;
  46. };
  /*!
   * \internal
   * \brief Ties a cache to the topic it listens on and the topic it publishes to.
   */
  struct stasis_caching_topic {
      /*! Cache updated by messages received through \c sub. */
      struct stasis_cache *cache;
      /*! Topic on which cache update messages are published. */
      struct stasis_topic *topic;
      /*! Topic that \c sub is subscribed to. */
      struct stasis_topic *original_topic;
      /*! Subscription to \c original_topic. */
      struct stasis_subscription *sub;
  };
  54. static void stasis_caching_topic_dtor(void *obj)
  55. {
  56. struct stasis_caching_topic *caching_topic = obj;
  57. /* Caching topics contain subscriptions, and must be manually
  58. * unsubscribed. */
  59. ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
  60. /* If there are any messages in flight to this subscription; that would
  61. * be bad. */
  62. ast_assert(stasis_subscription_is_done(caching_topic->sub));
  63. ao2_cleanup(caching_topic->sub);
  64. caching_topic->sub = NULL;
  65. ao2_cleanup(caching_topic->cache);
  66. caching_topic->cache = NULL;
  67. ao2_cleanup(caching_topic->topic);
  68. caching_topic->topic = NULL;
  69. ao2_cleanup(caching_topic->original_topic);
  70. caching_topic->original_topic = NULL;
  71. }
  72. struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
  73. {
  74. return caching_topic->topic;
  75. }
  76. struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
  77. {
  78. if (!caching_topic) {
  79. return NULL;
  80. }
  81. /*
  82. * The subscription may hold the last reference to this caching
  83. * topic, but we want to make sure the unsubscribe finishes
  84. * before kicking of the caching topic's dtor.
  85. */
  86. ao2_ref(caching_topic, +1);
  87. if (stasis_subscription_is_subscribed(caching_topic->sub)) {
  88. /*
  89. * Increment the reference to hold on to it past the
  90. * unsubscribe. Will be cleaned up in dtor.
  91. */
  92. ao2_ref(caching_topic->sub, +1);
  93. stasis_unsubscribe(caching_topic->sub);
  94. } else {
  95. ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
  96. }
  97. ao2_cleanup(caching_topic);
  98. return NULL;
  99. }
  100. struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
  101. {
  102. if (!caching_topic) {
  103. return NULL;
  104. }
  105. /* Hold a ref past the unsubscribe */
  106. ao2_ref(caching_topic, +1);
  107. stasis_caching_unsubscribe(caching_topic);
  108. stasis_subscription_join(caching_topic->sub);
  109. ao2_cleanup(caching_topic);
  110. return NULL;
  111. }
  /*!
   * \brief Lookup key of a cache entry.
   *
   * \note Once an entry is in the cache the members of its key must not
   * change.
   */
  struct cache_entry_key {
      /*! Message type of the cached item. */
      struct stasis_message_type *type;
      /*! Unique id of the cached item. */
      const char *id;
      /*! Hash value derived from \c type and \c id. */
      unsigned int hash;
  };
  124. struct stasis_cache_entry {
  125. struct cache_entry_key key;
  126. /*! Aggregate snapshot of the stasis cache. */
  127. struct stasis_message *aggregate;
  128. /*! Local entity snapshot of the stasis event. */
  129. struct stasis_message *local;
  130. /*! Remote entity snapshots of the stasis event. */
  131. AST_VECTOR(, struct stasis_message *) remote;
  132. };
  133. static void cache_entry_dtor(void *obj)
  134. {
  135. struct stasis_cache_entry *entry = obj;
  136. size_t idx;
  137. ao2_cleanup(entry->key.type);
  138. entry->key.type = NULL;
  139. ast_free((char *) entry->key.id);
  140. entry->key.id = NULL;
  141. ao2_cleanup(entry->aggregate);
  142. entry->aggregate = NULL;
  143. ao2_cleanup(entry->local);
  144. entry->local = NULL;
  145. for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
  146. struct stasis_message *remote;
  147. remote = AST_VECTOR_GET(&entry->remote, idx);
  148. ao2_cleanup(remote);
  149. }
  150. AST_VECTOR_FREE(&entry->remote);
  151. }
  152. static void cache_entry_compute_hash(struct cache_entry_key *key)
  153. {
  154. key->hash = ast_hashtab_hash_string(stasis_message_type_name(key->type));
  155. key->hash += ast_hashtab_hash_string(key->id);
  156. }
  157. static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
  158. {
  159. struct stasis_cache_entry *entry;
  160. int is_remote;
  161. ast_assert(id != NULL);
  162. ast_assert(snapshot != NULL);
  163. if (!type) {
  164. return NULL;
  165. }
  166. entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
  167. AO2_ALLOC_OPT_LOCK_NOLOCK);
  168. if (!entry) {
  169. return NULL;
  170. }
  171. entry->key.id = ast_strdup(id);
  172. if (!entry->key.id) {
  173. ao2_cleanup(entry);
  174. return NULL;
  175. }
  176. entry->key.type = ao2_bump(type);
  177. cache_entry_compute_hash(&entry->key);
  178. is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
  179. if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
  180. ao2_cleanup(entry);
  181. return NULL;
  182. }
  183. if (is_remote) {
  184. if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
  185. ao2_cleanup(entry);
  186. return NULL;
  187. }
  188. } else {
  189. entry->local = snapshot;
  190. }
  191. ao2_bump(snapshot);
  192. return entry;
  193. }
  194. static int cache_entry_hash(const void *obj, int flags)
  195. {
  196. const struct stasis_cache_entry *object;
  197. const struct cache_entry_key *key;
  198. switch (flags & OBJ_SEARCH_MASK) {
  199. case OBJ_SEARCH_KEY:
  200. key = obj;
  201. break;
  202. case OBJ_SEARCH_OBJECT:
  203. object = obj;
  204. key = &object->key;
  205. break;
  206. default:
  207. /* Hash can only work on something with a full key. */
  208. ast_assert(0);
  209. return 0;
  210. }
  211. return (int)key->hash;
  212. }
  213. static int cache_entry_cmp(void *obj, void *arg, int flags)
  214. {
  215. const struct stasis_cache_entry *object_left = obj;
  216. const struct stasis_cache_entry *object_right = arg;
  217. const struct cache_entry_key *right_key = arg;
  218. int cmp;
  219. switch (flags & OBJ_SEARCH_MASK) {
  220. case OBJ_SEARCH_OBJECT:
  221. right_key = &object_right->key;
  222. /* Fall through */
  223. case OBJ_SEARCH_KEY:
  224. cmp = object_left->key.type != right_key->type
  225. || strcmp(object_left->key.id, right_key->id);
  226. break;
  227. case OBJ_SEARCH_PARTIAL_KEY:
  228. /* Not supported by container */
  229. ast_assert(0);
  230. cmp = -1;
  231. break;
  232. default:
  233. /*
  234. * What arg points to is specific to this traversal callback
  235. * and has no special meaning to astobj2.
  236. */
  237. cmp = 0;
  238. break;
  239. }
  240. if (cmp) {
  241. return 0;
  242. }
  243. /*
  244. * At this point the traversal callback is identical to a sorted
  245. * container.
  246. */
  247. return CMP_MATCH;
  248. }
  249. static void cache_dtor(void *obj)
  250. {
  251. struct stasis_cache *cache = obj;
  252. ao2_cleanup(cache->entries);
  253. cache->entries = NULL;
  254. }
  255. struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
  256. cache_aggregate_calc_fn aggregate_calc_fn,
  257. cache_aggregate_publish_fn aggregate_publish_fn)
  258. {
  259. struct stasis_cache *cache;
  260. cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
  261. AO2_ALLOC_OPT_LOCK_NOLOCK);
  262. if (!cache) {
  263. return NULL;
  264. }
  265. cache->entries = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
  266. NUM_CACHE_BUCKETS, cache_entry_hash, NULL, cache_entry_cmp);
  267. if (!cache->entries) {
  268. ao2_cleanup(cache);
  269. return NULL;
  270. }
  271. cache->id_fn = id_fn;
  272. cache->aggregate_calc_fn = aggregate_calc_fn;
  273. cache->aggregate_publish_fn = aggregate_publish_fn;
  274. return cache;
  275. }
  276. struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
  277. {
  278. return stasis_cache_create_full(id_fn, NULL, NULL);
  279. }
  280. struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
  281. {
  282. return entry->aggregate;
  283. }
  284. struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
  285. {
  286. return entry->local;
  287. }
  288. struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
  289. {
  290. if (idx < AST_VECTOR_SIZE(&entry->remote)) {
  291. return AST_VECTOR_GET(&entry->remote, idx);
  292. }
  293. return NULL;
  294. }
  295. /*!
  296. * \internal
  297. * \brief Find the cache entry in the cache entries container.
  298. *
  299. * \param entries Container of cached entries.
  300. * \param type Type of message to retrieve the cache entry.
  301. * \param id Identity of the snapshot to retrieve the cache entry.
  302. *
  303. * \note The entries container is already locked.
  304. *
  305. * \retval Cache-entry on success.
  306. * \retval NULL Not in cache.
  307. */
  308. static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
  309. {
  310. struct cache_entry_key search_key;
  311. struct stasis_cache_entry *entry;
  312. search_key.type = type;
  313. search_key.id = id;
  314. cache_entry_compute_hash(&search_key);
  315. entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  316. /* Ensure that what we looked for is what we found. */
  317. ast_assert(!entry
  318. || (!strcmp(stasis_message_type_name(entry->key.type),
  319. stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
  320. return entry;
  321. }
  322. /*!
  323. * \internal
  324. * \brief Remove the stasis snapshot in the cache entry determined by eid.
  325. *
  326. * \param entries Container of cached entries.
  327. * \param cached_entry The entry to remove the snapshot from.
  328. * \param eid Which snapshot in the cached entry.
  329. *
  330. * \note The entries container is already locked.
  331. *
  332. * \return Previous stasis entry snapshot.
  333. */
  334. static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
  335. {
  336. struct stasis_message *old_snapshot;
  337. int is_remote;
  338. is_remote = ast_eid_cmp(eid, &ast_eid_default);
  339. if (!is_remote) {
  340. old_snapshot = cached_entry->local;
  341. cached_entry->local = NULL;
  342. } else {
  343. int idx;
  344. old_snapshot = NULL;
  345. for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
  346. struct stasis_message *cur;
  347. cur = AST_VECTOR_GET(&cached_entry->remote, idx);
  348. if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
  349. old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
  350. break;
  351. }
  352. }
  353. }
  354. if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
  355. ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
  356. }
  357. return old_snapshot;
  358. }
  359. /*!
  360. * \internal
  361. * \brief Update the stasis snapshot in the cache entry determined by eid.
  362. *
  363. * \param cached_entry The entry to remove the snapshot from.
  364. * \param eid Which snapshot in the cached entry.
  365. * \param new_snapshot Snapshot to replace the old snapshot.
  366. *
  367. * \return Previous stasis entry snapshot.
  368. */
  369. static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
  370. {
  371. struct stasis_message *old_snapshot;
  372. int is_remote;
  373. int idx;
  374. is_remote = ast_eid_cmp(eid, &ast_eid_default);
  375. if (!is_remote) {
  376. old_snapshot = cached_entry->local;
  377. cached_entry->local = ao2_bump(new_snapshot);
  378. return old_snapshot;
  379. }
  380. old_snapshot = NULL;
  381. for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
  382. struct stasis_message *cur;
  383. cur = AST_VECTOR_GET(&cached_entry->remote, idx);
  384. if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
  385. old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
  386. break;
  387. }
  388. }
  389. if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
  390. ao2_bump(new_snapshot);
  391. }
  392. return old_snapshot;
  393. }
  /*!
   * \brief Snapshots affected by a single cache_put() call.
   */
  struct cache_put_snapshots {
      /*! Snapshot previously cached for the entity (eid) that was put. */
      struct stasis_message *old;
      /*! Aggregate snapshot the entry had before the put. */
      struct stasis_message *aggregate_old;
      /*! Aggregate snapshot computed by the put. */
      struct stasis_message *aggregate_new;
  };
  402. static struct cache_put_snapshots cache_put(struct stasis_cache *cache,
  403. struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
  404. struct stasis_message *new_snapshot)
  405. {
  406. struct stasis_cache_entry *cached_entry;
  407. struct cache_put_snapshots snapshots;
  408. ast_assert(cache->entries != NULL);
  409. ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
  410. ast_assert(new_snapshot == NULL ||
  411. type == stasis_message_type(new_snapshot));
  412. memset(&snapshots, 0, sizeof(snapshots));
  413. ao2_wrlock(cache->entries);
  414. cached_entry = cache_find(cache->entries, type, id);
  415. /* Update the eid snapshot. */
  416. if (!new_snapshot) {
  417. /* Remove snapshot from cache */
  418. if (cached_entry) {
  419. snapshots.old = cache_remove(cache->entries, cached_entry, eid);
  420. }
  421. } else if (cached_entry) {
  422. /* Update snapshot in cache */
  423. snapshots.old = cache_udpate(cached_entry, eid, new_snapshot);
  424. } else {
  425. /* Insert into the cache */
  426. cached_entry = cache_entry_create(type, id, new_snapshot);
  427. if (cached_entry) {
  428. ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
  429. }
  430. }
  431. /* Update the aggregate snapshot. */
  432. if (cache->aggregate_calc_fn && cached_entry) {
  433. snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
  434. snapshots.aggregate_old = cached_entry->aggregate;
  435. cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
  436. }
  437. ao2_unlock(cache->entries);
  438. ao2_cleanup(cached_entry);
  439. return snapshots;
  440. }
  441. /*!
  442. * \internal
  443. * \brief Dump all entity snapshots in the cache entry into the given container.
  444. *
  445. * \param snapshots Container to put all snapshots in the cache entry.
  446. * \param entry Cache entry to use.
  447. *
  448. * \retval 0 on success.
  449. * \retval non-zero on error.
  450. */
  451. static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
  452. {
  453. int idx;
  454. int err = 0;
  455. ast_assert(snapshots != NULL);
  456. ast_assert(entry != NULL);
  457. /* The aggregate snapshot is not a snapshot from an entity. */
  458. if (entry->local) {
  459. err |= !ao2_link(snapshots, entry->local);
  460. }
  461. for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
  462. struct stasis_message *snapshot;
  463. snapshot = AST_VECTOR_GET(&entry->remote, idx);
  464. err |= !ao2_link(snapshots, snapshot);
  465. }
  466. return err;
  467. }
  468. struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
  469. {
  470. struct stasis_cache_entry *cached_entry;
  471. struct ao2_container *found;
  472. ast_assert(cache != NULL);
  473. ast_assert(cache->entries != NULL);
  474. ast_assert(id != NULL);
  475. if (!type) {
  476. return NULL;
  477. }
  478. found = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
  479. if (!found) {
  480. return NULL;
  481. }
  482. ao2_rdlock(cache->entries);
  483. cached_entry = cache_find(cache->entries, type, id);
  484. if (cached_entry && cache_entry_dump(found, cached_entry)) {
  485. ao2_cleanup(found);
  486. found = NULL;
  487. }
  488. ao2_unlock(cache->entries);
  489. ao2_cleanup(cached_entry);
  490. return found;
  491. }
  492. /*!
  493. * \internal
  494. * \brief Retrieve an item from the cache entry for a specific eid.
  495. *
  496. * \param entry Cache entry to use.
  497. * \param eid Specific entity id to retrieve. NULL for aggregate.
  498. *
  499. * \note The returned snapshot has not had its reference bumped.
  500. *
  501. * \retval Snapshot from the cache.
  502. * \retval \c NULL if snapshot is not found.
  503. */
  504. static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
  505. {
  506. int is_remote;
  507. int idx;
  508. if (!eid) {
  509. /* Get aggregate. */
  510. return entry->aggregate;
  511. }
  512. /* Get snapshot with specific eid. */
  513. is_remote = ast_eid_cmp(eid, &ast_eid_default);
  514. if (!is_remote) {
  515. return entry->local;
  516. }
  517. for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
  518. struct stasis_message *cur;
  519. cur = AST_VECTOR_GET(&entry->remote, idx);
  520. if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
  521. return cur;
  522. }
  523. }
  524. return NULL;
  525. }
  526. struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
  527. {
  528. struct stasis_cache_entry *cached_entry;
  529. struct stasis_message *snapshot = NULL;
  530. ast_assert(cache != NULL);
  531. ast_assert(cache->entries != NULL);
  532. ast_assert(id != NULL);
  533. if (!type) {
  534. return NULL;
  535. }
  536. ao2_rdlock(cache->entries);
  537. cached_entry = cache_find(cache->entries, type, id);
  538. if (cached_entry) {
  539. snapshot = cache_entry_by_eid(cached_entry, eid);
  540. ao2_bump(snapshot);
  541. }
  542. ao2_unlock(cache->entries);
  543. ao2_cleanup(cached_entry);
  544. return snapshot;
  545. }
  546. struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
  547. {
  548. return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
  549. }
  /*!
   * \brief Arguments handed to the cache dump traversal callbacks.
   */
  struct cache_dump_data {
      /*! Container collecting the snapshots; set to NULL by a callback on failure. */
      struct ao2_container *container;
      /*! Only entries of this message type are dumped; NULL means every type. */
      struct stasis_message_type *type;
      /*! Entity whose snapshots are wanted; NULL selects the aggregate. */
      const struct ast_eid *eid;
  };
  555. static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
  556. {
  557. struct cache_dump_data *cache_dump = arg;
  558. struct stasis_cache_entry *entry = obj;
  559. if (!cache_dump->type || entry->key.type == cache_dump->type) {
  560. struct stasis_message *snapshot;
  561. snapshot = cache_entry_by_eid(entry, cache_dump->eid);
  562. if (snapshot) {
  563. if (!ao2_link(cache_dump->container, snapshot)) {
  564. ao2_cleanup(cache_dump->container);
  565. cache_dump->container = NULL;
  566. return CMP_STOP;
  567. }
  568. }
  569. }
  570. return 0;
  571. }
  572. struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
  573. {
  574. struct cache_dump_data cache_dump;
  575. ast_assert(cache != NULL);
  576. ast_assert(cache->entries != NULL);
  577. cache_dump.eid = eid;
  578. cache_dump.type = type;
  579. cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
  580. if (!cache_dump.container) {
  581. return NULL;
  582. }
  583. ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_by_eid_cb, &cache_dump);
  584. return cache_dump.container;
  585. }
  586. struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
  587. {
  588. return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
  589. }
  590. static int cache_dump_all_cb(void *obj, void *arg, int flags)
  591. {
  592. struct cache_dump_data *cache_dump = arg;
  593. struct stasis_cache_entry *entry = obj;
  594. if (!cache_dump->type || entry->key.type == cache_dump->type) {
  595. if (cache_entry_dump(cache_dump->container, entry)) {
  596. ao2_cleanup(cache_dump->container);
  597. cache_dump->container = NULL;
  598. return CMP_STOP;
  599. }
  600. }
  601. return 0;
  602. }
  603. struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
  604. {
  605. struct cache_dump_data cache_dump;
  606. ast_assert(cache != NULL);
  607. ast_assert(cache->entries != NULL);
  608. cache_dump.eid = NULL;
  609. cache_dump.type = type;
  610. cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
  611. if (!cache_dump.container) {
  612. return NULL;
  613. }
  614. ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_all_cb, &cache_dump);
  615. return cache_dump.container;
  616. }
  /*! Message type asking a caching topic to remove an item from its cache. */
  STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
  /*! Message type published by a caching topic when a cached item changes. */
  STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
  /*!
   * \brief Create a cache clear message.
   *
   * \param id_message Message identifying the item to clear; it becomes
   *        the data of the clear message.
   *
   * \return Message of type stasis_cache_clear_type() as returned by
   *         stasis_message_create().
   */
  struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
  {
      struct stasis_message_type *clear_type = stasis_cache_clear_type();

      return stasis_message_create(clear_type, id_message);
  }
  623. static void stasis_cache_update_dtor(void *obj)
  624. {
  625. struct stasis_cache_update *update = obj;
  626. ao2_cleanup(update->old_snapshot);
  627. update->old_snapshot = NULL;
  628. ao2_cleanup(update->new_snapshot);
  629. update->new_snapshot = NULL;
  630. ao2_cleanup(update->type);
  631. update->type = NULL;
  632. }
  633. static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
  634. {
  635. struct stasis_cache_update *update;
  636. struct stasis_message *msg;
  637. ast_assert(old_snapshot != NULL || new_snapshot != NULL);
  638. if (!stasis_cache_update_type()) {
  639. return NULL;
  640. }
  641. update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
  642. AO2_ALLOC_OPT_LOCK_NOLOCK);
  643. if (!update) {
  644. return NULL;
  645. }
  646. if (old_snapshot) {
  647. ao2_ref(old_snapshot, +1);
  648. update->old_snapshot = old_snapshot;
  649. if (!new_snapshot) {
  650. ao2_ref(stasis_message_type(old_snapshot), +1);
  651. update->type = stasis_message_type(old_snapshot);
  652. }
  653. }
  654. if (new_snapshot) {
  655. ao2_ref(new_snapshot, +1);
  656. update->new_snapshot = new_snapshot;
  657. ao2_ref(stasis_message_type(new_snapshot), +1);
  658. update->type = stasis_message_type(new_snapshot);
  659. }
  660. msg = stasis_message_create(stasis_cache_update_type(), update);
  661. ao2_cleanup(update);
  662. return msg;
  663. }
  664. static void caching_topic_exec(void *data, struct stasis_subscription *sub,
  665. struct stasis_message *message)
  666. {
  667. struct stasis_caching_topic *caching_topic_needs_unref;
  668. struct stasis_caching_topic *caching_topic = data;
  669. struct stasis_message *msg;
  670. struct stasis_message *msg_put;
  671. struct stasis_message_type *msg_type;
  672. const struct ast_eid *msg_eid;
  673. const char *msg_id;
  674. ast_assert(caching_topic != NULL);
  675. ast_assert(caching_topic->topic != NULL);
  676. ast_assert(caching_topic->cache != NULL);
  677. ast_assert(caching_topic->cache->id_fn != NULL);
  678. if (stasis_subscription_final_message(sub, message)) {
  679. caching_topic_needs_unref = caching_topic;
  680. } else {
  681. caching_topic_needs_unref = NULL;
  682. }
  683. msg_type = stasis_message_type(message);
  684. if (stasis_cache_clear_type() == msg_type) {
  685. /* Cache clear event. */
  686. msg_put = NULL;
  687. msg = stasis_message_data(message);
  688. msg_type = stasis_message_type(msg);
  689. } else {
  690. /* Normal cache update event. */
  691. msg_put = message;
  692. msg = message;
  693. }
  694. ast_assert(msg_type != NULL);
  695. msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
  696. msg_id = caching_topic->cache->id_fn(msg);
  697. if (msg_id && msg_eid) {
  698. struct stasis_message *update;
  699. struct cache_put_snapshots snapshots;
  700. /* Update the cache */
  701. snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
  702. if (snapshots.old || msg_put) {
  703. update = update_create(snapshots.old, msg_put);
  704. if (update) {
  705. stasis_publish(caching_topic->topic, update);
  706. }
  707. ao2_cleanup(update);
  708. } else {
  709. ast_log(LOG_ERROR,
  710. "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
  711. stasis_topic_name(caching_topic->topic),
  712. stasis_message_type_name(msg_type), msg_id);
  713. }
  714. if (snapshots.aggregate_old != snapshots.aggregate_new) {
  715. if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
  716. caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
  717. snapshots.aggregate_new);
  718. }
  719. update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
  720. if (update) {
  721. stasis_publish(caching_topic->topic, update);
  722. }
  723. ao2_cleanup(update);
  724. }
  725. ao2_cleanup(snapshots.old);
  726. ao2_cleanup(snapshots.aggregate_old);
  727. ao2_cleanup(snapshots.aggregate_new);
  728. }
  729. ao2_cleanup(caching_topic_needs_unref);
  730. }
  731. struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
  732. {
  733. RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
  734. struct stasis_subscription *sub;
  735. RAII_VAR(char *, new_name, NULL, ast_free);
  736. int ret;
  737. ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
  738. if (ret < 0) {
  739. return NULL;
  740. }
  741. caching_topic = ao2_alloc_options(sizeof(*caching_topic),
  742. stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
  743. if (caching_topic == NULL) {
  744. return NULL;
  745. }
  746. caching_topic->topic = stasis_topic_create(new_name);
  747. if (caching_topic->topic == NULL) {
  748. return NULL;
  749. }
  750. ao2_ref(cache, +1);
  751. caching_topic->cache = cache;
  752. sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
  753. if (sub == NULL) {
  754. return NULL;
  755. }
  756. ao2_ref(original_topic, +1);
  757. caching_topic->original_topic = original_topic;
  758. /* This is for the reference contained in the subscription above */
  759. ao2_ref(caching_topic, +1);
  760. caching_topic->sub = sub;
  761. /* The subscription holds the reference, so no additional ref bump. */
  762. return caching_topic;
  763. }
  764. static void stasis_cache_cleanup(void)
  765. {
  766. STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
  767. STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
  768. }
  769. int stasis_cache_init(void)
  770. {
  771. ast_register_cleanup(stasis_cache_cleanup);
  772. if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_clear_type) != 0) {
  773. return -1;
  774. }
  775. if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_update_type) != 0) {
  776. return -1;
  777. }
  778. return 0;
  779. }