resource_events.c 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2012 - 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief /api-docs/events.{format} implementation- WebSocket resource
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. #include "asterisk.h"
  25. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  26. #include "asterisk/astobj2.h"
  27. #include "asterisk/stasis_app.h"
  28. #include "resource_events.h"
  29. /*! Number of buckets for the Stasis application hash table. Remember to keep it
  30. * a prime number!
  31. */
  32. #define APPS_NUM_BUCKETS 7
  33. /*! \brief A connection to the event WebSocket */
  34. struct event_session {
  35. struct ast_ari_websocket_session *ws_session;
  36. struct ao2_container *websocket_apps;
  37. };
  38. /*!
  39. * \brief Explicitly shutdown a session.
  40. *
  41. * An explicit shutdown is necessary, since stasis-app has a reference to this
  42. * session. We also need to be sure to null out the \c ws_session field, since
  43. * the websocket is about to go away.
  44. *
  45. * \param session Session info struct.
  46. */
  47. static void session_shutdown(struct event_session *session)
  48. {
  49. struct ao2_iterator i;
  50. char *app;
  51. SCOPED_AO2LOCK(lock, session);
  52. i = ao2_iterator_init(session->websocket_apps, 0);
  53. while ((app = ao2_iterator_next(&i))) {
  54. stasis_app_unregister(app);
  55. ao2_cleanup(app);
  56. }
  57. ao2_iterator_destroy(&i);
  58. ao2_cleanup(session->websocket_apps);
  59. session->websocket_apps = NULL;
  60. session->ws_session = NULL;
  61. }
  62. static void session_dtor(void *obj)
  63. {
  64. #ifdef AST_DEVMODE /* Avoid unused variable warning */
  65. struct event_session *session = obj;
  66. #endif
  67. /* session_shutdown should have been called before */
  68. ast_assert(session->ws_session == NULL);
  69. ast_assert(session->websocket_apps == NULL);
  70. }
  71. static void session_cleanup(struct event_session *session)
  72. {
  73. session_shutdown(session);
  74. ao2_cleanup(session);
  75. }
  76. static struct event_session *session_create(
  77. struct ast_ari_websocket_session *ws_session)
  78. {
  79. RAII_VAR(struct event_session *, session, NULL, ao2_cleanup);
  80. session = ao2_alloc(sizeof(*session), session_dtor);
  81. session->ws_session = ws_session;
  82. session->websocket_apps =
  83. ast_str_container_alloc(APPS_NUM_BUCKETS);
  84. if (!session->websocket_apps) {
  85. return NULL;
  86. }
  87. ao2_ref(session, +1);
  88. return session;
  89. }
  90. /*!
  91. * \brief Callback handler for Stasis application messages.
  92. */
  93. static void app_handler(void *data, const char *app_name,
  94. struct ast_json *message)
  95. {
  96. struct event_session *session = data;
  97. int res;
  98. const char *msg_type = S_OR(
  99. ast_json_string_get(ast_json_object_get(message, "type")),
  100. "");
  101. const char *msg_application = S_OR(
  102. ast_json_string_get(ast_json_object_get(message, "application")),
  103. "");
  104. /* Determine if we've been replaced */
  105. if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
  106. strcmp(msg_application, app_name) == 0) {
  107. ao2_find(session->websocket_apps, msg_application,
  108. OBJ_UNLINK | OBJ_NODATA);
  109. }
  110. res = ast_json_object_set(message, "application",
  111. ast_json_string_create(app_name));
  112. if(res != 0) {
  113. return;
  114. }
  115. ao2_lock(session);
  116. if (session->ws_session) {
  117. ast_ari_websocket_session_write(session->ws_session, message);
  118. }
  119. ao2_unlock(session);
  120. }
  121. /*!
  122. * \brief Register for all of the apps given.
  123. * \param session Session info struct.
  124. * \param app_name Name of application to register.
  125. */
  126. static int session_register_app(struct event_session *session,
  127. const char *app_name)
  128. {
  129. SCOPED_AO2LOCK(lock, session);
  130. ast_assert(session->ws_session != NULL);
  131. ast_assert(session->websocket_apps != NULL);
  132. if (ast_strlen_zero(app_name)) {
  133. return -1;
  134. }
  135. if (ast_str_container_add(session->websocket_apps, app_name)) {
  136. ast_ari_websocket_session_write(session->ws_session,
  137. ast_ari_oom_json());
  138. return -1;
  139. }
  140. stasis_app_register(app_name, app_handler, session);
  141. return 0;
  142. }
  143. void ast_ari_websocket_events_event_websocket(struct ast_ari_websocket_session *ws_session,
  144. struct ast_variable *headers,
  145. struct ast_ari_events_event_websocket_args *args)
  146. {
  147. RAII_VAR(struct event_session *, session, NULL, session_cleanup);
  148. struct ast_json *msg;
  149. int res;
  150. size_t i;
  151. ast_debug(3, "/events WebSocket connection\n");
  152. session = session_create(ws_session);
  153. if (!session) {
  154. ast_ari_websocket_session_write(ws_session, ast_ari_oom_json());
  155. return;
  156. }
  157. res = 0;
  158. for (i = 0; i < args->app_count; ++i) {
  159. if (ast_strlen_zero(args->app[i])) {
  160. continue;
  161. }
  162. res |= session_register_app(session, args->app[i]);
  163. }
  164. if (ao2_container_count(session->websocket_apps) == 0) {
  165. RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
  166. msg = ast_json_pack("{s: s, s: [s]}",
  167. "type", "MissingParams",
  168. "params", "app");
  169. if (!msg) {
  170. msg = ast_json_ref(ast_ari_oom_json());
  171. }
  172. ast_ari_websocket_session_write(session->ws_session, msg);
  173. return;
  174. }
  175. if (res != 0) {
  176. ast_ari_websocket_session_write(ws_session, ast_ari_oom_json());
  177. return;
  178. }
  179. /* We don't process any input, but we'll consume it waiting for EOF */
  180. while ((msg = ast_ari_websocket_session_read(ws_session))) {
  181. ast_json_unref(msg);
  182. }
  183. }
  184. void ast_ari_events_user_event(struct ast_variable *headers,
  185. struct ast_ari_events_user_event_args *args,
  186. struct ast_ari_response *response)
  187. {
  188. enum stasis_app_user_event_res res;
  189. struct ast_json *json_variables = NULL;
  190. if (args->variables) {
  191. ast_ari_events_user_event_parse_body(args->variables, args);
  192. json_variables = ast_json_object_get(args->variables, "variables");
  193. }
  194. if (ast_strlen_zero(args->application)) {
  195. ast_ari_response_error(response, 400, "Bad Request",
  196. "Missing parameter application");
  197. return;
  198. }
  199. res = stasis_app_user_event(args->application,
  200. args->event_name,
  201. args->source, args->source_count,
  202. json_variables);
  203. switch (res) {
  204. case STASIS_APP_USER_OK:
  205. ast_ari_response_no_content(response);
  206. break;
  207. case STASIS_APP_USER_APP_NOT_FOUND:
  208. ast_ari_response_error(response, 404, "Not Found",
  209. "Application not found");
  210. break;
  211. case STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND:
  212. ast_ari_response_error(response, 422, "Unprocessable Entity",
  213. "Event source was not found");
  214. break;
  215. case STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME:
  216. ast_ari_response_error(response, 400, "Bad Request",
  217. "Invalid event source URI scheme");
  218. break;
  219. case STASIS_APP_USER_USEREVENT_INVALID:
  220. ast_ari_response_error(response, 400, "Bad Request",
  221. "Invalid userevnet data");
  222. break;
  223. case STASIS_APP_USER_INTERNAL_ERROR:
  224. default:
  225. ast_ari_response_error(response, 500, "Internal Server Error",
  226. "Error processing request");
  227. }
  228. }