endpoints.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Asterisk endpoint API.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/endpoints.h"
  31. #include "asterisk/stasis.h"
  32. #include "asterisk/stasis_channels.h"
  33. #include "asterisk/stasis_endpoints.h"
  34. #include "asterisk/stasis_message_router.h"
  35. #include "asterisk/stringfields.h"
  36. #include "asterisk/_private.h"
  37. /*! Buckets for endpoint->channel mappings. Keep it prime! */
  38. #define ENDPOINT_CHANNEL_BUCKETS 127
  39. /*! Buckets for endpoint hash. Keep it prime! */
  40. #define ENDPOINT_BUCKETS 127
  41. static struct ao2_container *endpoints;
  42. struct ast_endpoint {
  43. AST_DECLARE_STRING_FIELDS(
  44. AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
  45. AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */
  46. AST_STRING_FIELD(id); /*!< tech/resource id */
  47. );
  48. /*! Endpoint's current state */
  49. enum ast_endpoint_state state;
  50. /*!
  51. * \brief Max channels for this endpoint. -1 means unlimited or unknown.
  52. *
  53. * Note that this simply documents the limits of an endpoint, and does
  54. * nothing to try to enforce the limit.
  55. */
  56. int max_channels;
  57. /*! Topic for this endpoint's messages */
  58. struct stasis_cp_single *topics;
  59. /*! Router for handling this endpoint's messages */
  60. struct stasis_message_router *router;
  61. /*! ast_str_container of channels associated with this endpoint */
  62. struct ao2_container *channel_ids;
  63. };
  64. static int endpoint_hash(const void *obj, int flags)
  65. {
  66. const struct ast_endpoint *endpoint;
  67. const char *key;
  68. switch (flags & OBJ_SEARCH_MASK) {
  69. case OBJ_SEARCH_KEY:
  70. key = obj;
  71. return ast_str_hash(key);
  72. case OBJ_SEARCH_OBJECT:
  73. endpoint = obj;
  74. return ast_str_hash(endpoint->id);
  75. default:
  76. /* Hash can only work on something with a full key. */
  77. ast_assert(0);
  78. return 0;
  79. }
  80. }
  81. static int endpoint_cmp(void *obj, void *arg, int flags)
  82. {
  83. const struct ast_endpoint *left = obj;
  84. const struct ast_endpoint *right = arg;
  85. const char *right_key = arg;
  86. int cmp;
  87. switch (flags & OBJ_SEARCH_MASK) {
  88. case OBJ_SEARCH_OBJECT:
  89. right_key = right->id;
  90. /* Fall through */
  91. case OBJ_SEARCH_KEY:
  92. cmp = strcmp(left->id, right_key);
  93. break;
  94. case OBJ_SEARCH_PARTIAL_KEY:
  95. cmp = strncmp(left->id, right_key, strlen(right_key));
  96. break;
  97. default:
  98. ast_assert(0);
  99. cmp = 0;
  100. break;
  101. }
  102. if (cmp) {
  103. return 0;
  104. }
  105. return CMP_MATCH;
  106. }
  107. struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
  108. {
  109. return ao2_find(endpoints, id, OBJ_KEY);
  110. }
  111. struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
  112. {
  113. if (!endpoint) {
  114. return ast_endpoint_topic_all();
  115. }
  116. return stasis_cp_single_topic(endpoint->topics);
  117. }
  118. struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
  119. {
  120. if (!endpoint) {
  121. return ast_endpoint_topic_all_cached();
  122. }
  123. return stasis_cp_single_topic_cached(endpoint->topics);
  124. }
  125. const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
  126. {
  127. switch (state) {
  128. case AST_ENDPOINT_UNKNOWN:
  129. return "unknown";
  130. case AST_ENDPOINT_OFFLINE:
  131. return "offline";
  132. case AST_ENDPOINT_ONLINE:
  133. return "online";
  134. }
  135. return "?";
  136. }
  137. static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
  138. {
  139. RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
  140. RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
  141. ast_assert(endpoint != NULL);
  142. ast_assert(endpoint->topics != NULL);
  143. snapshot = ast_endpoint_snapshot_create(endpoint);
  144. if (!snapshot) {
  145. return;
  146. }
  147. message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
  148. if (!message) {
  149. return;
  150. }
  151. stasis_publish(ast_endpoint_topic(endpoint), message);
  152. }
  153. static void endpoint_dtor(void *obj)
  154. {
  155. struct ast_endpoint *endpoint = obj;
  156. /* The router should be shut down already */
  157. ast_assert(stasis_message_router_is_done(endpoint->router));
  158. ao2_cleanup(endpoint->router);
  159. endpoint->router = NULL;
  160. stasis_cp_single_unsubscribe(endpoint->topics);
  161. endpoint->topics = NULL;
  162. ao2_cleanup(endpoint->channel_ids);
  163. endpoint->channel_ids = NULL;
  164. ast_string_field_free_memory(endpoint);
  165. }
  166. int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
  167. struct ast_channel *chan)
  168. {
  169. ast_assert(chan != NULL);
  170. ast_assert(endpoint != NULL);
  171. ast_channel_forward_endpoint(chan, endpoint);
  172. ao2_lock(endpoint);
  173. ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
  174. ao2_unlock(endpoint);
  175. ast_channel_lock(chan);
  176. ast_publish_channel_state(chan);
  177. ast_channel_unlock(chan);
  178. endpoint_publish_snapshot(endpoint);
  179. return 0;
  180. }
  181. /*! \brief Handler for channel snapshot cache clears */
  182. static void endpoint_cache_clear(void *data,
  183. struct stasis_subscription *sub,
  184. struct stasis_message *message)
  185. {
  186. struct ast_endpoint *endpoint = data;
  187. struct stasis_message *clear_msg = stasis_message_data(message);
  188. struct ast_channel_snapshot *clear_snapshot;
  189. if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
  190. return;
  191. }
  192. clear_snapshot = stasis_message_data(clear_msg);
  193. ast_assert(endpoint != NULL);
  194. ao2_lock(endpoint);
  195. ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
  196. ao2_unlock(endpoint);
  197. endpoint_publish_snapshot(endpoint);
  198. }
  199. static void endpoint_default(void *data,
  200. struct stasis_subscription *sub,
  201. struct stasis_message *message)
  202. {
  203. struct stasis_endpoint *endpoint = data;
  204. if (stasis_subscription_final_message(sub, message)) {
  205. ao2_cleanup(endpoint);
  206. }
  207. }
  208. struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
  209. {
  210. RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
  211. int r = 0;
  212. if (ast_strlen_zero(tech)) {
  213. ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
  214. return NULL;
  215. }
  216. if (ast_strlen_zero(resource)) {
  217. ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
  218. return NULL;
  219. }
  220. endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
  221. if (!endpoint) {
  222. return NULL;
  223. }
  224. endpoint->max_channels = -1;
  225. endpoint->state = AST_ENDPOINT_UNKNOWN;
  226. if (ast_string_field_init(endpoint, 80) != 0) {
  227. return NULL;
  228. }
  229. ast_string_field_set(endpoint, tech, tech);
  230. ast_string_field_set(endpoint, resource, resource);
  231. ast_string_field_build(endpoint, id, "%s/%s", tech, resource);
  232. /* All access to channel_ids should be covered by the endpoint's
  233. * lock; no extra lock needed. */
  234. endpoint->channel_ids = ast_str_container_alloc_options(
  235. AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
  236. if (!endpoint->channel_ids) {
  237. return NULL;
  238. }
  239. endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
  240. endpoint->id);
  241. if (!endpoint->topics) {
  242. return NULL;
  243. }
  244. endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
  245. if (!endpoint->router) {
  246. return NULL;
  247. }
  248. r |= stasis_message_router_add(endpoint->router,
  249. stasis_cache_clear_type(), endpoint_cache_clear,
  250. endpoint);
  251. r |= stasis_message_router_set_default(endpoint->router,
  252. endpoint_default, endpoint);
  253. endpoint_publish_snapshot(endpoint);
  254. ao2_link(endpoints, endpoint);
  255. ao2_ref(endpoint, +1);
  256. return endpoint;
  257. }
  258. static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
  259. {
  260. RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
  261. snapshot = ast_endpoint_snapshot_create(endpoint);
  262. if (!snapshot) {
  263. return NULL;
  264. }
  265. return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
  266. }
  267. void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
  268. {
  269. RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
  270. if (endpoint == NULL) {
  271. return;
  272. }
  273. ao2_unlink(endpoints, endpoint);
  274. clear_msg = create_endpoint_snapshot_message(endpoint);
  275. if (clear_msg) {
  276. RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
  277. message = stasis_cache_clear_create(clear_msg);
  278. if (message) {
  279. stasis_publish(ast_endpoint_topic(endpoint), message);
  280. }
  281. }
  282. /* Bump refcount to hold on to the router */
  283. ao2_ref(endpoint->router, +1);
  284. stasis_message_router_unsubscribe(endpoint->router);
  285. }
  286. const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
  287. {
  288. if (!endpoint) {
  289. return NULL;
  290. }
  291. return endpoint->tech;
  292. }
  293. const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
  294. {
  295. if (!endpoint) {
  296. return NULL;
  297. }
  298. return endpoint->resource;
  299. }
  300. const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
  301. {
  302. if (!endpoint) {
  303. return NULL;
  304. }
  305. return endpoint->id;
  306. }
  307. void ast_endpoint_set_state(struct ast_endpoint *endpoint,
  308. enum ast_endpoint_state state)
  309. {
  310. ast_assert(endpoint != NULL);
  311. ao2_lock(endpoint);
  312. endpoint->state = state;
  313. ao2_unlock(endpoint);
  314. endpoint_publish_snapshot(endpoint);
  315. }
  316. void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
  317. int max_channels)
  318. {
  319. ast_assert(endpoint != NULL);
  320. ao2_lock(endpoint);
  321. endpoint->max_channels = max_channels;
  322. ao2_unlock(endpoint);
  323. endpoint_publish_snapshot(endpoint);
  324. }
  325. static void endpoint_snapshot_dtor(void *obj)
  326. {
  327. struct ast_endpoint_snapshot *snapshot = obj;
  328. int channel;
  329. ast_assert(snapshot != NULL);
  330. for (channel = 0; channel < snapshot->num_channels; channel++) {
  331. ao2_ref(snapshot->channel_ids[channel], -1);
  332. }
  333. ast_string_field_free_memory(snapshot);
  334. }
  335. struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
  336. struct ast_endpoint *endpoint)
  337. {
  338. RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
  339. int channel_count;
  340. struct ao2_iterator i;
  341. void *obj;
  342. SCOPED_AO2LOCK(lock, endpoint);
  343. channel_count = ao2_container_count(endpoint->channel_ids);
  344. snapshot = ao2_alloc(
  345. sizeof(*snapshot) + channel_count * sizeof(char *),
  346. endpoint_snapshot_dtor);
  347. if (ast_string_field_init(snapshot, 80) != 0) {
  348. return NULL;
  349. }
  350. ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
  351. endpoint->resource);
  352. ast_string_field_set(snapshot, tech, endpoint->tech);
  353. ast_string_field_set(snapshot, resource, endpoint->resource);
  354. snapshot->state = endpoint->state;
  355. snapshot->max_channels = endpoint->max_channels;
  356. i = ao2_iterator_init(endpoint->channel_ids, 0);
  357. while ((obj = ao2_iterator_next(&i))) {
  358. /* The reference is kept so the channel id does not go away until the snapshot is gone */
  359. snapshot->channel_ids[snapshot->num_channels++] = obj;
  360. }
  361. ao2_iterator_destroy(&i);
  362. ao2_ref(snapshot, +1);
  363. return snapshot;
  364. }
  365. static void endpoint_cleanup(void)
  366. {
  367. ao2_cleanup(endpoints);
  368. endpoints = NULL;
  369. }
  370. int ast_endpoint_init(void)
  371. {
  372. ast_register_cleanup(endpoint_cleanup);
  373. endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash,
  374. endpoint_cmp);
  375. if (!endpoints) {
  376. return -1;
  377. }
  378. return 0;
  379. }