stasis_endpoints.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis endpoint API.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/stasis.h"
  31. #include "asterisk/stasis_endpoints.h"
  32. /*** DOCUMENTATION
  33. <managerEvent language="en_US" name="PeerStatus">
  34. <managerEventInstance class="EVENT_FLAG_SYSTEM">
  35. <synopsis>Raised when the state of a peer changes.</synopsis>
  36. <syntax>
  37. <parameter name="ChannelType">
  38. <para>The channel technology of the peer.</para>
  39. </parameter>
  40. <parameter name="Peer">
  41. <para>The name of the peer (including channel technology).</para>
  42. </parameter>
  43. <parameter name="PeerStatus">
  44. <para>New status of the peer.</para>
  45. <enumlist>
  46. <enum name="Unknown"/>
  47. <enum name="Registered"/>
  48. <enum name="Unregistered"/>
  49. <enum name="Rejected"/>
  50. <enum name="Lagged"/>
  51. </enumlist>
  52. </parameter>
  53. <parameter name="Cause">
  54. <para>The reason the status has changed.</para>
  55. </parameter>
  56. <parameter name="Address">
  57. <para>New address of the peer.</para>
  58. </parameter>
  59. <parameter name="Port">
  60. <para>New port for the peer.</para>
  61. </parameter>
  62. <parameter name="Time">
  63. <para>Time it takes to reach the peer and receive a response.</para>
  64. </parameter>
  65. </syntax>
  66. </managerEventInstance>
  67. </managerEvent>
  68. ***/
  69. static struct stasis_cp_all *endpoint_cache_all;
  70. struct stasis_cp_all *ast_endpoint_cache_all(void)
  71. {
  72. return endpoint_cache_all;
  73. }
  74. struct stasis_cache *ast_endpoint_cache(void)
  75. {
  76. return stasis_cp_all_cache(endpoint_cache_all);
  77. }
  78. struct stasis_topic *ast_endpoint_topic_all(void)
  79. {
  80. return stasis_cp_all_topic(endpoint_cache_all);
  81. }
  82. struct stasis_topic *ast_endpoint_topic_all_cached(void)
  83. {
  84. return stasis_cp_all_topic_cached(endpoint_cache_all);
  85. }
  86. static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg);
  87. STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type);
  88. STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type,
  89. .to_ami = peerstatus_to_ami,
  90. );
  91. static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg)
  92. {
  93. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  94. RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free);
  95. const char *value;
  96. /* peer_status is the only *required* thing */
  97. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) {
  98. return NULL;
  99. }
  100. ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value);
  101. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) {
  102. ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value);
  103. }
  104. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) {
  105. ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value);
  106. }
  107. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) {
  108. ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value);
  109. }
  110. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) {
  111. ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value);
  112. }
  113. return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "PeerStatus",
  114. "ChannelType: %s\r\n"
  115. "Peer: %s/%s\r\n"
  116. "%s",
  117. obj->snapshot->tech,
  118. obj->snapshot->tech,
  119. obj->snapshot->resource,
  120. ast_str_buffer(peerstatus_event_string));
  121. }
  122. static void endpoint_blob_dtor(void *obj)
  123. {
  124. struct ast_endpoint_blob *event = obj;
  125. ao2_cleanup(event->snapshot);
  126. ast_json_unref(event->blob);
  127. }
  128. struct stasis_message *ast_endpoint_blob_create(struct ast_endpoint *endpoint,
  129. struct stasis_message_type *type, struct ast_json *blob)
  130. {
  131. RAII_VAR(struct ast_endpoint_blob *, obj, NULL, ao2_cleanup);
  132. RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
  133. if (!type) {
  134. return NULL;
  135. }
  136. if (!blob) {
  137. blob = ast_json_null();
  138. }
  139. if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
  140. return NULL;
  141. }
  142. if (endpoint) {
  143. if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
  144. return NULL;
  145. }
  146. }
  147. obj->blob = ast_json_ref(blob);
  148. if (!(msg = stasis_message_create(type, obj))) {
  149. return NULL;
  150. }
  151. ao2_ref(msg, +1);
  152. return msg;
  153. }
  154. void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type,
  155. struct ast_json *blob)
  156. {
  157. RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
  158. if (blob) {
  159. message = ast_endpoint_blob_create(endpoint, type, blob);
  160. }
  161. if (message) {
  162. stasis_publish(ast_endpoint_topic(endpoint), message);
  163. }
  164. }
  165. struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
  166. const char *name)
  167. {
  168. RAII_VAR(char *, id, NULL, ast_free);
  169. RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
  170. struct ast_endpoint_snapshot *snapshot;
  171. if (ast_strlen_zero(name)) {
  172. ast_asprintf(&id, "%s", tech);
  173. } else {
  174. ast_asprintf(&id, "%s/%s", tech, name);
  175. }
  176. if (!id) {
  177. return NULL;
  178. }
  179. ast_tech_to_upper(id);
  180. msg = stasis_cache_get(ast_endpoint_cache(),
  181. ast_endpoint_snapshot_type(), id);
  182. if (!msg) {
  183. return NULL;
  184. }
  185. snapshot = stasis_message_data(msg);
  186. ast_assert(snapshot != NULL);
  187. ao2_ref(snapshot, +1);
  188. return snapshot;
  189. }
  190. /*!
  191. * \brief Callback extract a unique identity from a snapshot message.
  192. *
  193. * This identity is unique to the underlying object of the snapshot, such as the
  194. * UniqueId field of a channel.
  195. *
  196. * \param message Message to extract id from.
  197. * \return String representing the snapshot's id.
  198. * \return \c NULL if the message_type of the message isn't a handled snapshot.
  199. * \since 12
  200. */
  201. static const char *endpoint_snapshot_get_id(struct stasis_message *message)
  202. {
  203. struct ast_endpoint_snapshot *snapshot;
  204. if (ast_endpoint_snapshot_type() != stasis_message_type(message)) {
  205. return NULL;
  206. }
  207. snapshot = stasis_message_data(message);
  208. return snapshot->id;
  209. }
  210. struct ast_json *ast_endpoint_snapshot_to_json(
  211. const struct ast_endpoint_snapshot *snapshot,
  212. const struct stasis_message_sanitizer *sanitize)
  213. {
  214. RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
  215. struct ast_json *channel_array;
  216. int i;
  217. json = ast_json_pack("{s: s, s: s, s: s, s: []}",
  218. "technology", snapshot->tech,
  219. "resource", snapshot->resource,
  220. "state", ast_endpoint_state_to_string(snapshot->state),
  221. "channel_ids");
  222. if (json == NULL) {
  223. return NULL;
  224. }
  225. if (snapshot->max_channels != -1) {
  226. int res = ast_json_object_set(json, "max_channels",
  227. ast_json_integer_create(snapshot->max_channels));
  228. if (res != 0) {
  229. return NULL;
  230. }
  231. }
  232. channel_array = ast_json_object_get(json, "channel_ids");
  233. ast_assert(channel_array != NULL);
  234. for (i = 0; i < snapshot->num_channels; ++i) {
  235. int res;
  236. if (sanitize && sanitize->channel_id
  237. && sanitize->channel_id(snapshot->channel_ids[i])) {
  238. continue;
  239. }
  240. res = ast_json_array_append(channel_array,
  241. ast_json_string_create(snapshot->channel_ids[i]));
  242. if (res != 0) {
  243. return NULL;
  244. }
  245. }
  246. return ast_json_ref(json);
  247. }
  248. static void endpoints_stasis_cleanup(void)
  249. {
  250. STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_snapshot_type);
  251. STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_state_type);
  252. ao2_cleanup(endpoint_cache_all);
  253. endpoint_cache_all = NULL;
  254. }
  255. int ast_endpoint_stasis_init(void)
  256. {
  257. int res = 0;
  258. ast_register_cleanup(endpoints_stasis_cleanup);
  259. endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
  260. endpoint_snapshot_get_id);
  261. if (!endpoint_cache_all) {
  262. return -1;
  263. }
  264. res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type);
  265. res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type);
  266. return res;
  267. }