res_pjsip_transport_websocket.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * Jason Parker <jparker@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \brief WebSocket transport module
  20. */
  21. /*** MODULEINFO
  22. <depend>pjproject</depend>
  23. <depend>res_pjsip</depend>
  24. <depend>res_http_websocket</depend>
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. #include <pjsip.h>
  29. #include <pjsip_ua.h>
  30. #include "asterisk/module.h"
  31. #include "asterisk/http_websocket.h"
  32. #include "asterisk/res_pjsip.h"
  33. #include "asterisk/res_pjsip_session.h"
  34. #include "asterisk/taskprocessor.h"
  35. static int transport_type_ws;
  36. static int transport_type_wss;
  37. /*!
  38. * \brief Wrapper for pjsip_transport, for storing the WebSocket session
  39. */
  40. struct ws_transport {
  41. pjsip_transport transport;
  42. pjsip_rx_data rdata;
  43. struct ast_websocket *ws_session;
  44. };
  45. /*!
  46. * \brief Send a message over the WebSocket connection.
  47. *
  48. * Called by pjsip transport manager.
  49. */
  50. static pj_status_t ws_send_msg(pjsip_transport *transport,
  51. pjsip_tx_data *tdata,
  52. const pj_sockaddr_t *rem_addr,
  53. int addr_len,
  54. void *token,
  55. pjsip_transport_callback callback)
  56. {
  57. struct ws_transport *wstransport = (struct ws_transport *)transport;
  58. if (ast_websocket_write(wstransport->ws_session, AST_WEBSOCKET_OPCODE_TEXT, tdata->buf.start, (int)(tdata->buf.cur - tdata->buf.start))) {
  59. return PJ_EUNKNOWN;
  60. }
  61. return PJ_SUCCESS;
  62. }
  63. /*!
  64. * \brief Destroy the pjsip transport.
  65. *
  66. * Called by pjsip transport manager.
  67. */
  68. static pj_status_t ws_destroy(pjsip_transport *transport)
  69. {
  70. struct ws_transport *wstransport = (struct ws_transport *)transport;
  71. if (wstransport->transport.ref_cnt) {
  72. pj_atomic_destroy(wstransport->transport.ref_cnt);
  73. }
  74. if (wstransport->transport.lock) {
  75. pj_lock_destroy(wstransport->transport.lock);
  76. }
  77. pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool);
  78. if (wstransport->rdata.tp_info.pool) {
  79. pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->rdata.tp_info.pool);
  80. }
  81. return PJ_SUCCESS;
  82. }
  83. static int transport_shutdown(void *data)
  84. {
  85. pjsip_transport *transport = data;
  86. pjsip_transport_shutdown(transport);
  87. return 0;
  88. }
  89. struct transport_create_data {
  90. struct ws_transport *transport;
  91. struct ast_websocket *ws_session;
  92. };
  93. /*!
  94. * \brief Create a pjsip transport.
  95. */
  96. static int transport_create(void *data)
  97. {
  98. struct transport_create_data *create_data = data;
  99. struct ws_transport *newtransport;
  100. pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
  101. struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
  102. pj_pool_t *pool;
  103. pj_str_t buf;
  104. if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) {
  105. ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n");
  106. return -1;
  107. }
  108. if (!(newtransport = PJ_POOL_ZALLOC_T(pool, struct ws_transport))) {
  109. ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n");
  110. pjsip_endpt_release_pool(endpt, pool);
  111. return -1;
  112. }
  113. newtransport->ws_session = create_data->ws_session;
  114. pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
  115. pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
  116. newtransport->transport.pool = pool;
  117. pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session))), &newtransport->transport.key.rem_addr);
  118. newtransport->transport.key.rem_addr.addr.sa_family = pj_AF_INET();
  119. newtransport->transport.key.type = ast_websocket_is_secure(newtransport->ws_session) ? transport_type_wss : transport_type_ws;
  120. newtransport->transport.addr_len = pj_sockaddr_get_len(&newtransport->transport.key.rem_addr);
  121. pj_sockaddr_cp(&newtransport->transport.local_addr, &newtransport->transport.key.rem_addr);
  122. newtransport->transport.local_name.host.ptr = (char *)pj_pool_alloc(pool, newtransport->transport.addr_len+4);
  123. pj_sockaddr_print(&newtransport->transport.key.rem_addr, newtransport->transport.local_name.host.ptr, newtransport->transport.addr_len+4, 0);
  124. newtransport->transport.local_name.host.slen = pj_ansi_strlen(newtransport->transport.local_name.host.ptr);
  125. newtransport->transport.local_name.port = pj_sockaddr_get_port(&newtransport->transport.key.rem_addr);
  126. newtransport->transport.type_name = (char *)pjsip_transport_get_type_name(newtransport->transport.key.type);
  127. newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
  128. newtransport->transport.info = (char *)pj_pool_alloc(newtransport->transport.pool, 64);
  129. newtransport->transport.endpt = endpt;
  130. newtransport->transport.tpmgr = tpmgr;
  131. newtransport->transport.send_msg = &ws_send_msg;
  132. newtransport->transport.destroy = &ws_destroy;
  133. pjsip_transport_register(newtransport->transport.tpmgr, (pjsip_transport *)newtransport);
  134. newtransport->rdata.tp_info.transport = &newtransport->transport;
  135. newtransport->rdata.tp_info.pool = pjsip_endpt_create_pool(endpt, "rtd%p",
  136. PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC);
  137. if (!newtransport->rdata.tp_info.pool) {
  138. ast_log(LOG_ERROR, "Failed to allocate WebSocket rdata.\n");
  139. pjsip_endpt_release_pool(endpt, pool);
  140. return -1;
  141. }
  142. create_data->transport = newtransport;
  143. return 0;
  144. }
  145. struct transport_read_data {
  146. struct ws_transport *transport;
  147. char *payload;
  148. uint64_t payload_len;
  149. };
  150. /*!
  151. * \brief Pass WebSocket data into pjsip transport manager.
  152. */
  153. static int transport_read(void *data)
  154. {
  155. struct transport_read_data *read_data = data;
  156. struct ws_transport *newtransport = read_data->transport;
  157. struct ast_websocket *session = newtransport->ws_session;
  158. pjsip_rx_data *rdata = &newtransport->rdata;
  159. int recvd;
  160. pj_str_t buf;
  161. pj_gettimeofday(&rdata->pkt_info.timestamp);
  162. pj_memcpy(rdata->pkt_info.packet, read_data->payload,
  163. PJSIP_MAX_PKT_LEN < read_data->payload_len ? PJSIP_MAX_PKT_LEN : read_data->payload_len);
  164. rdata->pkt_info.len = read_data->payload_len;
  165. rdata->pkt_info.zero = 0;
  166. pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
  167. rdata->pkt_info.src_addr.addr.sa_family = pj_AF_INET();
  168. rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
  169. pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_host(ast_websocket_remote_address(session)));
  170. rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
  171. recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
  172. pj_pool_reset(rdata->tp_info.pool);
  173. return (read_data->payload_len == recvd) ? 0 : -1;
  174. }
  175. static int get_write_timeout(void)
  176. {
  177. int write_timeout = -1;
  178. struct ao2_container *transports;
  179. transports = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "transport", AST_RETRIEVE_FLAG_ALL, NULL);
  180. if (transports) {
  181. struct ao2_iterator it_transports = ao2_iterator_init(transports, 0);
  182. struct ast_sip_transport *transport;
  183. for (; (transport = ao2_iterator_next(&it_transports)); ao2_cleanup(transport)) {
  184. if (transport->type != AST_TRANSPORT_WS && transport->type != AST_TRANSPORT_WSS) {
  185. continue;
  186. }
  187. ast_debug(5, "Found %s transport with write timeout: %d\n",
  188. transport->type == AST_TRANSPORT_WS ? "WS" : "WSS",
  189. transport->write_timeout);
  190. write_timeout = MAX(write_timeout, transport->write_timeout);
  191. }
  192. ao2_cleanup(transports);
  193. }
  194. if (write_timeout < 0) {
  195. write_timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
  196. }
  197. ast_debug(1, "Write timeout for WS/WSS transports: %d\n", write_timeout);
  198. return write_timeout;
  199. }
  200. /*!
  201. \brief WebSocket connection handler.
  202. */
  203. static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
  204. {
  205. struct ast_taskprocessor *serializer = NULL;
  206. struct transport_create_data create_data;
  207. struct ws_transport *transport = NULL;
  208. struct transport_read_data read_data;
  209. if (ast_websocket_set_nonblock(session)) {
  210. ast_websocket_unref(session);
  211. return;
  212. }
  213. if (ast_websocket_set_timeout(session, get_write_timeout())) {
  214. ast_websocket_unref(session);
  215. return;
  216. }
  217. if (!(serializer = ast_sip_create_serializer())) {
  218. ast_websocket_unref(session);
  219. return;
  220. }
  221. create_data.ws_session = session;
  222. if (ast_sip_push_task_synchronous(serializer, transport_create, &create_data)) {
  223. ast_log(LOG_ERROR, "Could not create WebSocket transport.\n");
  224. ast_websocket_unref(session);
  225. return;
  226. }
  227. transport = create_data.transport;
  228. read_data.transport = transport;
  229. while (ast_wait_for_input(ast_websocket_fd(session), -1) > 0) {
  230. enum ast_websocket_opcode opcode;
  231. int fragmented;
  232. if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
  233. break;
  234. }
  235. if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
  236. ast_sip_push_task_synchronous(serializer, transport_read, &read_data);
  237. } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
  238. break;
  239. }
  240. }
  241. ast_sip_push_task_synchronous(serializer, transport_shutdown, transport);
  242. ast_taskprocessor_unreference(serializer);
  243. ast_websocket_unref(session);
  244. }
  245. /*!
  246. * \brief Store the transport a message came in on, so it can be used for outbound messages to that contact.
  247. */
  248. static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
  249. {
  250. static const pj_str_t STR_WS = { "ws", 2 };
  251. static const pj_str_t STR_WSS = { "wss", 3 };
  252. pjsip_contact_hdr *contact;
  253. long type = rdata->tp_info.transport->key.type;
  254. if (type != (long)transport_type_ws && type != (long)transport_type_wss) {
  255. return PJ_FALSE;
  256. }
  257. if ((contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL)) && !contact->star &&
  258. (PJSIP_URI_SCHEME_IS_SIP(contact->uri) || PJSIP_URI_SCHEME_IS_SIPS(contact->uri))) {
  259. pjsip_sip_uri *uri = pjsip_uri_get_uri(contact->uri);
  260. pj_cstr(&uri->host, rdata->pkt_info.src_name);
  261. uri->port = rdata->pkt_info.src_port;
  262. pj_strdup(rdata->tp_info.pool, &uri->transport_param, (type == (long)transport_type_ws) ? &STR_WS : &STR_WSS);
  263. }
  264. rdata->msg_info.via->rport_param = 0;
  265. return PJ_FALSE;
  266. }
  267. static pjsip_module websocket_module = {
  268. .name = { "WebSocket Transport Module", 26 },
  269. .id = -1,
  270. .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
  271. .on_rx_request = websocket_on_rx_msg,
  272. .on_rx_response = websocket_on_rx_msg,
  273. };
  274. /*! \brief Function called when an INVITE goes out */
  275. static void websocket_outgoing_invite_request(struct ast_sip_session *session, struct pjsip_tx_data *tdata)
  276. {
  277. if (session->inv_session->state == PJSIP_INV_STATE_NULL) {
  278. pjsip_dlg_add_usage(session->inv_session->dlg, &websocket_module, NULL);
  279. }
  280. }
  281. /*! \brief Supplement for adding Websocket functionality to dialog */
  282. static struct ast_sip_session_supplement websocket_supplement = {
  283. .method = "INVITE",
  284. .priority = AST_SIP_SUPPLEMENT_PRIORITY_FIRST + 1,
  285. .outgoing_request = websocket_outgoing_invite_request,
  286. };
  287. static int load_module(void)
  288. {
  289. CHECK_PJSIP_MODULE_LOADED();
  290. pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE, "WS", 5060, &transport_type_ws);
  291. pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE, "WSS", 5060, &transport_type_wss);
  292. if (ast_sip_register_service(&websocket_module) != PJ_SUCCESS) {
  293. return AST_MODULE_LOAD_DECLINE;
  294. }
  295. if (ast_sip_session_register_supplement(&websocket_supplement)) {
  296. ast_sip_unregister_service(&websocket_module);
  297. return AST_MODULE_LOAD_DECLINE;
  298. }
  299. if (ast_websocket_add_protocol("sip", websocket_cb)) {
  300. ast_sip_session_unregister_supplement(&websocket_supplement);
  301. ast_sip_unregister_service(&websocket_module);
  302. return AST_MODULE_LOAD_DECLINE;
  303. }
  304. return AST_MODULE_LOAD_SUCCESS;
  305. }
  306. static int unload_module(void)
  307. {
  308. ast_sip_unregister_service(&websocket_module);
  309. ast_sip_session_unregister_supplement(&websocket_supplement);
  310. ast_websocket_remove_protocol("sip", websocket_cb);
  311. return 0;
  312. }
  313. AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP WebSocket Transport Support",
  314. .support_level = AST_MODULE_SUPPORT_CORE,
  315. .load = load_module,
  316. .unload = unload_module,
  317. .load_pri = AST_MODPRI_APP_DEPEND,
  318. );