endpoints.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Asterisk endpoint API.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/endpoints.h"
  31. #include "asterisk/stasis.h"
  32. #include "asterisk/stasis_channels.h"
  33. #include "asterisk/stasis_endpoints.h"
  34. #include "asterisk/stasis_message_router.h"
  35. #include "asterisk/stringfields.h"
  36. #include "asterisk/_private.h"
/*! Hash buckets for each endpoint's channel-id container (endpoint->channel mappings). Keep it prime! */
#define ENDPOINT_CHANNEL_BUCKETS 127

/*! Hash buckets for the global \c endpoints container. Keep it prime! */
#define ENDPOINT_BUCKETS 127

/*! Hash buckets for the global \c tech_endpoints container. */
#define TECH_ENDPOINT_BUCKETS 11

/*! Endpoints created with a resource; hashed and compared on their id string. */
static struct ao2_container *endpoints;

/*! Technology-level endpoints (created without a resource); hashed and compared on their id string. */
static struct ao2_container *tech_endpoints;
/*!
 * \brief Internal state of one endpoint.
 *
 * Technology-level endpoints are created with an empty \c resource; their
 * \c id is then just the technology name, and they get no router and no
 * tech forward.
 */
struct ast_endpoint {
	AST_DECLARE_STRING_FIELDS(
		AST_STRING_FIELD(tech);	/*!< Technology (SIP, IAX2, etc.). */
		AST_STRING_FIELD(resource);	/*!< Name, unique to the tech. Empty for a technology endpoint. */
		AST_STRING_FIELD(id);	/*!< "tech/resource" id, or just "tech" when there is no resource. */
	);
	/*! Endpoint's current state */
	enum ast_endpoint_state state;
	/*!
	 * \brief Max channels for this endpoint. -1 means unlimited or unknown.
	 *
	 * Note that this simply documents the limits of an endpoint, and does
	 * nothing to try to enforce the limit.
	 */
	int max_channels;
	/*! Topic for this endpoint's messages */
	struct stasis_cp_single *topics;
	/*! Router for handling this endpoint's messages. Only created for endpoints with a resource. */
	struct stasis_message_router *router;
	/*! ast_str_container of channels associated with this endpoint; accessed under the endpoint's lock. */
	struct ao2_container *channel_ids;
	/*! Forwarding subscription from an endpoint to its tech endpoint */
	struct stasis_forward *tech_forward;
};
  69. static int endpoint_hash(const void *obj, int flags)
  70. {
  71. const struct ast_endpoint *endpoint;
  72. const char *key;
  73. switch (flags & OBJ_SEARCH_MASK) {
  74. case OBJ_SEARCH_KEY:
  75. key = obj;
  76. return ast_str_hash(key);
  77. case OBJ_SEARCH_OBJECT:
  78. endpoint = obj;
  79. return ast_str_hash(endpoint->id);
  80. default:
  81. /* Hash can only work on something with a full key. */
  82. ast_assert(0);
  83. return 0;
  84. }
  85. }
  86. static int endpoint_cmp(void *obj, void *arg, int flags)
  87. {
  88. const struct ast_endpoint *left = obj;
  89. const struct ast_endpoint *right = arg;
  90. const char *right_key = arg;
  91. int cmp;
  92. switch (flags & OBJ_SEARCH_MASK) {
  93. case OBJ_SEARCH_OBJECT:
  94. right_key = right->id;
  95. /* Fall through */
  96. case OBJ_SEARCH_KEY:
  97. cmp = strcmp(left->id, right_key);
  98. break;
  99. case OBJ_SEARCH_PARTIAL_KEY:
  100. cmp = strncmp(left->id, right_key, strlen(right_key));
  101. break;
  102. default:
  103. ast_assert(0);
  104. cmp = 0;
  105. break;
  106. }
  107. if (cmp) {
  108. return 0;
  109. }
  110. return CMP_MATCH;
  111. }
  112. struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
  113. {
  114. struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
  115. if (!endpoint) {
  116. endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
  117. }
  118. return endpoint;
  119. }
  120. struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
  121. {
  122. if (!endpoint) {
  123. return ast_endpoint_topic_all();
  124. }
  125. return stasis_cp_single_topic(endpoint->topics);
  126. }
  127. struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
  128. {
  129. if (!endpoint) {
  130. return ast_endpoint_topic_all_cached();
  131. }
  132. return stasis_cp_single_topic_cached(endpoint->topics);
  133. }
  134. const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
  135. {
  136. switch (state) {
  137. case AST_ENDPOINT_UNKNOWN:
  138. return "unknown";
  139. case AST_ENDPOINT_OFFLINE:
  140. return "offline";
  141. case AST_ENDPOINT_ONLINE:
  142. return "online";
  143. }
  144. return "?";
  145. }
  146. static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
  147. {
  148. RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
  149. RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
  150. ast_assert(endpoint != NULL);
  151. ast_assert(endpoint->topics != NULL);
  152. if (!ast_endpoint_snapshot_type()) {
  153. return;
  154. }
  155. snapshot = ast_endpoint_snapshot_create(endpoint);
  156. if (!snapshot) {
  157. return;
  158. }
  159. message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
  160. if (!message) {
  161. return;
  162. }
  163. stasis_publish(ast_endpoint_topic(endpoint), message);
  164. }
  165. static void endpoint_dtor(void *obj)
  166. {
  167. struct ast_endpoint *endpoint = obj;
  168. /* The router should be shut down already */
  169. ast_assert(stasis_message_router_is_done(endpoint->router));
  170. ao2_cleanup(endpoint->router);
  171. endpoint->router = NULL;
  172. stasis_cp_single_unsubscribe(endpoint->topics);
  173. endpoint->topics = NULL;
  174. ao2_cleanup(endpoint->channel_ids);
  175. endpoint->channel_ids = NULL;
  176. ast_string_field_free_memory(endpoint);
  177. }
  178. int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
  179. struct ast_channel *chan)
  180. {
  181. ast_assert(chan != NULL);
  182. ast_assert(endpoint != NULL);
  183. ast_assert(!ast_strlen_zero(endpoint->resource));
  184. ast_channel_forward_endpoint(chan, endpoint);
  185. ao2_lock(endpoint);
  186. ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
  187. ao2_unlock(endpoint);
  188. endpoint_publish_snapshot(endpoint);
  189. return 0;
  190. }
  191. /*! \brief Handler for channel snapshot cache clears */
  192. static void endpoint_cache_clear(void *data,
  193. struct stasis_subscription *sub,
  194. struct stasis_message *message)
  195. {
  196. struct ast_endpoint *endpoint = data;
  197. struct stasis_message *clear_msg = stasis_message_data(message);
  198. struct ast_channel_snapshot *clear_snapshot;
  199. if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
  200. return;
  201. }
  202. clear_snapshot = stasis_message_data(clear_msg);
  203. ast_assert(endpoint != NULL);
  204. ao2_lock(endpoint);
  205. ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
  206. ao2_unlock(endpoint);
  207. endpoint_publish_snapshot(endpoint);
  208. }
  209. static void endpoint_default(void *data,
  210. struct stasis_subscription *sub,
  211. struct stasis_message *message)
  212. {
  213. struct stasis_endpoint *endpoint = data;
  214. if (stasis_subscription_final_message(sub, message)) {
  215. ao2_cleanup(endpoint);
  216. }
  217. }
  218. static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
  219. {
  220. RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
  221. RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
  222. int r = 0;
  223. /* Get/create the technology endpoint */
  224. if (!ast_strlen_zero(resource)) {
  225. tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
  226. if (!tech_endpoint) {
  227. tech_endpoint = endpoint_internal_create(tech, NULL);
  228. if (!tech_endpoint) {
  229. return NULL;
  230. }
  231. }
  232. }
  233. endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
  234. if (!endpoint) {
  235. return NULL;
  236. }
  237. endpoint->max_channels = -1;
  238. endpoint->state = AST_ENDPOINT_UNKNOWN;
  239. if (ast_string_field_init(endpoint, 80) != 0) {
  240. return NULL;
  241. }
  242. ast_string_field_set(endpoint, tech, tech);
  243. ast_string_field_set(endpoint, resource, S_OR(resource, ""));
  244. ast_string_field_build(endpoint, id, "%s%s%s",
  245. tech,
  246. !ast_strlen_zero(resource) ? "/" : "",
  247. S_OR(resource, ""));
  248. /* All access to channel_ids should be covered by the endpoint's
  249. * lock; no extra lock needed. */
  250. endpoint->channel_ids = ast_str_container_alloc_options(
  251. AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
  252. if (!endpoint->channel_ids) {
  253. return NULL;
  254. }
  255. endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
  256. endpoint->id);
  257. if (!endpoint->topics) {
  258. return NULL;
  259. }
  260. if (!ast_strlen_zero(resource)) {
  261. endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
  262. if (!endpoint->router) {
  263. return NULL;
  264. }
  265. r |= stasis_message_router_add(endpoint->router,
  266. stasis_cache_clear_type(), endpoint_cache_clear,
  267. endpoint);
  268. r |= stasis_message_router_set_default(endpoint->router,
  269. endpoint_default, endpoint);
  270. if (r) {
  271. return NULL;
  272. }
  273. endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
  274. stasis_cp_single_topic(tech_endpoint->topics));
  275. endpoint_publish_snapshot(endpoint);
  276. ao2_link(endpoints, endpoint);
  277. } else {
  278. ao2_link(tech_endpoints, endpoint);
  279. }
  280. ao2_ref(endpoint, +1);
  281. return endpoint;
  282. }
  283. struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
  284. {
  285. if (ast_strlen_zero(tech)) {
  286. ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
  287. return NULL;
  288. }
  289. if (ast_strlen_zero(resource)) {
  290. ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
  291. return NULL;
  292. }
  293. return endpoint_internal_create(tech, resource);
  294. }
  295. static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
  296. {
  297. RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
  298. if (!ast_endpoint_snapshot_type()) {
  299. return NULL;
  300. }
  301. snapshot = ast_endpoint_snapshot_create(endpoint);
  302. if (!snapshot) {
  303. return NULL;
  304. }
  305. return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
  306. }
  307. void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
  308. {
  309. RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
  310. if (endpoint == NULL) {
  311. return;
  312. }
  313. ao2_unlink(endpoints, endpoint);
  314. endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
  315. clear_msg = create_endpoint_snapshot_message(endpoint);
  316. if (clear_msg) {
  317. RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
  318. message = stasis_cache_clear_create(clear_msg);
  319. if (message) {
  320. stasis_publish(ast_endpoint_topic(endpoint), message);
  321. }
  322. }
  323. /* Bump refcount to hold on to the router */
  324. ao2_ref(endpoint->router, +1);
  325. stasis_message_router_unsubscribe(endpoint->router);
  326. }
  327. const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
  328. {
  329. if (!endpoint) {
  330. return NULL;
  331. }
  332. return endpoint->tech;
  333. }
  334. const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
  335. {
  336. if (!endpoint) {
  337. return NULL;
  338. }
  339. return endpoint->resource;
  340. }
  341. const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
  342. {
  343. if (!endpoint) {
  344. return NULL;
  345. }
  346. return endpoint->id;
  347. }
  348. void ast_endpoint_set_state(struct ast_endpoint *endpoint,
  349. enum ast_endpoint_state state)
  350. {
  351. ast_assert(endpoint != NULL);
  352. ast_assert(!ast_strlen_zero(endpoint->resource));
  353. ao2_lock(endpoint);
  354. endpoint->state = state;
  355. ao2_unlock(endpoint);
  356. endpoint_publish_snapshot(endpoint);
  357. }
  358. void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
  359. int max_channels)
  360. {
  361. ast_assert(endpoint != NULL);
  362. ast_assert(!ast_strlen_zero(endpoint->resource));
  363. ao2_lock(endpoint);
  364. endpoint->max_channels = max_channels;
  365. ao2_unlock(endpoint);
  366. endpoint_publish_snapshot(endpoint);
  367. }
  368. static void endpoint_snapshot_dtor(void *obj)
  369. {
  370. struct ast_endpoint_snapshot *snapshot = obj;
  371. int channel;
  372. ast_assert(snapshot != NULL);
  373. for (channel = 0; channel < snapshot->num_channels; channel++) {
  374. ao2_ref(snapshot->channel_ids[channel], -1);
  375. }
  376. ast_string_field_free_memory(snapshot);
  377. }
  378. struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
  379. struct ast_endpoint *endpoint)
  380. {
  381. RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
  382. int channel_count;
  383. struct ao2_iterator i;
  384. void *obj;
  385. SCOPED_AO2LOCK(lock, endpoint);
  386. ast_assert(endpoint != NULL);
  387. ast_assert(!ast_strlen_zero(endpoint->resource));
  388. channel_count = ao2_container_count(endpoint->channel_ids);
  389. snapshot = ao2_alloc_options(
  390. sizeof(*snapshot) + channel_count * sizeof(char *),
  391. endpoint_snapshot_dtor,
  392. AO2_ALLOC_OPT_LOCK_NOLOCK);
  393. if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
  394. ao2_cleanup(snapshot);
  395. return NULL;
  396. }
  397. ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
  398. endpoint->resource);
  399. ast_string_field_set(snapshot, tech, endpoint->tech);
  400. ast_string_field_set(snapshot, resource, endpoint->resource);
  401. snapshot->state = endpoint->state;
  402. snapshot->max_channels = endpoint->max_channels;
  403. i = ao2_iterator_init(endpoint->channel_ids, 0);
  404. while ((obj = ao2_iterator_next(&i))) {
  405. /* The reference is kept so the channel id does not go away until the snapshot is gone */
  406. snapshot->channel_ids[snapshot->num_channels++] = obj;
  407. }
  408. ao2_iterator_destroy(&i);
  409. ao2_ref(snapshot, +1);
  410. return snapshot;
  411. }
  412. static void endpoint_cleanup(void)
  413. {
  414. ao2_cleanup(endpoints);
  415. endpoints = NULL;
  416. ao2_cleanup(tech_endpoints);
  417. tech_endpoints = NULL;
  418. }
  419. int ast_endpoint_init(void)
  420. {
  421. ast_register_cleanup(endpoint_cleanup);
  422. endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash,
  423. endpoint_cmp);
  424. if (!endpoints) {
  425. return -1;
  426. }
  427. tech_endpoints = ao2_container_alloc(TECH_ENDPOINT_BUCKETS, endpoint_hash,
  428. endpoint_cmp);
  429. if (!tech_endpoints) {
  430. return -1;
  431. }
  432. return 0;
  433. }